kafka发送消息-自定义消息发送的拦截器

在这里插入图片描述

1、自定义拦截器

创建自定义拦截器类,实现ProducerInterceptor接口。对消息进行拦截,可以在拦截中对消息做些处理,记录日志等操作…
在这里插入图片描述

package com.power.config;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CustomerProducerInterceptor implements ProducerInterceptor<String,Object> {/*** 发送消息时,会调用该方法,对消息进行拦截,可以在拦截中对消息做些处理,记录日志等操作......* @param record* @return*/@Overridepublic ProducerRecord<String,Object> onSend(ProducerRecord record) {System.out.println("拦截消息:"+record.toString());return record;}/*** 服务器收到消息后的一个确认* @param metadata* @param exception*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(null!=metadata){System.out.println("服务器接收到该消息:"+metadata.toString());}else {System.out.println("消息发送失败了,exception = "+exception);}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

2、kafak配置类

在这里插入图片描述

package com.power.config;import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitioner.class);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,CustomerProducerInterceptor.class.getName());return props;}public ProducerFactory<String, ?> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, ?> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}//第二次创建@Beanpublic NewTopic newTopic9() {return new NewTopic("heTopic", 9, (short) 1);}
}

3、生产者

在这里插入图片描述

package com.power.producer;import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate2;public void send10(){User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();//分区是null,让kafka自己去决定把消息发送到哪个分区kafkaTemplate2.send("heTopic",user);}
}

4、测试类

在这里插入图片描述

package com.power;import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.Date;@SpringBootTest
public class SpringBoot01KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendInterceptor(){eventProducer.send10();}}

5、执行测试类

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/408420.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spring Boot】全局异常处理

目录 背景 前言 设计步骤 1.定义异常信息类&#xff1a; 2.自定义异常&#xff1a; 3.创建全局异常处理类 4.在控制器中抛出异常 5.输出 捕获 Valid 校验异常 背景 去面试的时候被问到SpringBoot项目中&#xff0c;如何处理全局异常的&#xff0c;也就是如何捕获全局异…

vue2 part2

数据代理 通过defineProperty里面传入obj2和x&#xff0c;然后get和set下读取接收下然后再接收set中给对象x用value接收下&#xff0c;这样就能两个数据读取和接收了 <!DOCTYPE html> <html><head><meta charset"UTF-8" /><title>何…

浅谈【数据结构】链表之单链表

目录 1、什么是数据&#xff1f; 2、什么是结构 3、什么是数据结构&#xff1f; 4、线性结构(线性表&#xff09; 4.1线性表的物理结构的实现 5、链表 5.1无头结点的单链表 5.2新内容、老面孔 5.3数组和链表的优缺点 5.4链表的概念 5.5链表的创建步骤 5.5.1创建过程…

芯片后端之 PT 使用 report_timing 产生报告 之 -input_pins 选项

今天,我们再学习一点点 后仿真相关技能。 那就是,了解 report_timing 中的 -include_hierarchical_pins 选项。 如果我们仅仅使用如下命令,执行后会发现: pt_shell> report_timing -from FF1/CK -to FF2/d -delay_type max -include_hierarchical_pins 我们使用命…

【数据库】Mysql 批量变更所有字段类型为varchar的字符集

生成变更语句 SELECT CONCAT(ALTER TABLE , TABLE_NAME, MODIFY , COLUMN_NAME, , COLUMN_TYPE, , CHARACTER SET utf8 COLLATE utf8_general_ci , CASE WHEN IS_NULLABLE YES THEN NULL DEFAULT NULL WHEN IS_NULLABLE NO AND ISNULL(COLUMN_DEFAULT) THEN NOT NULL EL…

什么是持续集成(持续交付、部署)

文章目录 1 持续集成1.1 持续集成的好处1.2 持续集成的目的1.3 没有持续集成的状况 2 持续交付3 持续部署4 持续交付和持续部署的区别 1 持续集成 持续集成&#xff08;Continuous integration&#xff0c;简称CI&#xff09;&#xff0c;简单来说持续集成就是频繁地&#xff…

数字孪生网络 (DTN)关键技术分析

数字孪生网络 (DTN): 概念、架构及关键技术 摘要 随着5G商用规模部署和下一代互联网IPv6的深化应用&#xff0c;新一代网络技术的发展引发了产业界的广泛关注。智能化被认为是新一代网络发展的趋势&#xff0c;为数字化社会的信息传输提供了基础。面向数字化、智能化的新一代网…

【Linux篇】Linux命令基础

目录 1. Linux的目录结构 1.1 Linux的目录结构 1.2 /在Linux系统中的表示 2. linux命令基础 2.1 什么是命令和命令行 2.2 Linux命令的通用格式 2.3 ls命令 2.3.1 ls命令的参数的作用&#xff1a; 2.3.2 ls命令的选项 2.3.3 命令的选项组合使用 2.4 cd切换工作目录 2…

【微信小程序】使用 npm 包 - API Promise化-- miniprogram-api-promise

1. 基于回调函数的异步 API 的缺点 默认情况下&#xff0c;小程序官方提供的异步 API 都是基于回调函数实现的&#xff0c;例如&#xff0c;网络请求的 API 需要按照如下的方式调用&#xff1a; 缺点&#xff1a;容易造成回调地狱的问题&#xff0c;代码的可读性、维护性差&a…

HTML简单了解和基础知识记录

参考视频 html的用途 超文本标记语言&#xff08;英语&#xff1a;HyperText Markup Language&#xff0c;简称&#xff1a;HTML&#xff09;&#xff0c;用来显示网页的文字和框架结构&#xff0c;可以认为是网页的骨架。 标签/元素 用于定义文字图片连接等&#xff0c;分…

VLDB 2024 即将来袭!创邻科技将带来精彩分享

8月26-30日&#xff0c;数据库领域最权威、影响力最大的顶级盛会之一&#xff0c;VLDB 2024 来了&#xff01; VLDB&#xff08;International Conference on Very Large Databases&#xff09;是数据管理、可扩展数据科学和数据库研究人员、厂商、应用开发者以及用户广泛参与…

能实现可算不可见的同态加密技术详解

目录 同态加密的基本概念 同态加密示例 同态加密的原理 同态加密的类型 同态加密的应用场景 同态加密的挑战 小结 同态加密&#xff08;Homomorphic Encryption&#xff0c;HE&#xff09;是一种满足密文同态运算性质的加密算法&#xff0c;可以在加密数据上直接执行特定…

WiFi的IP和电脑IP一样吗?怎么更改wifi的ip地址

在数字化时代&#xff0c;网络连接已成为我们日常生活和工作中不可或缺的一部分。无论是通过手机、电脑还是其他智能设备接入互联网&#xff0c;IP地址作为网络设备的唯一标识&#xff0c;扮演着至关重要的角色。然而&#xff0c;很多用户对于WiFi的IP地址与电脑&#xff08;或…

3.4-CoroutineScope/CoroutineContext:coroutineScope() 和 supervisorScope()

文章目录 coroutineScope()supervisorScope()总结 coroutineScope() coroutineScope() 和我们创建协程时的 CoroutineScope 名字是相同的&#xff0c;实际上它们也确实有所关联&#xff0c;为了方便理解我们先说下 coroutineScope() 是怎样的效果。 我们在使用 coroutineScop…

计算机网络常见面试题总结

文章目录 1 计算机网络基础1.1 网络分层模型1.1.1 OSI 七层模型是什么&#xff1f;每一层的作用是什么&#xff1f;1.1.2 TCP/IP 四层模型是什么&#xff1f;每一层的作用是什么&#xff1f;1.1.3 为什么网络要分层&#xff1f; 1.2 常见网络协议1.2.1 应用层有哪些常见的协议&…

MySQL主从复制之GTID模式

目录 1 MySQL 主从复制 GTID 模式介绍 2 传统复制模式与GTID复制模式的区别 3 GTID模式核心参数 4 GTID 实现自动复制原理 4.1 GTID基本概念 4.2 GTID复制流程 5 GTID 实现自动定位 5.1 配置 my.cnf 5.2 配置 SLAVE 实现自动定位 5.3 测试 6 GTID 模式 故障转移的方法流程 6.1…

记一次NULL与空字符串导致的分组后产生重复数据

目录 一&#xff0c;场景说明二&#xff0c;实现功能三&#xff0c;修改原实现方法四&#xff0c;说明 一&#xff0c;场景说明 想实现这样一个功能&#xff0c;统计人员信息中不同性别的人的总工资。 实现方式&#xff1a;将数据group by 分组后累加。 二&#xff0c;实现功…

SpringMVC核心机制环境搭建

文章目录 1.SpringMVC执行流程1.基础流程图2.详细流程图 2.安装Tomcat1.下载2.解压到任意目录即可3.IDEA配置Tomcat1.配置Deloyment2.配置Server 3.创建maven项目1.创建sun-springmvc模块&#xff08;webapp&#xff09;2.查看是否被父模块管理3.pom.xml引入依赖4.目录5.SunDis…

npm阿里云制品仓库

配置 配置仓库地址&#xff0c;可以再在仓库指南看到 npm config set registryxxxxx#登录&#xff0c;帐户密码可以在仓库指南看到 npm login注意&#xff1a;npm>9的版本npm login目前有问题 verbose web login not supported, trying couch&#xff0c;暂时没试验到解决…

C语言函数递归

前言与概述 本文章将通过多个代码并赋予图示&#xff0c;详细讲解C语言函数递归的定义和函数递归的运算过程。 函数递归定义 程序调用自身的编程技巧称为递归。递归作为一种算法在程序设计语言中广泛应用。一个过程或函数在其定义或说明中有直接或间接调用自身的一种方法。它…