通过Spark Sql实现SQL解析
在大数据平台开发过程中,会遇到血缘分析,对SQL解析并进行权限的鉴权,需要提前对SQL进行基本语法校验,这些场景都需要对SQL进行解析。
常用的sql解析工具
- 阿里 Druid:支持的数据库类型不少,但是解析时需要制定数据库类型,并且在使用中,对hive的语法解析版本比较老,兼容性不太好
2.Hive原生sql解析:由于在大数据平台进行业务开发时,开发人员写的SQL并一定是完全符合hive规范的,因为在运行时是先通过spark进行解析的,所以也并不能完全满足需要
3.General SQL Parser(未测试)这款工具在解析的时候也是需要指定数据库
最后通过调研之后,决定还是采用spark原生的sql解析,下面对采用spark sql原生解析进行介绍,并且最终可以做到不依赖spark的任何jar包。
antlr4包依赖
由于spark sql的解析是通过antlr4实现的,所以首先需要添加依赖
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>4.5.3</version>
</dependency>
拷贝spark sql对应的sql解析代码
在spark源码中找到以下代码,并将这部分代码拷贝到自己的工程中,包名请修改成自己项目包名。
拷贝完之后,在对应的包下新建两个类
- ANTLRNoCaseStringStream
import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.IntStream;
public class ANTLRNoCaseStringStream extends ANTLRInputStream {
public ANTLRNoCaseStringStream(String input){
super(input);
}
@Override
public int LA(int i){
int la = super.LA(i);
if (la == 0 || la == IntStream.EOF){
return la;
} else {
return Character.toUpperCase(la);
}
}
}
- MySqlBaseBaseListener 在解析过程中获取需要的数据,此类实现的是获得解析过程中select的表和insert的表,如果需要获取列名等其他的,请参考SqlBaseBaseListener 类中的方法,来获得自己需要的数据。
import org.antlr.v4.runtime.tree.ParseTreeWalker;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class MySqlBaseBaseListener extends SqlBaseBaseListener {
private Map<String, Set<String>> dataBaseTablenameAndOper = new HashMap<>();//用来保存表与操作的对应关系
public Map<String, Set<String>> getDataBaseTablenameAndOper() {
return dataBaseTablenameAndOper;
}
public void enterQuerySpecification(SqlBaseParser.QuerySpecificationContext ctx) {
final SqlBaseParser.QuerySpecificationContext baseCtx = ctx;
ParseTreeWalker queryWalker = new ParseTreeWalker();
queryWalker.walk(new SqlBaseBaseListener() {
public void enterTableIdentifier(SqlBaseParser.TableIdentifierContext ctx) {
if(ctx.table!=null) {
String table = ctx.getText().toLowerCase();
Set<String> oper;
if (dataBaseTablenameAndOper.containsKey(table)) {
oper = dataBaseTablenameAndOper.get(table);
} else {
oper = new HashSet<>();
}
oper.add("SELECT");
dataBaseTablenameAndOper.put(table, oper);
}
}
}, ctx);
}
public void enterInsertInto(SqlBaseParser.InsertIntoContext ctx){
final SqlBaseParser.InsertIntoContext baseCtx = ctx;
ParseTreeWalker queryWalker = new ParseTreeWalker();
final Set<String> simpleTables = new HashSet<String>();
queryWalker.walk(new SqlBaseBaseListener() {
public void enterTableIdentifier(SqlBaseParser.TableIdentifierContext ctx) {
if(ctx.table!=null) {
String table = ctx.getText().toLowerCase();
Set<String> oper;
if (dataBaseTablenameAndOper.containsKey(table)) {
oper = dataBaseTablenameAndOper.get(table);
} else {
oper = new HashSet<>();
}
oper.add("INSERT");
dataBaseTablenameAndOper.put(table, oper);
}
}
}, ctx);
}
}
- 编写sql解析工具类SparkSqlUtil
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTreeWalker;
import com.luckincoffee.datas.spark.sql.catalyst.parser.*;
import java.util.Map;
import java.util.Set;
public class SparkSqlUtil {
public static Map<String, Set<String>> getDataBaseTablenameAndOper(String sql){
SqlBaseLexer lexer = new SqlBaseLexer(new ANTLRNoCaseStringStream(sql));
CommonTokenStream tokenStream = new CommonTokenStream(lexer);
SqlBaseParser parser = new SqlBaseParser(tokenStream);
ParseTreeWalker walker = new ParseTreeWalker();
MySqlBaseBaseListener mySqlBaseBaseListener = new MySqlBaseBaseListener();
walker.walk(mySqlBaseBaseListener, parser.statement());
return mySqlBaseBaseListener.getDataBaseTablenameAndOper();
}
}
至此通过spark sql进行sql解析就完成了,解析的内容和返回的数据格式,都可以自定义完成。联系邮箱:czmqlgr@163.com