antlr4 + spark sql对业务sql进行解析

简书 · · 6339 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

antlr4 + spark sql对业务sql进行解析

通过Spark Sql实现SQL解析

在大数据平台开发过程中,会遇到血缘分析,对SQL解析并进行权限的鉴权,需要提前对SQL进行基本语法校验,这些场景都需要对SQL进行解析。

常用的sql解析工具
  1. 阿里 Druid:支持的数据库类型不少,但是解析时需要制定数据库类型,并且在使用中,对hive的语法解析版本比较老,兼容性不太好
    2.Hive原生sql解析:由于在大数据平台进行业务开发时,开发人员写的SQL并一定是完全符合hive规范的,因为在运行时是先通过spark进行解析的,所以也并不能完全满足需要
    3.General SQL Parser(未测试)这款工具在解析的时候也是需要指定数据库

最后通过调研之后,决定还是采用spark原生的sql解析,下面对采用spark sql原生解析进行介绍,并且最终可以做到不依赖spark的任何jar包。

antlr4包依赖

由于spark sql的解析是通过antlr4实现的,所以首先需要添加依赖

<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>4.5.3</version>
</dependency>

拷贝spark sql对应的sql解析代码

在spark源码中找到以下代码,并将这部分代码拷贝到自己的工程中,包名请修改成自己项目包名。


image.png
拷贝完之后,在对应的包下新建两个类
  1. ANTLRNoCaseStringStream
import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.IntStream;

public class ANTLRNoCaseStringStream extends ANTLRInputStream {
   public ANTLRNoCaseStringStream(String input){
        super(input);
   }

    @Override
    public int LA(int i){
        int la = super.LA(i);
        if (la == 0 || la == IntStream.EOF){
            return la;
        } else {
            return Character.toUpperCase(la);
        }
    }
}
  1. MySqlBaseBaseListener 在解析过程中获取需要的数据,此类实现的是获得解析过程中select的表和insert的表,如果需要获取列名等其他的,请参考SqlBaseBaseListener 类中的方法,来获得自己需要的数据。
import org.antlr.v4.runtime.tree.ParseTreeWalker;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MySqlBaseBaseListener extends SqlBaseBaseListener {
    private Map<String, Set<String>> dataBaseTablenameAndOper = new HashMap<>();//用来保存表与操作的对应关系

    public Map<String, Set<String>> getDataBaseTablenameAndOper() {
        return dataBaseTablenameAndOper;
    }

    public void enterQuerySpecification(SqlBaseParser.QuerySpecificationContext ctx) {
        final SqlBaseParser.QuerySpecificationContext baseCtx = ctx;
        ParseTreeWalker queryWalker = new ParseTreeWalker();
        queryWalker.walk(new SqlBaseBaseListener() {
            public void enterTableIdentifier(SqlBaseParser.TableIdentifierContext ctx) {
                if(ctx.table!=null) {
                    String table = ctx.getText().toLowerCase();
                    Set<String> oper;
                    if (dataBaseTablenameAndOper.containsKey(table)) {
                        oper = dataBaseTablenameAndOper.get(table);
                    } else {
                        oper = new HashSet<>();
                    }
                    oper.add("SELECT");
                    dataBaseTablenameAndOper.put(table, oper);
                }
            }
        }, ctx);
    }

    public void enterInsertInto(SqlBaseParser.InsertIntoContext ctx){
        final SqlBaseParser.InsertIntoContext baseCtx = ctx;
        ParseTreeWalker queryWalker = new ParseTreeWalker();
        final Set<String> simpleTables = new HashSet<String>();
        queryWalker.walk(new SqlBaseBaseListener() {
            public void enterTableIdentifier(SqlBaseParser.TableIdentifierContext ctx) {
                if(ctx.table!=null) {
                    String table = ctx.getText().toLowerCase();
                    Set<String> oper;
                    if (dataBaseTablenameAndOper.containsKey(table)) {
                        oper = dataBaseTablenameAndOper.get(table);
                    } else {
                        oper = new HashSet<>();
                    }
                    oper.add("INSERT");
                    dataBaseTablenameAndOper.put(table, oper);
                }
            }
        }, ctx);
    }
}

  1. 编写sql解析工具类SparkSqlUtil
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTreeWalker;
import com.luckincoffee.datas.spark.sql.catalyst.parser.*;

import java.util.Map;
import java.util.Set;

public class SparkSqlUtil {
    public static Map<String, Set<String>> getDataBaseTablenameAndOper(String sql){
        SqlBaseLexer lexer = new SqlBaseLexer(new ANTLRNoCaseStringStream(sql));

        CommonTokenStream tokenStream = new CommonTokenStream(lexer);
        SqlBaseParser parser = new SqlBaseParser(tokenStream);
        ParseTreeWalker walker = new ParseTreeWalker();
        MySqlBaseBaseListener mySqlBaseBaseListener = new MySqlBaseBaseListener();

        walker.walk(mySqlBaseBaseListener, parser.statement());

        return mySqlBaseBaseListener.getDataBaseTablenameAndOper();
    }
}

至此通过spark sql进行sql解析就完成了,解析的内容和返回的数据格式,都可以自定义完成。联系邮箱:czmqlgr@163.com

推荐阅读更多精彩内容

本文来自:简书

感谢作者:简书

查看原文:antlr4 + spark sql对业务sql进行解析

6339 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传