flink watermark时间用的linux系统当前时间还是用的数据事件里的时间

zhidiantech · · 66 次点击 · · 开始浏览    
--- ### **1. Watermark 的核心作用** Watermark 是 Flink 事件时间(Event Time)处理的核心机制,用于解决 **乱序数据** 和 **延迟数据** 的问题。它的本质是一个**逻辑时钟**,告诉系统“某个事件时间之前的数据应该到齐了”,从而触发窗口计算。 --- ### **2. 时间类型的区别** Flink 支持三种时间语义: | 时间类型 | 描述 | |-----------------|----------------------------------------------------------------------| | **事件时间** | 数据本身携带的时间戳(如日志中的 `event_time` 字段)。 | | **处理时间** | 数据被处理时的系统时间(即 Linux 时间)。 | | **摄入时间** | 数据进入 Flink 的时间(由 Flink 自动生成)。 | **Watermark 仅与事件时间相关**,与处理时间无关。 --- ### **3. Watermark 的生成逻辑** • **步骤 1**:从数据中提取事件时间戳(如 `event_time` 字段)。 ```java DataStream<Event> events = stream.assignTimestampsAndWatermarks( WatermarkStrategy .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((event, timestamp) -> event.getEventTime()) ); ``` • **步骤 2**:根据事件时间生成 Watermark。 • 例如,设置最大乱序时间为 5 秒,则 Watermark = 当前最大事件时间 - 5 秒。 --- ### **4. 为什么不用系统时间?** • **场景需求**:处理乱序数据时,必须依赖数据本身的时序(事件时间),而非处理速度(系统时间)。 • **示例**: 假设数据中的事件时间是 `12:00:00`,但实际到达 Flink 的时间是 `12:00:10`(系统时间)。 • 如果基于系统时间,窗口会在 `12:00:10` 触发,可能漏掉延迟数据。 • 如果基于事件时间,窗口会在事件时间 `12:00:00` 的 Watermark 到达时触发,确保正确处理延迟数据。 --- ### **5. 特殊情况处理** • **处理时间模式**:如果使用处理时间,Watermark 不会生效(此时窗口基于系统时间触发)。 • **摄入时间模式**:摄入时间由 Flink 自动生成,但仍属于系统时间范畴,Watermark 不适用。 --- ### **6. 验证方法** ```java // 示例:打印事件时间和 Watermark events.process(new ProcessFunction<Event, String>() { @Override public void processElement(Event event, Context ctx, Collector<String> out) { // 事件时间(来自数据) long eventTime = event.getEventTime(); // Watermark 时间(基于事件时间生成) long watermark = ctx.timerService().currentWatermark(); System.out.println("Event Time: " + eventTime + ", Watermark: " + watermark); } }); ``` 输出会显示 Watermark 始终落后于事件时间,与系统时间无关。 --- --- ### **1. Watermark 生成机制** Flink 的 Watermark 生成主要有两种模式: #### **(1) 周期性生成(Periodic Watermarks)** • **触发方式**:通过 **处理时间(Processing Time)的定时器** 周期性触发检查。 • **生成逻辑**: • 系统会每隔一段时间(如 `200ms`,可配置)检查当前数据流中的 **最大事件时间**。 • 根据配置的 **最大乱序时间(maxOutOfOrderness)**,生成新的 Watermark: `Watermark = 当前最大事件时间 - maxOutOfOrderness` • **代码配置**: ```java WatermarkStrategy .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(1), Duration.ofMillis(200)); ``` • `withWatermarkAlignment` 中的 `Duration.ofMillis(200)` 表示检查间隔。 #### **(2) 标记生成(Punctuated Watermarks)** • **触发方式**:仅在特定数据到达时生成 Watermark(如数据中携带了特殊标记)。 • **代码示例**: ```java public class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> { @Override public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) { if (event.isEndOfBatch()) { // 遇到特定标记时生成 Watermark output.emitWatermark(new Watermark(eventTimestamp)); } } } ``` --- ### **2. Watermark 发送机制** #### **(1) 周期性生成的发送逻辑** • **定时检查,但非定时发送**: 虽然检查是周期性的,但 **是否发送 Watermark 取决于事件时间是否推进**。 • **示例**:如果数据流中事件时间长期未更新(如无新数据),即使定时器触发,也不会生成新的 Watermark。 • **发送频率**:取决于数据流的事件时间进展速度和检查间隔。 #### **(2) 发送到下游** • **异步传播**:生成的 Watermark 会作为特殊事件插入数据流,**广播到下游所有算子**。 • **对齐机制**:下游算子会跟踪 **所有输入通道的 Watermark**,以最小的 Watermark 作为当前时间基准。 --- ### **3. 核心设计思想** • **事件时间驱动**:Watermark 的生成和发送最终由数据中的事件时间决定,而非固定时间间隔。 • **资源优化**:避免无意义的定时发送(如无新数据时),节省计算资源。 • **动态适应**:根据数据流速自动调整 Watermark 生成频率。 --- ### **4. 配置参数** • **检查间隔**:通过 `pipeline.auto-watermark-interval` 参数调整周期性检查间隔(默认 `200ms`): ```shell # flink-conf.yaml pipeline.auto-watermark-interval: 100ms ``` • **最大乱序时间**:根据业务需求设置合理的 `maxOutOfOrderness`(如 `5秒`)。 --- ### **5. 示例场景** 假设数据流的事件时间分布如下(单位:秒): ``` 数据时间戳:10, 11, 12, 13, 14 (最大事件时间=14) ``` • **配置**:`maxOutOfOrderness=2s`,检查间隔 `200ms`。 • **Watermark 生成**: • 每次检查时,生成 `14 - 2 = 12` 作为 Watermark。 • 若后续数据时间戳推进到 `15`,则 Watermark 更新为 `13`。 --- ### **总结** • **没有独立的定时任务**:Watermark 的生成由 **周期性检查触发**,但实际发送取决于事件时间的推进。 • **事件时间是核心**:所有逻辑围绕数据中的事件时间展开,而非系统时间。 • **配置灵活**:可根据数据流速和业务需求调整检查间隔和最大乱序时间。 • **Watermark 时间**:基于数据中的事件时间(如 `event_time` 字段)。 • **系统时间**:仅用于处理时间模式,与 Watermark 无关。 通过依赖事件时间,Flink 能够保证乱序数据处理的准确性,这是流处理系统的核心设计之一。
66 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传