spark-udf
虽然spark.sql.function
中的已经包含了大多数常用的函数,但是总有一些场景是内置函数无法满足要求的,此时就需要使用自定义函数了(UDF)。刚好最近用spark时,scala
,java
,python
轮换着用,因此这里总结一下spark中自定义函数的简单用法。
这里总结了scala
,java
,python
三种接口的DataFrame和sparkSQL的自定义函数定义和使用方法,对于比较复杂的分组自定义函数未涉及,对于这类复杂需求,应该有变通之法吧。
1、pyspark接口的UDF
1.1、在dataframe中使用
# 定义自定义函数
import numpy as np
def log_py(num):
return float(np.log(num))
# 注册自定义函数
log_udf = functions.udf(log_py, FloatType())
# 使用自定义函数
dataframe = dataframe.withColumn(col, log_udf(col))
特别说明:np.log
的返回值类型是numpy.float类型,spark是无法识别的,因此要转换成Python的float类型,因此写成float(np.log(num))
1.2、在sparkSQL中使用
# 定义自定义函数
def is_nulludf(fieldValue, defaultValue):
if fieldValue == None:
return defaultValue
return fieldValue
# 注册自定义函数
spark.udf.register("is_nulludf", is_nulludf)
# 使用自定义函数
spark.sql("select col_name, is_nulludf(col_name) as col_name2 from tble ")
2、scala接口的UDF
2.1、在dataframe中使用
# 定义自定义函数
def add_one(col: Double) = {
col + 1
}
# 注册自定义函数
spark.udf.register("add_one", add_one _)
# 使用自定义函数
import org.apache.spark.sql.functions
dataframe.withColumn("a2", functions.callUDF("add_one", functions.col("a")))
2.2、在sparkSQL中使用
# 定义自定义函数
def strLen(col: String) = {
str.length()
}
# 注册自定义函数
spark.udf.register("strLen", strLen _)
# 使用自定义函数
spark.sql("select name,strLen(name) from table ")
3、Java接口的UDF
3.1、在dataframe中使用
# 自定义并注册自定义函数
import static org.apache.spark.sql.types.DataTypes.DoubleType;
spark.udf().register("toFloat",new UDF1<String, Double>(){
@Override
public Double call(String number) {
return Double.valueOf(number)
}
}}, DoubleType);
# 使用自定义函数
import org.apache.spark.sql.functions;
dataframe = dataframe.withColumn(col, functions.callUDF("toFloat", functions.col(col)));
说明:UDF1的参数顺序表示java-udf函数的输入输出类型,最后面的DoubleType是spark中定义的float类型。
大家看完记得关注点赞.