总结起来,复杂事件处理(
CEP
)的流程可以分成三个步骤:
(
1
)定义一个匹配规则
(2)将匹配规则应用到事件流上,检测满足规则的复杂事件
(3)对检测到的复杂事件进行处理,得到结果进行输出
入门
引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
把连续3次登录失败的用户查询出来
public class LoginEvent {
public String userId;
public String ipAddress;
public String eventType;
public Long timestamp;
public LoginEvent(String userId, String ipAddress, String eventType, Long
timestamp) {
this.userId = userId;
this.ipAddress = ipAddress;
this.eventType = eventType;
this.timestamp = timestamp;
}
public LoginEvent() {
}
@Override
public String toString() {
return "LoginEvent{" +
"userId='" + userId + '\'' +
", ipAddress='" + ipAddress + '\'' +
", eventType='" + eventType + '\'' +
", timestamp=" + timestamp +
'}';
}
}
应用程序
public class FlinkApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 获取登录事件流,并提取时间戳、生成水位线
KeyedStream<LoginEvent, String> stream = env
.fromElements(
new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
new LoginEvent("user_1", "171.56.23.10", "fail", 5000L),
new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
)
.assignTimestampsAndWatermarks(
//处理乱序的情况
WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(
new SerializableTimestampAssigner<LoginEvent>() {
@Override
public long extractTimestamp(LoginEvent loginEvent, long l) {
return loginEvent.timestamp;
}
}
)
)
.keyBy(r -> r.userId);
// 1. 定义 Pattern,连续的三个登录失败事件
Pattern<LoginEvent, LoginEvent> pattern = Pattern
.<LoginEvent>begin("first") // 以第一个登录失败事件开始
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("second") // 接着是第二个登录失败事件
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("third") // 接着是第三个登录失败事件
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
});
// 2. 将 Pattern 应用到流上,检测匹配的复杂事件,得到一个 PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);
// 3. 将匹配到的复杂事件选择出来,然后包装成字符串报警信息输出
patternStream
.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> map) throws
Exception {
//根据上面定义的模式的名称依次得到和模式匹配得到的数据过滤出来进行处理
LoginEvent first = map.get("first").get(0);
LoginEvent second = map.get("second").get(0);
LoginEvent third = map.get("third").get(0);
return first.userId + " 连续三次登录失败!登录时间:" +
first.timestamp + ", " + second.timestamp + ", " + third.timestamp;
}
})
.
print("warning");
env.execute();
}
}
输出的结果
warning> user_1 连续三次登录失败!登录时间:2000, 3000, 5000
模式API
个体模式
基本形式
.<LoginEvent > begin("first") // 以第一个登录失败事件开始
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
如果在后面追加规则,那么如下
.next("second") // 接着是第二个登录失败事件
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
量词
⚫ .oneOrMore ()匹配事件出现一次或多次,假设 a 是一个个体模式, a.oneOrMore() 表示可以匹配 1 个或多个 a 的事件组合。我们有时会用 a+ 来简单表示。⚫ .times ( times )匹配事件发生特定次数( times ),例如 a.times(3) 表示 aaa ;⚫ .times ( fromTimes , toTimes )指定匹配事件出现的次数范围,最小次数为 fromTimes ,最大次数为 toTimes 。例如 a.times(2,4) 可以匹配 aa , aaa 和 aaaa 。⚫ .greedy()只能用在循环模式后,使当前循环模式变得“贪心”( greedy ),也就是总是尽可能多地去匹配。例如 a.times(2, 4).greedy() ,如果出现了连续 4 个 a ,那么会直接把 aaaa 检测出来进行处理,其他任意 2 个 a 是不算匹配事件的。⚫ .optional()使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。