
I want to perform an outer join on two data streams, preferably without putting them in a window (from what I've seen, coGroup always comes with a window).

I tried this:

    val controlStream = Flink.flinkEnv.fromElements(
      (1, "mex1", "stream1_feat1"),
      (1, "mex2", "stream1_feat2")
    ).keyBy(x => (x._1, x._2))

    val wordStream = Flink.flinkEnv.fromElements(
      (1, "mex1", "stream2_feat1"),
      (1, "mex3", "stream2_feat3")
    ).keyBy(x => (x._1, x._2))

    val filteredStream = controlStream
        .connect(wordStream)
        .flatMap(new ControlFunction)

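////////////////////////////////////////////////////////////////////////

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class ControlFunction extends RichCoFlatMapFunction[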
    (Int, String, String),
    (Int, String, String),
    (Int, String, String, String)] {

    // outer join
    private var state1: ValueState[(Int, String, String)] = _
    private var state2: ValueState[(Int, String, String)] = _

    override def open(parameters: Configuration): Unit = {
      state1 = getRuntimeContext.getState(
        new ValueStateDescriptor[(Int, String, String)]("s1", createTypeInformation[(Int, String, String)]))

      state2 = getRuntimeContext.getState(
        new ValueStateDescriptor[(Int, String, String)]("s2", createTypeInformation[(Int, String, String)]))

    }

    override def flatMap1(value: (Int, String, String),
                          out: Collector[(Int, String, String, String)]): Unit = {

      val state2Value = state2.value

      if (state2Value != null) {
        println("inside map1 not null")
        state2.clear()
        out.collect((value._1, value._2, value._3, state2Value._3))
      } else {
        println("inside map1 null")
        state1.update(value)
        out.collect((value._1, value._2, value._3, "NA"))
      }
    }

    override def flatMap2(value: (Int, String, String),
                          out: Collector[(Int, String, String, String)]): Unit = {

      val state1Value = state1.value

      if (state1Value != null) {
        println("inside map2 not null")
        state1.clear()
        out.collect((value._1, value._2, state1Value._3, value._3))
      } else {
        println("inside map2 null")
        state2.update(value)
        out.collect((value._1, value._2, "NA", value._3))
      }
    }

  }

Which gave me:

5> (1,mex2,stream1_feat2,NA)
8> (1,mex1,stream1_feat1,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)

Here the record (1,mex1,stream1_feat1,NA) should not be produced. The result I want to achieve is an outer join:

5> (1,mex2,stream1_feat2,NA)
2> (1,mex3,NA,stream2_feat3)
8> (1,mex1,stream1_feat1,stream2_feat1)

From the print statements, I found that the two flatMaps are called one after the other, so mex1 is produced twice. Is there any way to resolve this?

Thanks in advance!

1 Answer

You can't expect a streaming outer join to behave the same way as a batch outer join. A batch outer join can fully scan both input tables and only produces output rows containing nulls when matching records do not exist. With a streaming implementation, you cannot know whether, by waiting, you might eventually receive a matching record.

Because they cannot access the future, stream processing applications are often forced to produce an output stream whose results are updated as more information becomes available.
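To make the idea of an updating result concrete, here is a small plain-Scala model (no Flink dependency, so it runs standalone) of a streaming full outer join that emits a provisional NA row immediately and, if a match arrives later, retracts that row and emits the joined one. The `Change(isAdd, row)` encoding is similar in spirit to the `(Boolean, row)` pairs of Flink's retract streams, but all names here (`Event`, `Change`, `UpdatingOuterJoin`) are hypothetical, and the sketch assumes at most one record per key on each stream, as in the question's data.

```scala
// Plain-Scala model (no Flink): a streaming full outer join whose output is a
// stream of changes, so a provisional NA row can be retracted later.
sealed trait Side
case object Stream1 extends Side
case object Stream2 extends Side

// One input record: which stream it came from, its join key, and its feature.
final case class Event(side: Side, key: (Int, String), feat: String)

// One change to the result: isAdd = true inserts `row`, false retracts it.
final case class Change(isAdd: Boolean, row: (Int, String, String, String))

object UpdatingOuterJoin {
  type Row = (Int, String, String, String)

  // The question's input, in the order the records happen to arrive.
  val questionEvents: Seq[Event] = Seq(
    Event(Stream1, (1, "mex1"), "stream1_feat1"),
    Event(Stream1, (1, "mex2"), "stream1_feat2"),
    Event(Stream2, (1, "mex1"), "stream2_feat1"),
    Event(Stream2, (1, "mex3"), "stream2_feat3")
  )

  private def naRow(e: Event): Row = e.side match {
    case Stream1 => (e.key._1, e.key._2, e.feat, "NA")
    case Stream2 => (e.key._1, e.key._2, "NA", e.feat)
  }

  def run(events: Seq[Event]): Vector[Change] = {
    // Unmatched record per key (assumes at most one record per key per stream).
    var pending = Map.empty[(Int, String), Event]
    val out = Vector.newBuilder[Change]
    for (e <- events) {
      pending.get(e.key) match {
        case Some(p) if p.side != e.side =>
          // Match found: retract the provisional NA row, then add the joined row.
          out += Change(isAdd = false, row = naRow(p))
          val (l, r) = if (p.side == Stream1) (p, e) else (e, p)
          out += Change(isAdd = true, row = (e.key._1, e.key._2, l.feat, r.feat))
          pending -= e.key
        case _ =>
          // No match yet: we cannot see the future, so emit an NA row now.
          out += Change(isAdd = true, row = naRow(e))
          pending += e.key -> e
      }
    }
    out.result()
  }

  // Replay the changes in order to obtain the current result table.
  def materialize(changes: Seq[Change]): Vector[Row] =
    changes.foldLeft(Vector.empty[Row]) { (rows, c) =>
      if (c.isAdd) rows :+ c.row else rows.diff(Vector(c.row))
    }
}
```

Replaying all changes with `materialize(run(questionEvents))` yields exactly the three rows the question expects. The intermediate `(1,mex1,stream1_feat1,NA)` row still appears in the change stream, but it is later retracted, so any downstream consumer has to understand retractions.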

One thing you could do is wait for some time to see whether emitting a result containing an NA would be a mistake, but eventually you'd have to stop waiting and produce a result.
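A rough sketch of that waiting strategy, again as a plain-Scala model rather than Flink code: each unmatched record is buffered for `waitMs`; a match within that time produces a single joined row, and an NA row is emitted only when the deadline passes or the input ends. In Flink itself this would typically be a `KeyedCoProcessFunction` that keeps the buffered record in keyed state and registers a timer for it. `TimedOuterJoin`, `waitMs`, and the timestamps below are hypothetical, and events are assumed to arrive sorted by timestamp, with at most one record per key per stream.

```scala
// Plain-Scala model (no Flink) of "wait, then give up": an unmatched record is
// buffered for waitMs, and an NA row is emitted only if no match arrives in time.
sealed trait Side
case object Stream1 extends Side
case object Stream2 extends Side

// One input record with a (hypothetical) timestamp in milliseconds.
final case class Event(ts: Long, side: Side, key: (Int, String), feat: String)

object TimedOuterJoin {
  type Row = (Int, String, String, String)

  // The question's records with made-up timestamps: mex1's halves are 50 ms apart.
  val questionEvents: Seq[Event] = Seq(
    Event(0L, Stream1, (1, "mex1"), "stream1_feat1"),
    Event(0L, Stream1, (1, "mex2"), "stream1_feat2"),
    Event(50L, Stream2, (1, "mex1"), "stream2_feat1"),
    Event(60L, Stream2, (1, "mex3"), "stream2_feat3")
  )

  private def naRow(e: Event): Row = e.side match {
    case Stream1 => (e.key._1, e.key._2, e.feat, "NA")
    case Stream2 => (e.key._1, e.key._2, "NA", e.feat)
  }

  // `events` must be sorted by ts.
  def run(events: Seq[Event], waitMs: Long): Vector[Row] = {
    var pending = Map.empty[(Int, String), Event] // unmatched record per key
    val out = Vector.newBuilder[Row]

    // Like a timer firing: emit NA rows for records whose deadline has passed.
    def fireTimers(now: Long): Unit = {
      val (expired, alive) = pending.partition { case (_, p) => p.ts + waitMs <= now }
      expired.values.toSeq.sortBy(_.ts).foreach(p => out += naRow(p))
      pending = alive
    }

    for (e <- events) {
      fireTimers(e.ts)
      pending.get(e.key) match {
        case Some(p) if p.side != e.side =>
          // Matched within the wait: emit one joined row and no NA row.
          val (l, r) = if (p.side == Stream1) (p, e) else (e, p)
          out += ((e.key._1, e.key._2, l.feat, r.feat))
          pending -= e.key
        case _ =>
          pending += e.key -> e
      }
    }
    // End of input: no match can arrive any more, so flush what is still buffered.
    pending.values.toSeq.sortBy(_.ts).foreach(p => out += naRow(p))
    out.result()
  }
}
```

With `waitMs = 100`, the two halves of mex1 arrive 50 ms apart and are emitted once as a joined row. With `waitMs = 20`, the same input yields two separate NA rows for mex1 instead. That is the trade-off described above: a longer wait means fewer wrong NA rows, but more latency and more buffered state.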

Note that Flink's Table API has an outer join, but you'll notice it's marked as "Result Updating" for the reasons explained above.


4 Comments

Hi, thanks for the quick response and sorry for the confusion. I updated my question, and the goal I hope to achieve is an outer join using connect.
I've rewritten my answer to try to be clearer about why what you want isn't possible.
Thanks! This makes a lot of sense, and I've accepted your answer. So if I want to try the coGroup function, is there any example available?
I'm not aware of any examples. There are a few tests in the Flink sources that use CoGroupFunctions, but I don't see any that make for good examples.
