Java版Flink使用指南——从RabbitMQ中队列中接入消息流

大纲

  • 创建RabbitMQ队列
  • 新建工程
    • 新增依赖
    • 编码
      • 设置数据源配置
      • 读取、处理数据
      • 完整代码
    • 打包、上传和运行任务
    • 测试
  • 工程代码

在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而现实中,数据往往来源于外部。本文我们将尝试Flink从RabbitMQ中读取数据,然后输出到日志中。
关于RabbitMQ的知识可以参阅《RabbitMQ实践》。

创建RabbitMQ队列

我们创建一个Classic队列data.from.rbtmq。注意要选择Durable类型,这是后续用的默认连接器的限制。
具体方法见《RabbitMQ实践——在管理后台测试消息收发功能》。
在这里插入图片描述

后续我们将在后台通过默认交换器,给这个队列新增消息。

新建工程

我们在IntelliJ中新建一个工程DataFromRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java。
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>

编码

设置数据源配置

String queueName = "data.from.rbtmq";
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;// create a RabbitMQ source
RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());

读取、处理数据

下面代码通过addSource添加RabbitMQ数据源。注意,不能使用fromSource方法,是因为RMQSource没有实现SourceFunction方法。

final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);

完整代码

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String queueName = "data.from.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);env.execute("Flink Java API Skeleton");}
}

打包、上传和运行任务

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试

在RabbitMQ后台的默认交换器中,发布一条消息到data.from.rbtmq
在这里插入图片描述
然后使用下面指令可以看到Flink读取到消息并执行了print方法

tail log/flink-*-taskexecutor-*.out

==> flink-fangliang-taskexecutor-0-fangliang.out <==
data from http://172.21.112.140:15672/#/exchanges/%2F/amq.default

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/375913.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

人工智能大模型如何助力电商产品经理打造高效的商品工业属性画像

摘要 商品工业属性画像是电商产品经理在进行商品管理、推荐、搜索、广告等业务时的重要依据。通过对商品的工业属性&#xff08;如品类、品牌、规格、功能、风格等&#xff09;的准确识别和标注&#xff0c;可以提高商品的展示效果、匹配度、转化率和用户满意度。然而&#xf…

如何为IP申请SSL证书

目录 以下是如何轻松为IP地址申请SSL证书的详细步骤&#xff1a; 申请IP证书的基本条件&#xff1a; 申请IP SSL证书的方式&#xff1a; 确保网络通信安全的核心要素之一&#xff0c;是有效利用SSL证书来加密数据传输&#xff0c;特别是对于那些直接通过IP地址访问的资源。I…

Java 将图片转base64和base64转图片

工具 Base64 和 图片互转。 导入的依赖 <!-- https://mvnrepository.com/artifact/com.sun.xml.bind/jaxb-impl --><dependency><groupId>com.sun.xml.bind</groupId><artifactId>jaxb-impl</artifactId><version>4.0.5</versi…

【linux】进程间通信(IPC)——匿名管道,命名管道与System V内核方案的共享内存,以及消息队列和信号量的原理概述

目录 ✈必备知识 进程间通信概述 &#x1f525;概述 &#x1f525;必要性 &#x1f525;原理 管道概述 &#x1f525;管道的本质 &#x1f525;管道的相关特性 &#x1f525;管道的同步与互斥机制 匿名管道 &#x1f525;系统调用接口介绍 &#x1f525;内核原理 …

【开发环境】搭建PX4+ROS2+MAVROS2+Simulink+Optitrack实物联合仿真环境

搭建PX4ROS2MAVROS2SimulinkOptiTrack实物联合仿真环境 Ubuntu中的安装过程下载并编译PX4固件代码安装ROS2安装VRPN动捕数据转换ROS2话题库安装VRPN库拉取vrpn_client_ros2节点代码并配置VRPN server参数编译软件包启动vrpn_client_ros2节点重命名话题名 /vrpn/fly/pose 为 /ma…

Mac VSCode 突然闪退、崩溃、打不开了

vscode 1.90.2版本下载&#xff0c;刚上传还在审核中 1、 思路历程 VSCode 作为前端常用开发工具&#xff0c;其重要性就不一一描述了。 所以 VSCode 突然打不开了&#xff0c;真的是让我一脸懵逼。 本来以为问题不大&#xff0c;于是 &#xff1a; 1、重启了一下VSCode 2、…

Python 爬虫:使用打码平台来识别各种验证码:

本课程使用的是 超级鹰 打码平台&#xff0c; 没有账户的请自行注册&#xff01; 超级鹰验证码识别-专业的验证码云端识别服务,让验证码识别更快速、更准确、更强大 使用打码平台来攻破验证码难题&#xff0c; 是很简单容易的&#xff0c; 但是要钱&#xff01; 案例代码及测…

记录些Redis题集(1)

