Flink链接Kafka

一、基于 Flink 的 Kafka 消息生产者

  • Kafka 生产者的创建与配置
    • 代码通过 FlinkKafkaProducer 创建 Kafka 生产者,用于向 Kafka 主题发送消息。
  • Flink 执行环境的配置
    • 配置了 Flink 的检查点机制,确保消息的可靠性,支持"精确一次"的消息交付语义。
  • 模拟数据源
    • 通过 env.fromElements() 方法创建了简单的消息流,发送了三条消息 "a", "b", 和 "c"
package com.example.kafka_flink.service;import com.example.kafka_flink.util.MyNoParalleSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.Properties;
@Service
public class SimpleKafkaProducer {public static void main(String[] args) throws Exception {// 创建 SimpleKafkaProducer 的实例SimpleKafkaProducer kafkaProducer = new SimpleKafkaProducer();// 调用 producer 方法kafkaProducer.producer();}public void producer() throws Exception {// 设置 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置检查点机制,设置检查点模式为 "Exactly Once"(精确一次)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 启用检查点机制,设置检查点时间间隔为 5000 毫秒(5 秒)env.enableCheckpointing(5000);// 配置 Kafka 属性,包括身份验证信息Properties properties = new Properties();properties.setProperty("bootstrap.servers", "xxxx");properties.setProperty("security.protocol", "SASL_PLAINTEXT");properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");// 创建 Kafka 生产者实例,并设置目标主题和序列化模式FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("WJ-TEST",// 使用 SimpleStringSchema 进行字符串序列化new SimpleStringSchema(),properties);// 模拟数据源,生产一些简单的消息,并将消息写入 Kafkaenv.fromElements("a", "b", "c").addSink(producer);// 启动 Flink 作业env.execute("Kafka Producer Job");}
}

二、基于 Flink 的 Kafka 消息消费者

2.1 消费一个Topic

  • 设置 Flink 执行环境

    • 使用 StreamExecutionEnvironment.getExecutionEnvironment() 创建执行环境。
  • 启用检查点机制

    • 调用 env.enableCheckpointing(5000),设置检查点时间间隔为 5 秒。
    • 配置检查点模式为 EXACTLY_ONCE,确保数据一致性。
  • 配置 Kafka 属性

    • 设置 Kafka 服务器地址(bootstrap.servers)。
    • 指定消费组 ID(group.id)。
    • 配置安全协议和认证机制(SASL_PLAINTEXTSCRAM-SHA-512)。
  • 创建 Kafka 消费者

    • 使用 FlinkKafkaConsumer<String> 指定单个 Kafka Topic(如 "WJ-TEST")。
    • 设置消息反序列化方式为 SimpleStringSchema
    • 配置消费者从最早偏移量开始消费(setStartFromEarliest())。
  • 将 Kafka 消费者添加到 Flink 数据流

    • 调用 env.addSource(consumer) 添加 Kafka 消费者作为数据源。
    • 使用 FlatMapFunction 处理消息,将其打印或进一步处理。
  • 启动 Flink 作业

    • 使用 env.execute("start consumer...") 启动 Flink 作业,开始消费 Kafka 的消息流。
 //消费单个topicpublic static void consumerOneTopic() throws Exception {// 设置 Flink 执行环境// 创建一个流处理的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 启用检查点机制,设置检查点的时间间隔为 5000 毫秒(5 秒)env.enableCheckpointing(5000);// 配置检查点模式为 "Exactly Once"(精确一次)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 配置 Kafka 属性,包括身份验证信息Properties properties = new Properties();// 设置 Kafka 集群地址properties.setProperty("bootstrap.servers", "xxxx");// 设置消费组 ID,用于管理消费偏移量properties.setProperty("group.id", "group_test");// 设置安全协议为 SASL_PLAINTEXTproperties.setProperty("security.protocol", "SASL_PLAINTEXT");// 设置 SASL 认证机制为 SCRAM-SHA-512properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");// 配置 SASL 登录模块,包含用户名和密码properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");// 创建一个 Kafka 消费者实例FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(// 设置要消费的 Kafka 主题名称"WJ-TEST",// 使用 SimpleStringSchema 将 Kafka 的消息反序列化为字符串new SimpleStringSchema(),// 传入 Kafka 的配置属性properties);// 设置消费者从 Kafka 的最早偏移量开始消费消息consumer.setStartFromEarliest();// 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {// 打印消费到的消息内容到控制台System.out.println(s);// 收集消费到的消息,供后续处理collector.collect(s);}});// 启动并执行 Flink 作业,作业名称为 "start consumer..."env.execute("start consumer...");}

生产消息结果:

2.2 消费多个Topic

