0 摘要
在Flink实时流数据处理中,经常用到keyBy算子, 虽然能够大致不差的使用它,实现自己的需求。然而这个算子到底做了什么事情,心里一直没有底。这篇文章算是对keyBy算子稍微深入一点的探究。
1 Spark中的按key分组操作
对于经常使用spark的同学而言,分组操作数据,那是再熟悉不过。比如groupBy, reduceBy, aggregateBy等一系列算子。基本思路都是指定key之后, 将相同key的元素集合到一个集合里面,形成一个新的集合元素,然后对每个key对应的元素集合进行操作
1.1 准备数据
1.2 spark分组操作处理数据
下面的操作以groupBy算子为例,分组后,输出结果数据,观察结果数据结构
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[2]")
.appName(this.getClass.getSimpleName)
.getOrCreate()
import spark.implicits._
val path = this.getClass.getResource("/data/wc.txt").getPath
spark.sparkContext.textFile(path)
.flatMap(_.split(" "))
.map((_,1))
.groupBy(_._1).foreach(println(_))
}
spark groupBy结果数据
(is,CompactBuffer((is,1)))
(book,CompactBuffer((book,1), (book,1), (book,1)))
(have,CompactBuffer((have,1), (have,1), (have,1)))
(spark,CompactBuffer((spark,1), (spark,1)))
(flink,CompactBuffer((flink,1), (flink,1)))
(hadoop,CompactBuffer((hadoop,1), (hadoop,1), (hadoop,1)))
.....
.....
从结果数据可以看出,spark的分组是将相同的key的元素集合到一个集合类型元素里面,后续操作一个key只需处理该key对应的集合元素即可。
2 Flink中keyBy操作
Flink 的keyBy本质上并不是将相同key的元素集合到一个集合元素里面,而是将相同key的元素散列到一个子任务中,而并不改变原来的元素数据结构。
2.1 flink中的keyBy操作代码
这里用到的数据仍然是上面spark操作用到的数据
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val path = this.getClass.getResource("/data/wc.txt").getPath
val stream = env.readTextFile(path)
.flatMap(_.split(" "))
.map(Word(_))
.keyBy(_.word) //按照word字段分区
stream.process(new KeyedProcessFunction[String,Word, String] {
override def processElement(value: Word, ctx: KeyedProcessFunction[String, Word, String]#Context, out: Collector[String]): Unit = {
val rowkey = ctx.getCurrentKey //获取每一行的key
val rowvalue = value // 获取每一行的value
val output_value = s"key=${rowkey}###value=$rowvalue" //定义新的输出行
out.collect(output_value)
}
}).print()
env.execute("output value")
}
结果数据
19> key=flink###value=Word(flink)
22> key=hadoop###value=Word(hadoop)
4> key=i###value=Word(i)
23> key=is###value=Word(is)
12> key=book###value=Word(book)
4> key=i###value=Word(i)
9> key=just###value=Word(just)
14> key=you###value=Word(you)
17> key=good###value=Word(good)
2> key=spark###value=Word(spark)
21> key=have###value=Word(have)
16> key=a###value=Word(a)
17> key=no###value=Word(no)
12> key=book###value=Word(book)
19> key=flink###value=Word(flink)
22> key=hadoop###value=Word(hadoop)
2> key=it###value=Word(it)
21> key=have###value=Word(have)
22> key=and###value=Word(and)
2> key=spark###value=Word(spark)
12> key=book###value=Word(book)
22> key=but###value=Word(but)
21> key=have###value=Word(have)
22> key=hadoop###value=Word(hadoop)
结果数据是我们在keyBy后自定义的输出类型,但仍然可以看出一个输入元素是对应这一个输出元素的,并且相同key元素对应的子任务id相同,比如key为hadoop时,输出元素还是3个(wc.txt输入文件中有3个hadoop单词),对应的子任务id都是22.
3 结论
Flink中的keyBy不会改变数据的每个元素的数据结构,仅仅时根据指定的key对输入数据重新划分子任务,相同的key对应的元素会被划分到一个子任务当中,这一点恰恰对应spark当中的repartition, 所以不加探究的话,真的难以理清它的本质。深入研究方可豁然开朗。
附录
对应keyBy后的数据处理,我们定义了KeyedProcessFunction 类,并且规定了它的泛型。泛型的含义通过我代码里面的注释,应该也能大致理解。下面给出源代码的定义说明,相信更有助于大家的理解
* @param <K> Type of the key. 输入元素的key
* @param <I> Type of the input elements. 输入元素
* @param <O> Type of the output elements. 处理完后的输出元素
*/
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
....
....
}