Flink本地idea运行环境配置webui
1.添加依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>1.13.6</version><scope>provided</scope></dependency>
2. 代码如下
public class FlinkWithLocalWebui {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setString(RestOptions.BIND_PORT, "8081"); // 设置WebUI端口为8081StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); // 创建带有WebUI的本地流执行环境env.setParallelism(1); // 设置并行度为1DataStream<Map<String, String>> stream = env.addSource(new SourceFunction<Map<String, String>>() {@Overridepublic void run(SourceContext<Map<String, String>> ctx) throws Exception {while (true) {HashMap<String, String> hashMap = new HashMap<>();hashMap.put("ID", new Random().nextInt(3) + 1 + ""); // 随机生成IDhashMap.put("AMT", "1"); // 设置AMT为1System.out.println("生产数据:" + hashMap); // 打印生产的数据ctx.collect(hashMap); // 发射数据Thread.sleep(1000); // 每隔1秒发送一次数据}}@Overridepublic void cancel() {}})// 按照ID字段进行分区.keyBy(new KeySelector<Map<String, String>, String>() {@Overridepublic String getKey(Map<String, String> value) throws Exception {return value.get("ID");}})// 对AMT字段进行累加.reduce(new ReduceFunction<Map<String, String>>() {@Overridepublic Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) throws Exception {HashMap<String, String> hashMap = new HashMap<>();hashMap.put("ID", value1.get("ID"));hashMap.put("AMT", Integer.valueOf(value1.get("AMT")) + Integer.valueOf(value2.get("AMT")) + "");return hashMap;}});// 输出数据流stream.print();// 执行作业并指定作业名称env.execute("job-" + FlinkWithLocalWebui.class.getSimpleName());}
}
//这段代码是一个基于Apache Flink的实时数据处理程序。
//程序创建了一个带有WebUI的本地流执行环境,设置了并行度为1。
//通过自定义的SourceFunction生成随机数据流,数据包含ID和AMT字段,每秒发送一次数据。
//然后对数据流按照ID字段进行分区,并对AMT字段进行累加操作。
//最后,将处理后的数据流打印输出,并执行作业。
//整体流程是一个简单的实时数据处理流水线,用于生成、处理和输出数据流。
3. 执行结果:
生产数据:{AMT=1, ID=3}
{AMT=1, ID=3}
生产数据:{AMT=1, ID=1}
{AMT=1, ID=1}
生产数据:{AMT=1, ID=2}
{AMT=1, ID=2}
...
生产数据:{AMT=1, ID=2}
{AMT=9, ID=2}
生产数据:{AMT=1, ID=3}
{AMT=13, ID=3}
生产数据:{AMT=1, ID=3}
...
4. 访问Webui,截图如下:
地址:http://localhost:8081/#/overview