Kafka - 异步/同步发送API

文章目录

  • 异步发送
    • 普通异步发送
      • 异步发送流程
      • Code
    • 带回调函数的异步发送
      • 带回调函数的异步发送流程
      • Code
  • 同步发送API

在这里插入图片描述


异步发送

普通异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

异步发送流程

在这里插入图片描述

Code

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();System.out.println(art.offset());System.out.println("over - " + i);}// 5. 关闭资源kafkaProducer.close();}}

输出

31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9

忽略我这个offset … 我都发了好多次了…

看控制台的吧

在这里插入图片描述


带回调函数的异步发送

回调函数callback()会在producer收到ack时调用,为异步调用。

该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

  • 如果Exception为null,说明消息发送成功,
  • 如果Exception不为null,说明消息发送失败

带回调函数的异步发送流程

在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

Code

package com.artisan.pc;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerWithCallBack {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 添加回调// 该方法在Producer收到ack时调用,为异步调用kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {// 没有异常,输出信息到控制台System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());});}// 5. 关闭资源kafkaProducer.close();}}

在这里插入图片描述

控制台

在这里插入图片描述


同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

在这里插入图片描述

package com.artisan.pc;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");// key,value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 通过Future接口的get实现同步阻塞kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;}// 5. 关闭资源kafkaProducer.close();}}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/175858.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

飞鼠异地组网工具全网互通实战指南

飞鼠异地组网工具全网互通实战指南 一、飞鼠异地组网工具介绍1.1 飞鼠工具简介1.2 飞鼠工具官网 二、本次实践介绍2.1 本次实践前提2.2 本次实践简介2.3 本次实践环境规划 三、异地组网配置3.1 进入中心控制器节点管理后台3.2 网卡设置3.3 进入子网节点管理后台3.4 网卡设置 四…

项目综合实训,vrrp+bfd,以及策略路由的应用

目录 一&#xff0e; 项目需求 二&#xff0e; Visio设备画图 三&#xff0e; 设备选型 三&#xff0e;vlan规划 四&#xff0e;Ip地址规划 五&#xff0e;实验拓扑图 六&#xff0e;配置过程及结果 项目需求 1.S1作为VLAN10的主网关和根桥&#xff0c;S2作为v…

Pytorch L1,L2正则化

L1正则化和L2正则化是常用的正则化技术&#xff0c;用于在机器学习模型中控制过拟合。它们的主要区别在于正则化项的形式和对模型参数的影响。 L1正则化&#xff08;Lasso正则化&#xff09;&#xff1a; 正则化项形式&#xff1a;L1正则化使用模型参数的绝对值之和作为正则化…

Emscripten + CMakeLists.txt 将 C++ 项目编译成 WebAssembly(.wasm)/js,并编译 Html 测试

背景&#xff1a;Web 端需要使用已有的 C 库&#xff08;使用 CMake 编译&#xff09;&#xff0c;需要将 C 项目编译成 WebAssembly(.wasm) 供 js 调用。 上篇文章《Mac 上安装 Emscripten》 已讲解如何安装配置 Emscripten 环境。 本篇文章主要讲解如何将基于 CMakeLists 配…

Gitee 发行版

Gitee 发行版 1、Gitee 发行版管理2、项目仓库中创建发行版本3、项目中导入3.1 gradle配置3.2 dependencies执行正常&#xff0c;包没有下载 1、Gitee 发行版管理 Gitee 发行版&#xff08;Release&#xff09;管理 2、项目仓库中创建发行版本 按照Gitee官网操作就行 3、项目…

PCIe 访问 EP 配置空间,空间映射详解,BDF 计算偏移

访问 EP 的配置空间方法 内存映射IO 访问 内存访问配置空间 前置知识 PCIe 设备的寻址是按照 BDF 即 Bus-Device-Function 来组织的。访问某个设备则需要根据BDF计算偏移地址。 两种不同的内存访问配置空间方法 类 xilinx&#xff0c;基地址 偏移地址访问 // linux-5.10\…

http1,https,http2,http3总结

1.HTTP 当我们浏览网页时&#xff0c;地址栏中使用最多的多是https://开头的url&#xff0c;它与我们所学的http协议有什么区别&#xff1f; http协议又叫超文本传输协议&#xff0c;它是应用层中使用最多的协议&#xff0c; http与我们常说的socket有什么区别吗&#xff1f; …

【ARM 嵌入式 C 入门及渐进 10 -- 冒泡排序 选择排序 插入排序 快速排序 归并排序 堆排序 比较介绍】

文章目录 排序算法小结排序算法C实现排序方法的稳定性 排序算法小结 C语言中常用的排序算法包括冒泡排序、选择排序、插入排序、快速排序、归并排序、堆排序。下面我们来一一介绍&#xff1a; 冒泡排序&#xff08;Bubble Sort&#xff09;&#xff1a;冒泡排序是通过比较相邻…

android 8.1 disable unsupported sensor

如果device不支持某种sensor,可以在android/frameworks/base/core/java/android/hardware/SystemSensorManager.java里将其disabled掉。以disable proximity sensor为例。 public SystemSensorManager(Context context, Looper mainLooper) {synchronized(sLock) {if (!sNativ…

MWeb Pro for Mac:博客生成编辑器,助力你的创作之旅

在当今数字化时代&#xff0c;博客已经成为了许多人记录生活、分享知识和表达观点的重要渠道。而要打造一个专业、美观且易于管理的博客&#xff0c;选择一款强大的博客生成编辑器至关重要。今天&#xff0c;我向大家推荐一款备受好评的Mac软件——MWeb Pro。 MWeb Pro是一款专…

flutter深研

https://www.douyin.com/video/7020336319058627853 关闭系统风扇 在 Windows 操作系统上安装和配置 Flutter 开发环境 - Flutter 中文文档 - Flutter 中文开发者网站 - Flutter 下载Git - Downloading Package 推荐使用迅雷下载 系统配置要求 要想安装和运行 Flutter&#xf…

使用FastAPI部署Ultralytics YOLOv5模型

YOLO是You Only Look Once(你只看一次)的缩写&#xff0c;它具有识别图像中的物体的非凡能力&#xff0c;在日常应用中会经常被使用。所以在本文中&#xff0c;我们将介绍如何使用FastAPI的集成YOLOv5&#xff0c;这样我们可以将YOLOv5做为API对外提供服务。 Python有几个web框…

如何将 ruby 打包类似于jdk在另一台相同架构的机器上面开箱即用

需求 目前工作中使用到了ruby作为java 项目的中转语言&#xff0c;但是部署ruby的时候由于环境的不同会出现安装依赖包失败的问题&#xff0c;如何找到一种开箱即用的方式类似于java 中的jdk内置jvm这种方式 解决 TruffleRuby 完美解决问题&#xff0c;TruffleRuby 是使用 T…

基于STC系列单片机实现外部中断0控制按键调节定时器0产生PWM(脉宽调制)的功能

#define uchar unsigned char//自定义无符号字符型为uchar #define uint unsigned int//自定义无符号整数型为uint sbit PwmOut P1^0;//位定义脉宽调制输出为单片机P1.0脚 uchar PwmTimeCount;//声明脉宽调制时间计数变量 uchar PwmDutyCycle;//声明脉宽调制占空比变量 void Ti…

Apache服务的搭建与配置(超详细版)

前言 Apache是一种常见的Web服务器软件&#xff0c;广泛用于Linux和其他UNIX操作系统上。它是自由软件&#xff0c;可以通过开放源代码的方式进行自由分发和修改。Apache提供了处理静态和动态内容的能力&#xff0c;而且还支持多种编程语言和脚本&#xff0c;如PHP、Python和P…

python数据可视化

内容主要介绍了python模块matplotlib即seaborn数据可视化 matplotlib模块通过import matplotlib.pyplot as plt生成图形&#xff0c;如生成图形没展示&#xff0c;可调用plt.show()方法展示图形&#xff1b; 对于颜色属性设置&#xff0c;既可以使用十六进制颜色表达(#7777aa…

cdrx8和2020哪个版本更好用?有什么区别

经过多年的发展&#xff0c;cdr推出了很多优秀的版本&#xff0c;并顺应时代的发展更新了多项功能。随着cdr推出的软件版本增多&#xff0c;小伙伴们可选择的产品也在增多&#xff0c;那么该怎么选择呢&#xff1f;本文会给大家介绍cdrx8和2020的区别&#xff0c;CDRX8和2020哪…

Pytorch 猫狗识别案例

猫狗识别数据集https://download.csdn.net/download/Victor_Li_/88483483?spm1001.2014.3001.5501 训练集图片路径 测试集图片路径 训练代码如下 import torch import torchvision import matplotlib.pyplot as plt import torchvision.models as models import torch.nn as…

基于静电放电算法的无人机航迹规划-附代码

基于静电放电算法的无人机航迹规划 文章目录 基于静电放电算法的无人机航迹规划1.静电放电搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要&#xff1a;本文主要介绍利用静电放电算法来优化无人机航迹规划。 …

0基础学习PyFlink——用户自定义函数之UDF

大纲 标量函数入参并非表中一行&#xff08;Row&#xff09;入参是表中一行&#xff08;Row&#xff09;alias PyFlink中关于用户定义方法有&#xff1a; UDF&#xff1a;用户自定义函数。UDTF&#xff1a;用户自定义表值函数。UDAF&#xff1a;用户自定义聚合函数。UDTAF&…