久热香蕉在线视频免费_自_午夜福利院中文字幕_欧美精品在线视频中文_欧美人成在线播放网站色

  • <address id="i8pvn"><var id="i8pvn"><center id="i8pvn"></center></var></address>
  • <button id="i8pvn"><acronym id="i8pvn"></acronym></button>
  • 甘肅信息港

    Flink教程:DataStream上的Join操作

    分享到:
     2020-03-28 07:11:25 來源: 閱讀:-G0

    批處理經(jīng)常要解決的問題是將兩個數(shù)據(jù)源做關(guān)聯(lián)Join操作。比如,很多手機(jī)APP都有一個用戶數(shù)據(jù)源User,同時APP會記錄用戶的行為,我們稱之為Behavior,兩個表按照userId來進(jìn)行Join。在流處理場景下,F(xiàn)link也支持了Join,只不過Flink是在一個時間窗口上來進(jìn)行兩個表的Join。

    Join示例圖

    目前,F(xiàn)link支持了兩種Join:Window Join(窗口連接)和Interval Join(時間間隔連接。

    Window Join

    從名字中能猜到,Window Join主要在Flink的窗口上進(jìn)行操作,它將兩個流中落在相同窗口的元素按照某個Key進(jìn)行Join。一個Window Join的大致骨架結(jié)構(gòu)為:

    input1.join(input2)    .where(&lt;KeySelector&gt;)      &lt;- input1使用哪個字段作為Key    .equalTo(&lt;KeySelector&gt;)    &lt;- input2使用哪個字段作為Key    .window(&lt;WindowAssigner&gt;)  &lt;- 指定WindowAssigner    [.trigger(&lt;Trigger&gt;)]      &lt;- 指定Trigger(可選)    [.evictor(&lt;Evictor&gt;)]      &lt;- 指定Evictor(可選)    .apply(&lt;JoinFunction&gt;)     &lt;- 指定JoinFunction

    下圖展示了Join的大致過程。兩個輸入數(shù)據(jù)流先分別按Key進(jìn)行分組,然后將元素劃分到窗口中。窗口的劃分需要使用WindowAssigner來定義,這里可以使用Flink提供的滾動窗口、滑動窗口或會話窗口等默認(rèn)的WindowAssigner。隨后兩個數(shù)據(jù)流中的元素會被分配到各個窗口上,也就是說一個窗口會包含來自兩個數(shù)據(jù)流的元素。相同窗口內(nèi)的數(shù)據(jù)會以INNER JOIN的語義來相互關(guān)聯(lián),形成一個數(shù)據(jù)對。當(dāng)窗口的時間結(jié)束,F(xiàn)link會調(diào)用JoinFunction來對窗口內(nèi)的數(shù)據(jù)對進(jìn)行處理。當(dāng)然,我們也可以使用Trigger或Evictor做一些自定義優(yōu)化,他們的使用方法和普通窗口的使用方法一樣。

    Join的大致流程

    接下來我們重點分析一下兩個數(shù)據(jù)流是如何INNER JOIN的:

    窗口內(nèi)數(shù)據(jù)INNER JOIN示意圖

    一般滴,INNER JOIN只對兩個數(shù)據(jù)源都出現(xiàn)的元素做Join,形成一個數(shù)據(jù)對,即數(shù)據(jù)源input1中的某個元素與數(shù)據(jù)源input2中的所有元素逐個配對。當(dāng)數(shù)據(jù)源某個窗口內(nèi)沒數(shù)據(jù)時,比如圖中的第三個窗口,Join的結(jié)果也是空的。

    class MyJoinFunction extends JoinFunction[(String, Int), (String, Int), String] {  override def join(input1: (String, Int), input2: (String, Int)): String = {    &#34;input 1 :&#34; + input1._2 + &#34;, input 2 :&#34; + input2._2  }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val joinResult = input1.join(input2)      .where(i1 =&gt; i1._1)      .equalTo(i2 =&gt; i2._1)      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))      .apply(new MyJoinFunction)

    上面的代碼自定義了JoinFunction,并將Join結(jié)果打印出來。無論代碼中演示的滾動窗口,還是滑動窗口或會話窗口,其原理都是一樣的。除了JoinFunction,F(xiàn)link還提供了FlatJoinFunction,其功能是輸出零到多個結(jié)果。

    如果INNER JOIN不能滿足我們的需求,CoGroupFunction提供了更多可自定義的功能。需要注意的是,在調(diào)用時,要寫成input1.coGroup(input2).where(&lt;KeySelector&gt;).equalTo(&lt;KeySelecotr&gt;)。

    class MyCoGroupFunction extends CoGroupFunction[(String, Int), (String, Int), String] {  // 這里的類型是Java的Iterable,需要引用 collection.JavaConverters._ 并轉(zhuǎn)成Scala  override def coGroup(input1: lang.Iterable[(String, Int)], input2: lang.Iterable[(String, Int)], out: Collector[String]): Unit = {    input1.asScala.foreach(element =&gt; out.collect(&#34;input1 :&#34; + element.toString()))    input2.asScala.foreach(element =&gt; out.collect(&#34;input2 :&#34; + element.toString()))  }}val input1: DataStream[(String, Int)] = ...val input2: DataStream[(String, Int)] = ...val coGroupResult = input1.coGroup(input2)      .where(i1 =&gt; i1._1)      .equalTo(i2 =&gt; i2._1)      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))      .apply(new MyCoGroupFunction)

    Interval Join

    與Window Join不同,Interval Join不依賴Flink的WindowAssigner,而是根據(jù)一個時間間隔(Interval)界定時間。Interval需要一個時間下界(lower bound)和上界(upper bound),如果我們將input1和input2進(jìn)行Interval Join,input1中的某個元素為input1.element1,時間戳為input1.element1.ts,那么一個Interval就是[input1.element1.ts + lower bound, input1.element1.ts + upper bound],input2中落在這個時間段內(nèi)的元素將會和input1.element1組成一個數(shù)據(jù)對。用數(shù)學(xué)公式表達(dá)為,凡是符合下面公式input1.element1.ts + lower bound &lt;= input2.elementx.ts &lt;=input1.element1.ts + upper bound的元素使用INNER JOIN語義,兩兩組合在一起。上下界可以是正數(shù)也可以是負(fù)數(shù)。

    注意,目前Flink(1.9)的Interval Join只支持Event Time語義。

    Interval Join示意圖

    下面的代碼展示了如何對兩個數(shù)據(jù)流進(jìn)行Interval Join:

    class MyProcessFunction extends ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String] {  override def processElement(input1: (String, Long, Int),                              input2: (String, Long, Int),                              context: ProcessJoinFunction[(String, Long, Int), (String, Long, Int), String]#Context,                              out: Collector[String]): Unit = {    out.collect(&#34;input 1: &#34; + input1.toString() + &#34;, input 2: &#34; + input2.toString)  }}// 數(shù)據(jù)流有三個字段:(key, 時間戳, 數(shù)值)val input1: DataStream[(String, Long, Int)] = ...val input2: DataStream[(String, Long, Int)] = ...val intervalJoinResult = input1.keyBy(_._1)      .intervalJoin(input2.keyBy(_._1))      .between(Time.milliseconds(-5), Time.milliseconds(10))      .process(new MyProcessFunction)

    默認(rèn)的時間間隔是包含上下界的,我們可以使用.lowerBoundExclusive() 和.upperBoundExclusive來確定是否需要包含上下界。

    val intervalJoinResult = input1.keyBy(_._1)      .intervalJoin(input2.keyBy(_._1))      .between(Time.milliseconds(-5), Time.milliseconds(10))      .upperBoundExclusive()      .lowerBoundExclusive()      .process(new MyProcessFunction)

    Interval Join內(nèi)部是用緩存來存儲所有數(shù)據(jù)的,因此需要注意緩存數(shù)據(jù)不能太大,以免對內(nèi)存造成絕大壓力。

    推薦閱讀:lofree

    文章評價COMMENT

    還可以輸入2000個字

    暫無網(wǎng)友的評論

    意見反饋

    ×
    J