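Suppose a job contains logic like stream.map(xxx).filter(_ != null).xxx, and the map operator may return null. Will the job throw an NPE? The downstream filter drops nulls, so it seems nothing should go wrong. In practice, however, the job can throw an exception at runtime.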
1. Exception messages
The exceptions look roughly like this:
// 1. If the map operator returns a Java Tuple
Caused by: java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:111)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
...
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
// 2. If the map operator returns a Scala case class or Scala tuple
Caused by: java.lang.NullPointerException
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
...
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
// 3. If the map operator returns a Scala Option
Caused by: scala.MatchError: null
at org.apache.flink.api.scala.typeutils.OptionSerializer.copy(OptionSerializer.scala:50)
at org.apache.flink.api.scala.typeutils.OptionSerializer.copy(OptionSerializer.scala:29)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
...
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
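2. Problem code
As the traces show, every one of these exceptions is raised under StreamMap.processElement, i.e. the map operation in our code. The record emitted by map is handed to OperatorChain$CopyingChainingOutput, which copies it with the serializer of the map's return type (TupleSerializer, CaseClassSerializer or OptionSerializer) before the next operator ever sees it. That is why the exact exception depends on the map's return type. Taking a Java/Scala tuple return type as an example, the problematic code looks like this: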
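// Java version
import java.util.Objects;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;

text.map(e -> {
    if (xxxx) {
        return Tuple2.of("hello", 1);
    } else {
        // return null under certain conditions
        return null;
    }
}).returns(new TypeHint<Tuple2<String, Integer>>(){})
  .filter(Objects::nonNull)
  .print();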
// Scala version
import org.apache.flink.streaming.api.scala._

text.map(e => {
  if (xxxx) {
    ("hello", 1)
  } else {
    // return null under certain conditions
    null
  }
})
  .filter(_ != null)
  .print()
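The failure can be reproduced without Flink. The Java sketch below is a simplified model, not Flink code: Pair, PairSerializer and ChainDemo are hypothetical names, and run() assumes (as the stack traces suggest) that a chained record is copied by its type's serializer before the next operator processes it. Because copy() dereferences the record, the null returned by map fails before filter(Objects::nonNull) gets a chance to drop it, while a Pair whose fields are null copies without trouble.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for Tuple2<String, Integer>; not a Flink class.
final class Pair {
    final String f0;
    final Integer f1;

    Pair(String f0, Integer f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Hypothetical serializer: like a tuple serializer, copy() works field by field,
// so it has to dereference the record itself.
final class PairSerializer {
    Pair copy(Pair from) {
        // Throws NullPointerException when from == null; null *fields* are copied fine.
        return new Pair(from.f0, from.f1);
    }
}

final class ChainDemo {
    // Simulates map -> copy (as between chained operators) -> filter(nonNull).
    static List<Pair> run(List<String> input, Function<String, Pair> mapFn) {
        PairSerializer serializer = new PairSerializer();
        Predicate<Pair> filterFn = Objects::nonNull;
        List<Pair> out = new ArrayList<>();
        for (String e : input) {
            Pair mapped = mapFn.apply(e);
            Pair copied = serializer.copy(mapped); // runs before the filter sees the record
            if (filterFn.test(copied)) {
                out.add(copied);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Returns null for empty strings, mirroring the "return null under certain conditions" branch.
        Function<String, Pair> mapFn = e -> e.isEmpty() ? null : new Pair(e, e.length());
        System.out.println(run(List.of("a", "bb"), mapFn).size());
        try {
            run(List.of("a", ""), mapFn);
        } catch (NullPointerException npe) {
            System.out.println("NPE thrown before filter(Objects::nonNull) could drop the null");
        }
    }
}
```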
3. Root cause analysis
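As shown in the figure (operator chaining was deliberately disabled with disableOperatorChaining for this demonstration; normally the two operators would be chained together), if "Map" wants to pass a null value to the downstream "Filter", it has to send some concrete value that says "null"; if it sent nothing at all, the downstream operator would not even know a record existed. So which value should represent null? Each data type implements this differently, but the underlying idea is the same: a flag indicates whether the value is null, in a layout like <nullFlag><value>.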
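To make the <nullFlag><value> idea concrete, here is a minimal Java sketch. SimpleSerializer, NullableSerializer and NullFlagDemo are hypothetical names (Flink's own serializer interfaces look different); the wrapper writes a one-byte boolean flag and writes the payload only when the value is not null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical minimal serializer interface; not Flink's TypeSerializer.
interface SimpleSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

// Wraps a serializer that cannot handle null and adds a one-byte flag: <nullFlag><value>.
final class NullableSerializer<T> implements SimpleSerializer<T> {
    private final SimpleSerializer<T> inner;

    NullableSerializer(SimpleSerializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void serialize(T value, DataOutput out) throws IOException {
        out.writeBoolean(value == null);   // the null flag
        if (value != null) {
            inner.serialize(value, out);   // payload only for non-null values
        }
    }

    @Override
    public T deserialize(DataInput in) throws IOException {
        return in.readBoolean() ? null : inner.deserialize(in);
    }
}

final class NullFlagDemo {
    // An int serializer that, on its own, would throw NPE when unboxing null.
    static final SimpleSerializer<Integer> INT = new SimpleSerializer<Integer>() {
        @Override
        public void serialize(Integer value, DataOutput out) throws IOException {
            out.writeInt(value);
        }

        @Override
        public Integer deserialize(DataInput in) throws IOException {
            return in.readInt();
        }
    };

    static final SimpleSerializer<Integer> NULLABLE_INT = new NullableSerializer<>(INT);

    static byte[] toBytes(Integer value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            NULLABLE_INT.serialize(value, new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Integer fromBytes(byte[] data) {
        try {
            return NULLABLE_INT.deserialize(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this layout, null costs 1 byte and the integer 42 costs 5 bytes (1 flag byte plus 4 bytes for the int); that extra byte on every record is the price of being able to send null.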
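Take String as an example. Flink's StringSerializer first writes a length prefix (encoded as a variable-length integer) that doubles as the null flag: if string == null the prefix is 0, otherwise it is string.length() + 1. So null is encoded as <0>, the empty string as <1>, and any other string as <string.length() + 1><stringContent>.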
// Core logic (StringValue.writeString, which StringSerializer delegates to)
public static final void writeString(CharSequence cs, DataOutput out) throws IOException {
    if (cs != null) {
        // not null: the prefix is the string length plus 1
        // the length we write is offset by one, because a length of zero indicates a null value
        int lenToWrite = cs.length() + 1;
        // so the longest string that can be serialized has length Integer.MAX_VALUE - 1
        if (lenToWrite < 0) {
            throw new IllegalArgumentException("CharSequence is too long.");
        }
        ...
    } else {
        // null: the prefix is 0
        out.write(0);
    }
}
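The length-plus-one scheme can be exercised end to end with the sketch below. LengthPrefixedStrings is a hypothetical class: it reuses the prefix logic shown above (0 for null, length + 1 otherwise, 7 bits per byte with the high bit as a continuation marker) but, as a simplification, writes each character as a fixed two-byte value with writeChar, so its output is not byte-identical to Flink's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical class; mirrors the "length + 1" prefix idea, not Flink's exact byte format.
final class LengthPrefixedStrings {
    private static final int HIGH_BIT = 0x80;

    static void writeString(String s, DataOutput out) throws IOException {
        if (s == null) {
            out.write(0);                  // prefix 0 means null
            return;
        }
        int lenToWrite = s.length() + 1;   // 1 means "", n + 1 means n chars
        // Variable-length prefix: 7 bits per byte, high bit set while more bytes follow.
        while (lenToWrite >= HIGH_BIT) {
            out.write(lenToWrite | HIGH_BIT);
            lenToWrite >>>= 7;
        }
        out.write(lenToWrite);
        // Simplification: each char as a fixed 2-byte value.
        for (int i = 0; i < s.length(); i++) {
            out.writeChar(s.charAt(i));
        }
    }

    static String readString(DataInput in) throws IOException {
        int len = 0;
        int shift = 0;
        int b;
        while (((b = in.readUnsignedByte()) & HIGH_BIT) != 0) {
            len |= (b & 0x7F) << shift;
            shift += 7;
        }
        len |= b << shift;
        if (len == 0) {
            return null;                   // prefix 0 decodes back to null
        }
        StringBuilder sb = new StringBuilder(len - 1);
        for (int i = 0; i < len - 1; i++) {
            sb.append(in.readChar());
        }
        return sb.toString();
    }

    static byte[] encode(String s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeString(s, new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String decode(byte[] data) {
        try {
            return readString(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, encode(null) yields the single byte 0, encode("") yields 1, and encode("ab") yields 3 followed by the two characters; a 200-character string needs a two-byte prefix because 201 does not fit in 7 bits.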
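For most data types, the benefit of a flag is that null values can be transmitted; the obvious cost is extra bandwidth, since the flag itself has to be sent along with every record.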
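Why, then, do the exceptions above still occur? Because not every type supports null. The types I know of that do not are Scala Option, Java/Scala Tuple and Scala case class. As for why they do not, the explanations I found are as follows: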
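- Scala Option does not support null because Option exists precisely to avoid null; returning null where an Option is expected is itself a bug. (Oddly, a Java Optional can be null, but Java Optional is serialized by KryoSerializer, whereas Scala Option is serialized by OptionSerializer, as the third stack trace shows.)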
Stephan Ewen: Using null for an Option value is by itself a bug (after all, Option is explicitly designed to avoid null)
- Why Java/Scala tuples and Scala case classes do not support null is unclear, but they do allow their fields to be set to null; see the next section.
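An option type has no spare state for null because its encoding already spends the flag on "present" versus "empty". The following hypothetical Java sketch (OptionalStringCodec is an illustrative name, and java.util.Optional stands in for Scala's Option) encodes exactly those two states; a null reference has no representation and fails with a NullPointerException as soon as the codec inspects it, playing the role of the scala.MatchError in the third stack trace.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

// Hypothetical two-state encoding for an optional string: <isPresent:boolean>[<value>].
final class OptionalStringCodec {
    static byte[] encode(Optional<String> value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            // value.isPresent() on a null reference throws NullPointerException:
            // the format has a state for "present" and one for "empty", but none for null.
            if (value.isPresent()) {
                out.writeBoolean(true);
                out.writeUTF(value.get());
            } else {
                out.writeBoolean(false);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Optional<String> decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return in.readBoolean() ? Optional.of(in.readUTF()) : Optional.empty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```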
4. Solutions
- For Option types, use None instead of null;
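- For tuples or Scala case classes, return an instance whose fields are null instead of a null object, e.g. new Tuple2<String, Integer>(null, null), and make the downstream filter check the fields. This only helps if every field's serializer accepts null: a String field does (see the length prefix above), but a boxed primitive field such as Integer may still throw an NPE once the record is actually serialized;
- Replace map with flatMap: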
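import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        if (xxx) { // emit only when the result would not be null
            out.collect(Tuple2.of("hello", 1));
        }
        // otherwise emit nothing, so no null record is ever copied or serialized
    }
}).print();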
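The None and flatMap workarounds have direct counterparts in plain Java streams, sketched below with java.util.stream rather than Flink's DataStream API; Workarounds and produces() are hypothetical names, with produces() standing in for the elided condition. In both methods the pipeline never carries a null element, so no step has to handle one.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain java.util.stream analog of the workarounds; not Flink's DataStream API.
final class Workarounds {
    // Hypothetical condition standing in for the elided "xxx": keep non-empty inputs.
    static boolean produces(String e) {
        return !e.isEmpty();
    }

    // Option/None idea: map to an explicit "empty" value instead of null, then filter on it.
    static List<String> withEmptyValue(List<String> input) {
        return input.stream()
                .map(e -> produces(e) ? Optional.of(e.toUpperCase()) : Optional.<String>empty())
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    // flatMap idea: emit zero or one element per input, so null never enters the pipeline.
    static List<String> withFlatMap(List<String> input) {
        return input.stream()
                .flatMap(e -> produces(e) ? Stream.of(e.toUpperCase()) : Stream.<String>empty())
                .collect(Collectors.toList());
    }
}
```

The flatMap variant is usually the simpler of the two, because the "no result" case is expressed by emitting nothing rather than by a placeholder value that every downstream step must recognise.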
5. References
- Add support for null values to the java api
- NPE from CaseClassSerializer when dealing with null Option field
- Cannot pass objects with null-valued fields to the next operator