  • 设置 Flink 执行环境

    • 使用 StreamExecutionEnvironment.getExecutionEnvironment() 创建执行环境。
  • 启用检查点机制

    • 配置检查点模式为 EXACTLY_ONCE,确保数据一致性。
    • 调用 env.enableCheckpointing(5000) 设置检查点时间间隔为 5 秒。
  • 配置 Kafka 属性

    • 设置 Kafka 服务器地址(bootstrap.servers)。
    • 指定消费组 ID(group.id)。
    • 配置安全协议和认证机制(SASL_PLAINTEXTSCRAM-SHA-512)。
  • 定义 Kafka Topic 列表

    • 创建一个 List<String>,添加多个 Kafka Topic 名称(如 "WJ-TEST""KAFKA_TEST_001")。
  • 创建 Kafka 消费者

    • 使用 FlinkKafkaConsumer,传入 Kafka Topic 列表和自定义反序列化器(CustomDeSerializationSchema)。
    • 配置消费者从最早偏移量开始消费(setStartFromEarliest())。
  • 将 Kafka 消费者添加到 Flink 数据流

    • 调用 env.addSource(consumer) 添加 Kafka 消费者作为数据源。
    • 使用 FlatMapFunction 处理消息,打印消息的 Topic、分区、偏移量、键和值,并收集消息值进行进一步处理。
  • 启动 Flink 作业

    • 使用 env.execute("start consumer...") 启动 Flink 作业,开始消费 Kafka 的多个主题消息流。
//消费多个topicpublic static void consumerTopics() throws Exception {// 设置 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置检查点机制,设置检查点模式为 "Exactly Once"(精确一次)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 启用检查点机制,设置检查点时间间隔为 5000 毫秒(5 秒)env.enableCheckpointing(5000);// 配置 Kafka 属性,包括身份验证信息Properties properties = new Properties();properties.setProperty("bootstrap.servers", "xxxx");properties.setProperty("group.id", "group_test");properties.setProperty("security.protocol", "SASL_PLAINTEXT");properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");// 定义需要消费的 Kafka 主题列表List<String> topics = new ArrayList<>();topics.add("WJ-TEST");topics.add("KAFKA_TEST_001");// 使用自定义反序列化器创建 Kafka 消费者实例FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(topics,new CustomDeSerializationSchema(),properties);// 设置消费者从 Kafka 的最早偏移量开始消费消息consumer.setStartFromEarliest();// 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中env.addSource(consumer).flatMap(new FlatMapFunction<ConsumerRecord<String, String>, Object>() {@Overridepublic void flatMap(ConsumerRecord<String, String> record, Collector<Object> collector) throws Exception {// 打印消费到的消息内容到控制台System.out.println("Topic: " + record.topic() +", Partition: " + record.partition() +", Offset: " + record.offset() +", Key: " + record.key() +", Value: " + record.value());// 收集消费到的消息,供后续处理collector.collect(record.value());}});// 启动并执行 Flink 作业env.execute("start consumer...");}

2.3 消费Topic的总体代码

package com.example.kafka_flink.service;import com.example.kafka_flink.util.CustomDeSerializationSchema;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;/*** @author wangjian*/
@Service
public class SimpleKafkaConsumer {public static void main(String[] args) throws Exception {
//         SimpleKafkaConsumer.consumerOneTopic();SimpleKafkaConsumer.consumerTopics();}//消费单个topicpublic static void consumerOneTopic() throws Exception {// 设置 Flink 执行环境// 创建一个流处理的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 启用检查点机制,设置检查点的时间间隔为 5000 毫秒(5 秒)env.enableCheckpointing(5000);// 配置检查点模式为 "Exactly Once"(精确一次)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 配置 Kafka 属性,包括身份验证信息Properties properties = new Properties();// 设置 Kafka 集群地址properties.setProperty("bootstrap.servers", "xxxx");// 设置消费组 ID,用于管理消费偏移量properties.setProperty("group.id", "group_test");// 设置安全协议为 SASL_PLAINTEXTproperties.setProperty("security.protocol", "SASL_PLAINTEXT");// 设置 SASL 认证机制为 SCRAM-SHA-512properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");// 配置 SASL 登录模块,包含用户名和密码properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");// 创建一个 Kafka 消费者实例FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(// 设置要消费的 Kafka 主题名称"WJ-TEST",// 使用 SimpleStringSchema 将 Kafka 的消息反序列化为字符串new SimpleStringSchema(),// 传入 Kafka 的配置属性properties);// 设置消费者从 Kafka 的最早偏移量开始消费消息consumer.setStartFromEarliest();// 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {// 打印消费到的消息内容到控制台System.out.println(s);// 收集消费到的消息,供后续处理collector.collect(s);}});// 启动并执行 Flink 作业,作业名称为 "start consumer..."env.execute("start consumer...");}//消费多个topicpublic static void consumerTopics() throws Exception {// 设置 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置检查点机制,设置检查点模式为 "Exactly Once"(精确一次)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 启用检查点机制,设置检查点时间间隔为 5000 毫秒(5 秒)env.enableCheckpointing(5000);// 配置 Kafka 属性,包括身份验证信息Properties properties = new Properties();properties.setProperty("bootstrap.servers", "xxxx");properties.setProperty("group.id", "group_test");properties.setProperty("security.protocol", "SASL_PLAINTEXT");properties.setProperty("sasl.mechanism", "SCRAM-SHA-512");properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";");// 定义需要消费的 Kafka 主题列表List<String> topics = new ArrayList<>();topics.add("WJ-TEST");topics.add("KAFKA_TEST_001");// 使用自定义反序列化器创建 Kafka 消费者实例FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(topics,new CustomDeSerializationSchema(),properties);// 设置消费者从 Kafka 的最早偏移量开始消费消息consumer.setStartFromEarliest();// 将 Kafka 消费者作为数据源添加到 Flink 的执行环境中env.addSource(consumer).flatMap(new FlatMapFunction<ConsumerRecord<String, String>, Object>() {@Overridepublic void flatMap(ConsumerRecord<String, String> record, Collector<Object> collector) throws Exception {// 打印消费到的消息内容到控制台System.out.println("Topic: " + record.topic() +", Partition: " + record.partition() +", Offset: " + record.offset() +", Key: " + record.key() +", Value: " + record.value());// 收集消费到的消息,供后续处理collector.collect(record.value());}});// 启动并执行 Flink 作业env.execute("start consumer...");}}

2.4 自定义的 Kafka 反序列化器 (CustomDeSerializationSchema)

实现了一个自定义的 Kafka 反序列化器 (CustomDeSerializationSchema),主要功能是将从 Kafka 中消费到的消息(字节数组格式)解析为包含更多元数据信息的 ConsumerRecord<String, String> 对象。以下是其作用的具体说明:

