Flink入门到实战-阶段八(CEP)_flink cep geteventsforpattern

CSDN博客 · · 475 次点击 · · 开始浏览    
总结起来,复杂事件处理( CEP )的流程可以分成三个步骤:
1 )定义一个匹配规则
(2)将匹配规则应用到事件流上,检测满足规则的复杂事件
(3)对检测到的复杂事件进行处理,得到结果进行输出

入门 

引入依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

把连续3次登录失败的用户查询出来

public class LoginEvent {
    public String userId;
    public String ipAddress;
    public String eventType;
    public Long timestamp;

    public LoginEvent(String userId, String ipAddress, String eventType, Long
            timestamp) {
        this.userId = userId;
        this.ipAddress = ipAddress;
        this.eventType = eventType;
        this.timestamp = timestamp;

    }

    public LoginEvent() {
    }

    @Override
    public String toString() {
        return "LoginEvent{" +
                "userId='" + userId + '\'' +
                ", ipAddress='" + ipAddress + '\'' +
                ", eventType='" + eventType + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

应用程序

public class FlinkApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 获取登录事件流,并提取时间戳、生成水位线
        KeyedStream<LoginEvent, String> stream = env
                .fromElements(
                        new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
                        new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
                        new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
                        new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
                        new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
                        new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
                        new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
                )
                .assignTimestampsAndWatermarks(
                        //处理乱序的情况
                        WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<LoginEvent>() {
                                            @Override
                                            public long extractTimestamp(LoginEvent loginEvent, long l) {
                                                return loginEvent.timestamp;
                                            }
                                        }
                                )
                )
                .keyBy(r -> r.userId);
        // 1. 定义 Pattern,连续的三个登录失败事件
        Pattern<LoginEvent, LoginEvent> pattern = Pattern
                .<LoginEvent>begin("first") // 以第一个登录失败事件开始
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                })
                .next("second") // 接着是第二个登录失败事件
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                })
                .next("third") // 接着是第三个登录失败事件
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                });
        // 2. 将 Pattern 应用到流上,检测匹配的复杂事件,得到一个 PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);

        // 3. 将匹配到的复杂事件选择出来,然后包装成字符串报警信息输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws
                            Exception {
                        //根据上面定义的模式的名称依次得到和模式匹配得到的数据过滤出来进行处理
                        LoginEvent first = map.get("first").get(0);
                        LoginEvent second = map.get("second").get(0);
                        LoginEvent third = map.get("third").get(0);
                        return first.userId + " 连续三次登录失败!登录时间:" +
                                first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
                    }
                })
                        .

                print("warning");
        env.execute();
    }
}

输出的结果

warning> user_1 连续三次登录失败!登录时间:2000, 3000, 5000

模式API

个体模式

基本形式

.<LoginEvent > begin("first") // 以第一个登录失败事件开始
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                })

如果在后面追加规则,那么如下

.next("second") // 接着是第二个登录失败事件
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                })

量词

.oneOrMore ()
匹配事件出现一次或多次,假设 a 是一个个体模式, a.oneOrMore() 表示可以匹配 1 个或多
a 的事件组合。我们有时会用 a+ 来简单表示。
.times times
匹配事件发生特定次数( times ),例如 a.times(3) 表示 aaa
.times fromTimes toTimes
指定匹配事件出现的次数范围,最小次数为 fromTimes ,最大次数为 toTimes 。例如 a.times(2,
4) 可以匹配 aa aaa aaaa
.greedy()
只能用在循环模式后,使当前循环模式变得“贪心”( greedy ),也就是总是尽可能多地去
匹配。例如 a.times(2, 4).greedy() ,如果出现了连续 4 a ,那么会直接把 aaaa 检测出来进行处
理,其他任意 2 a 是不算匹配事件的。
.optional()
使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。

本文来自:CSDN博客

感谢作者:CSDN博客

查看原文:Flink入门到实战-阶段八(CEP)_flink cep geteventsforpattern

475 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传