Redis内存淘汰触发条件的相关配置如下&#xff1a; Redis通过配置项maxmemory来设定其允许使用的最大内存容量。当Redis实际占用的内存达到这一阈值时&#xff0c;将触发内存淘汰机制&#xff0c;开始删除部分数据以释放内存空间&#xff0c;防止服务因内存溢出而异常。 Redi…

vitest 单元测试应用与配置

vitest 应用与配置 一、简介 Vitest 旨在将自己定位为 Vite 项目的首选测试框架&#xff0c;即使对于不使用 Vite 的项目也是一个可靠的替代方案。它本身也兼容一些Jest的API用法。 二、安装vitest // npm npm install -D vitest // yarn yarn add -D vitest // pnpm pnpm …

【Linux】vim详解

1.什么是vi/vim? 简单来说&#xff0c;vi是老式的文本编辑器&#xff0c;不过功能已经很齐全了&#xff0c;但是还是有可以进步的地方。vim则可以说是程序开发者的一项很好用的工具&#xff0c;就连 vim的官方网站&#xff08; http://www.vim.org&#xff09;自己也说vim是一…

【公益案例展】华为云X《无尽攀登》——攀登不停,向上而行

‍ 华为云公益案例 本项目案例由华为云投递并参与数据猿与上海大数据联盟联合推出的 #榜样的力量# 《2024中国数据智能产业最具社会责任感企业》榜单/奖项”评选。 大数据产业创新服务媒体 ——聚焦数据 改变商业 夏伯渝&#xff0c;中国无腿登珠峰第一人&#xff0c;一生43年…

语言模型演进:从NLP到LLM的跨越之旅

在人工智能的浩瀚宇宙中&#xff0c;自然语言处理&#xff08;NLP&#xff09;一直是一个充满挑战和机遇的领域。随着技术的发展&#xff0c;我们见证了从传统规则到统计机器学习&#xff0c;再到深度学习和预训练模型的演进。如今&#xff0c;我们站在了大型语言模型&#xff…

免费开源数字人生成工具

使用步骤更是简单到不行&#xff1a; 1. 输入图片&#xff1a;选择你想要生成动态视频的肖像图片。 2. 输入音频&#xff1a;提供与图片匹配的音频文件&#xff0c;EchoMimic会根据音频内容驱动肖像的动态效果。 3. 设置参数&#xff1a;一般保持默认设置即可&#xff0c;当然&…

端到端自动驾驶系列(一):自动驾驶综述解析

端到端自动驾驶系列(一)&#xff1a;自动驾驶综述解析 End-to-end-Autonomous-Driving Abstract Abstract—The autonomous driving community has witnessed a rapid growth in approaches that embrace an end-to-end algorithm framework, utilizing raw sensor input to …

模块化(一)nodejs

模块化 一.模块化的基本概念1.1 什么是模块化1.2 模块化规范 二.Node.js 中的模块化2.1 Node.js 中模块的分类2.2 加载模块2.3 Node.js 中的模块作用域2.4 向外共享模块作用域中的成员 一.模块化的基本概念 1.1 什么是模块化 模块化 是指解决一个 复杂问题 时&#xff0c;自顶…

uni-app 保存号码到通讯录

1、 添加模块 2、添加权限 3、添加策略 Android&#xff1a; "permissionExternalStorage" : {"request" : "none","prompt" : "应用保存运行状态等信息&#xff0c;需要获取读写手机存储&#xff08;系统提示为访问设备上的照片…

安卓14中Zygote初始化流程及源码分析

文章目录 日志抓取结合日志与源码分析systemServer zygote创建时序图一般应用 zygote 创建时序图向 zygote socket 发送数据时序图 本文首发地址 https://h89.cn/archives/298.html 最新更新地址 https://gitee.com/chenjim/chenjimblog 本文主要结合日志和代码看安卓 14 中 Zy…

HarmonyOS ArkUi 字符串<展开/收起>功能

效果图&#xff1a; 官方API&#xff1a; ohos.measure (文本计算) 方式一 measure.measureTextSize 跟方式二使用一样&#xff0c;只是API调用不同&#xff0c;可仔细查看官网方式二 API 12 import { display, promptAction } from kit.ArkUI import { MeasureUtils } fr…

基于蓝牙iBeacon定位技术的商场3D楼层导视软件功能详解与实施效益

在现代商场的繁华与复杂中&#xff0c;寻找目的地往往令人头疼。维小帮3D楼层导视软件以其创新技术&#xff0c;为顾客带来无缝、直观的跨楼层导航体验&#xff0c;让每一次商场消费都成为享受。 商场3D楼层导视软件功能服务 3D多楼层导视地图&#xff0c;商场布局一览无遗 …

【Go】函数的使用

目录 函数返回多个值 init函数和import init函数 main函数 函数的参数 值传递 引用传递&#xff08;指针&#xff09; 函数返回多个值 用法如下&#xff1a; package mainimport ("fmt""strconv" )// 返回多个返回值&#xff0c;无参数名 func Mu…