  • 解析 Kafka 消息

    • 消息的 keyvalue 由字节数组转换为字符串格式,便于后续业务逻辑处理。
    • 同时保留 Kafka 消息的元数据信息(如主题名称 topic、分区号 partition、偏移量 offset)。
  • 扩展 Flink 的 Kafka 数据处理能力

    • 默认的反序列化器只处理消息内容(keyvalue),而该自定义类将消息的元数据(如 topicpartition)也作为输出的一部分,为复杂业务需求提供了更多上下文信息。
  • 控制流数据的结束逻辑

    • 实现了 isEndOfStream 方法,返回 false,表示 Kafka 的数据流是持续的,Flink 不会主动终止数据消费。
  • 定义 Flink 数据类型

    • 使用 getProducedType 方法,明确告诉 Flink 输出的数据类型是 ConsumerRecord<String, String>,便于 Flink 在运行时正确处理流数据。
package com.example.kafka_flink.util;import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.nio.charset.StandardCharsets;/*** @author wangjian*/
public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {// 是否表示流的最后一条元素// 返回 false,表示数据流会源源不断地到来,Flink 不会主动停止消费@Overridepublic boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {return false;}// 反序列化方法// 将 Kafka 消息从字节数组转换为 ConsumerRecord<String, String> 类型的数据// 返回的数据不仅包括消息内容(key 和 value),还包括 topic、offset 和 partition 等元数据信息@Overridepublic ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {// 检查 key 和 value 是否为 null,避免空指针异常String key = consumerRecord.key() == null ? null : new String(consumerRecord.key(), StandardCharsets.UTF_8);String value = consumerRecord.value() == null ? null : new String(consumerRecord.value(), StandardCharsets.UTF_8);// 构造并返回一个 ConsumerRecord 对象,其中包含反序列化后的 key 和 value,以及其他元数据信息return new ConsumerRecord<>(// Kafka 主题名称consumerRecord.topic(),// 分区号consumerRecord.partition(),// 消息偏移量consumerRecord.offset(),// 消息的 keykey,// 消息的 valuevalue);}// 指定数据的输出类型// 告诉 Flink 消费的 Kafka 数据类型是 ConsumerRecord<String, String>@Overridepublic TypeInformation<ConsumerRecord<String, String>> getProducedType() {return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});}
}

2.5 消费到消息的结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/2500.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows 10 ARM工控主板连接I2S音频芯片

在Windows工控主板应用中&#xff0c;音频功能是一项基本的需求&#xff0c;USB声卡在x86/x64 Windows系统上就可直接免驱使用&#xff0c;但这些USB声卡通常不提供ARM上的Windows系统驱动。本文将介绍如何利用安装在ARM上的Windows工控主板——ESM8400的I2S接口、连接WM8960音…

机器学习实战33-LSTM+随机森林模型在股票价格走势预测与买卖点分类中的应用

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下机器学习实战33-LSTM随机森林模型在股票价格走势预测与买卖点分类中的应用。对于LSTM随机森林模型的融合应用&#xff0c;我们选择股票价格走势预测与买卖点分类作为应用场景。股票市场数据丰富且对投资者具有实际价…

在VS2022中用C++连接MySQL数据库读取数据库乱码问题

1.正确安装mysql 安装之后的配置文件 2.在VS2022中进行相关配置 &#xff08;1&#xff09;右键项目&#xff0c;打开属性 注意是右键项目&#xff0c;不是.cpp文件 &#xff08;2&#xff09;配置属性-> VC目录 -> 包含目录 ->添加头文件路径&#xff08;如图&am…

vue3+ts+element-plus 输入框el-input设置背景颜色

普通情况&#xff1a; 组件内容&#xff1a; <el-input v-model"applyBasicInfo.outerApplyId"/> 样式设置&#xff1a; ::v-deep .el-input__wrapper {background-color: pink; }// 也可以这样设置 ::v-deep(.el-input__wrapper) {background-color: pink…

Realsense相机驱动安装及其ROS通讯配置——机器人抓取系统基础系列(四)

文章目录 概要1 Realsense相机驱动安装Method1: 使用Intel服务器预编译包Method2: 使用ROS服务器预编译包Method3: 使用SDK源代码方法对比总结 2 Realsense-ROS通讯配置与使用2.1 Realsense-ROS包安装2.2 ROS节点启动 小结Reference 概要 本文首先阐述了Realsense相机驱动安装…

初识 Git——《Pro Git》

Why Git&#xff1f; 1. 本地版本控制系统 Why&#xff1a; 许多人习惯用复制整个项目目录的方式来保存不同的版本&#xff0c;或许还会改名加上备份时间以示区别。 这么做唯一的好处就是简单&#xff0c;但是特别容易犯错。 有时候会混淆所在的工作目录&#xff0c;一不小心…

Freeswitch使用media_bug能力实现回铃音检测

利用freeswitch的media bug能力来在智能外呼时通过websocket对接智能中心的声音检测接口&#xff0c;来实现回铃音检测&#xff0c;来判断用户当前是否已响应&#xff0c;拒接&#xff0c;关机等。 1.回铃音处理流程 2.模块源码目录结构 首先新建一个freeswitch的源码的src/a…

我的年度总结

这一年的人生起伏&#xff1a;从曙光到低谷再到新的曙光 其实本来没打算做年度总结的&#xff0c;无聊打开了帅帅的视频&#xff0c;结合自己最近经历的&#xff0c;打算简单聊下。因为原本打算做的内容会是一篇比较丧、低能量者的呻吟。 实习生与创业公司的零到一 第一段工…

Hive SQL必刷练习题:留存率问题

首次登录算作当天新增&#xff0c;第二天也登录了算作一日留存。可以理解为&#xff0c;在10月1号登陆了。在10月2号也登陆了&#xff0c;那这个人就可以算是在1号留存 今日留存率 &#xff08;今日登录且明天也登录的用户数&#xff09; / 今日登录的总用户数 * 100% 解决思…

ASP.NET Core - 缓存之分布式缓存

ASP.NET Core - 缓存之分布式缓存 1. 分布式缓存的使用2. 分布式缓存的接入2.1 基于内存的分布式缓存2.2 基于 Redis 的分布式缓存 分布式缓存是由多个应用服务器共享的缓存&#xff0c;通常作为访问它的应用服务器的外部服务进行维护。 分布式缓存可以提高 ASP.NET Core 应用的…

机器学习中的凸函数和梯度下降法

一、凸函数 在机器学习中&#xff0c;凸函数 和 凸优化 是优化问题中的重要概念&#xff0c;许多机器学习算法的目标是优化一个凸函数。这些概念的核心思想围绕着优化问题的简化和求解效率。下面从简单直观的角度来解释。 1. 什么是凸函数&#xff1f; 数学定义 一个函数 f…

flutter R库对图片资源进行自动管理

项目中对资源的使用是开发过程中再常见不过的一环。 一般我们在将资源导入到项目中后,会通过资源名称来访问。 但在很多情况下由于我们疏忽输入错了资源名称,从而导致资源无法访问。 所以,急需解决两个问题: 资源编译期可检查可方便预览资源安装相关插件 在vscode中安装两…

闲谭SpringBoot--ShardingSphere分布式事务探究

文章目录 0. 背景1. 未分库分表时2. 仅分表时3. 分库分表时3.1 不涉及分库表3.2 涉及分库表&#xff0c;且分库表处于一个库3.3 涉及分库表&#xff0c;且分库表处于多个库3.4 涉及分库表&#xff0c;且运行中某库停机 4. 小结 0. 背景 接上篇文章《闲谭SpringBoot–ShardingS…

【Linux】--- 进程的等待与替换

进程的等待与替换 一、进程等待1、进程等待的必要性2、获取子进程status3、进程等待的方法&#xff08;1&#xff09;wait&#xff08;&#xff09;函数&#xff08;2&#xff09;waitpid函数 4、多进程创建以及等待的代码模型5、非阻塞接口 轮询 二、进程替换1、替换原理2、替…

AI 编程工具—Cursor进阶使用 阅读开源项目

AI 编程工具—Cursor进阶使用 阅读开源项目 首先我们打开一个最近很火的项目browser-use ,直接从github 上克隆即可 索引整个代码库 这里我们使用@Codebase 这个选项会索引这个代码库,然后我们再选上这个项目的README.md 文件开始提问 @Codebase @README.md 这个项目是用…

keepalived双机热备(LVS+keepalived)实验笔记

目录 前提准备&#xff1a; keepalived1&#xff1a; keepalived2&#xff1a; web1&#xff1a; web2&#xff1a; keepalived介绍 功能特点 工作原理 应用场景 前提准备&#xff1a; 准备4台centos&#xff0c;其中两台为keepalived&#xff0c;两台为webkeepalive…

【dockerros2】ROS2节点通信:docker容器之间/docker容器与宿主机之间

&#x1f300; 一个中大型ROS项目常需要各个人员分别完成特定的功能&#xff0c;而后再组合部署&#xff0c;而各人员完成的功能常常依赖于一定的环境&#xff0c;而我们很难确保这些环境之间不会相互冲突&#xff0c;特别是涉及深度学习环境时。这就给团队项目的部署落地带来了…

ASP.NET Core - IStartupFilter 与 IHostingStartup

ASP.NET Core - IStartupFilter 与 IHostingStartup 1. IStartupFilter2 IHostingStartup2.5.1 创建外部程序集2.5.2 激活外部程序集 1. IStartupFilter 上面讲到的方式虽然能够根据不同环境将Startup中的启动逻辑进行分离&#xff0c;但是有些时候我们还会可以根据应用中的功能…

HarmonyOS NEXT应用开发边学边玩系列:从零实现一影视APP (五、电影详情页的设计实现)

在上一篇文章中&#xff0c;完成了电影列表页的开发。接下来&#xff0c;将进入电影详情页的设计实现阶段。这个页面将展示电影的详细信息&#xff0c;包括电影海报、评分、简介以及相关影人等。将使用 HarmonyOS 提供的常用组件&#xff0c;并结合第三方库 nutpi/axios 来实现…

【excel】VBA股票数据获取(搜狐股票)

文章目录 一、序二、excel 自动刷新股票数据三、付费获取 一、序 我其实不会 excel 的函数和 visual basic。因为都可以用matlab和python完成。 今天用了下VBA&#xff0c;还挺不错的。分享下。 上传写了个matlab获取股票数据的&#xff0c;是雅虎财经的。这次是搜狐股票的数…