Flink+ClickHouse 玩转企业级实时大数据开发(完结)

sdfs · · 43 次点击 · · 开始浏览    

 

获课♥》jzit.top/1869/

Flink本地开发快速上手指南

一、环境准备(5分钟搞定)

1. 开发环境要求

  • Java 8/11(推荐JDK 11)

  • Maven 3.0+(管理依赖)

  • IDE选择:IntelliJ IDEA(推荐)或Eclipse

2. 项目初始化

使用Maven原型快速创建:

bash

复制

mvn archetype:generate \  -DarchetypeGroupId=org.apache.flink \  -DarchetypeArtifactId=flink-quickstart-java \  -DarchetypeVersion=1.15.0 \  -DgroupId=com.your.company \  -DartifactId=flink-demo \  -Dversion=1.0 \  -Dpackage=com.your.company.flink \  -DinteractiveMode=false

二、核心API速成

1. 四步编写第一个Flink作业

java

复制

// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 定义数据源(本地集合示例)DataStream<String> text = env.fromElements(    "Flink is awesome",    "Stream processing made easy",    "Real-time analytics");// 3. 转换操作(单词计数)DataStream<Tuple2<String, Integer>> counts = text    .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {        for (String word : line.split(" ")) {            out.collect(new Tuple2<>(word, 1));        }    })    .keyBy(0)    .sum(1);// 4. 输出结果counts.print();// 执行作业env.execute("WordCount Example");

2. 开发模式对比

三、调试技巧宝典

1. 本地运行配置

java

复制

// 设置并行度(本地开发建议1-2)env.setParallelism(2);// 开启Web UI(本地访问http://localhost:8081)env.enableWebUI();// 设置重启策略(开发时禁用)env.setRestartStrategy(RestartStrategies.noRestart());

2. 数据查看方法

  • 控制台输出dataStream.print()

  • IDE调试工具:在转换操作后设置断点

  • Web UI:查看执行计划和指标

四、常用Connector配置

1. 本地文件源/汇

java

复制

// 读取文本文件DataStream<String> lines = env.readTextFile("input/words.txt");// 写入文本文件(并行度=1时生成单个文件)lines.writeAsText("output/result", FileSystem.WriteMode.OVERWRITE)     .setParallelism(1);

2. Socket流测试

java

复制

// 接收Socket文本流(需先启动nc -lk 9999)DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

五、测试数据生成方案

1. 模拟事件流

java

复制

// 周期性生成事件(开发测试用)DataStream<Event> events = env.addSource(new SourceFunction<Event>() {    private volatile boolean isRunning = true;        @Override    public void run(SourceContext<Event> ctx) throws Exception {        Random random = new Random();        while (isRunning) {            ctx.collect(new Event(                UUID.randomUUID().toString(),                System.currentTimeMillis(),                random.nextInt(100)            ));            Thread.sleep(500); // 控制速率        }    }    @Override    public void cancel() {        isRunning = false;    }});

2. 单元测试框架

java

复制

@Testpublic void testWordCount() throws Exception {    StreamExecutionEnvironment env =         StreamExecutionEnvironment.createLocalEnvironment();        // 测试输入    DataStream<String> input = env.fromElements("hello world", "hello flink");        // 执行转换    DataStream<Tuple2<String, Integer>> result = ...;        // 验证输出    List<Tuple2<String, Integer>> output = new ArrayList<>();    result.addSink(new SinkFunction<>() {        @Override        public void invoke(Tuple2<String, Integer> value) {            output.add(value);        }    });        env.execute();        // 断言检查    assertTrue(output.contains(new Tuple2<>("hello", 2)));}

六、性能调优入门

1. 本地开发调优参数

java

复制

// 设置本地环境参数Configuration conf = new Configuration();conf.setInteger(RestOptions.PORT, 8082); // Web UI端口conf.setString("taskmanager.memory.network.min", "64mb");StreamExecutionEnvironment env =     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

2. 关键配置项

七、进阶学习路径

  1. 状态管理ValueState/ListState使用

  2. 时间语义:Event Time/Watermark配置

  3. Connector实战:Kafka/MySQL集成

  4. 部署实践:Standalone/YARN模式

开发完成后,可通过以下命令打包:

bash

复制

mvn clean package -Pbuild-jar

生成的JAR包位于target/flink-demo-1.0.jar,可直接提交到集群运行。建议在pom.xml中添加flink-runtime-web依赖,便于本地Web UI调试。

43 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传