【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。

一、下载github.com/segmentio/kafka-go包

go get github.com/segmentio/kafka-go

二、建立kafka连接

正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9092"    //host 具体看你们自己的配置如果是服务器上的 就是服务器iP:9092 本地就是localhost:9092
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaConn() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

三、kafka之发送消息(生产者)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}

四、kafka之接收消息(消费者)

// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

五、kafka之消费者组实现消息确认(从一次消费消息的末尾开始接收消息)

只需要给读取消息的方法改变一下就可以了


func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/468856.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring源码(十二):Spring MVC之Spring Boot

本篇将详细讨论Spring Boot 的启动/加载、处理请求的具体流程。我们先从一个简单的Spring Boot项目日志开始分析(这里假设读者已经仔细阅读完了前面的文章,且对Spring源码有一定深度的了解,否则会看得一脸懵逼)。 本文为2024重置…

c语言-教零基础/新手入门

C 简介 一提到语言这个词语,自然会想到的是像英语,汉语等这样的自然语言,因为它是人和人交换信息不可缺少的工具。而今天计算机普遍了我们生活的每一个角落,除了人和人的相互交流之外,我们必须和计算机交流。就像人类…

【安装配置教程】二、VMware安装并配置ubuntu22.04

一、准备: 虚拟机安装ubuntu,首先要先找到一个镜像,可以去ubuntu官方下载一个,地址:下载Ubuntu桌面系统 | Ubuntu,下载好iso的镜像文件后保存好,接下来打开VMware。 二、安装&#xff…

立体视觉的核心技术:视差计算与图像校正详解

立体视觉的核心技术:视差计算与图像校正详解 在立体视觉中,通过双目相机(即左右两台相机)的不同视角捕获的图像,结合几何关系,我们可以推算出场景中物体的深度。本文将深入讲解如何基于视差(di…

深度学习基础练习:从pytorch API出发复现LSTM与LSTMP

2024/11/5-2024/11/7: 前置知识: [译] 理解 LSTM(Long Short-Term Memory, LSTM) 网络 - wangduo - 博客园 【官方双语】LSTM(长短期记忆神经网络)StatQuest_哔哩哔哩_bilibili 大部分思路来自于: PyTorch LSTM和LSTMP…

scala学习记录,Set,Map

set:集合,表示没有重复元素的集合,特点:唯一 语法格式:val 变量名 Set [类型](元素1,元素2...) 可变不可变 可变(mutable)可对元素进行添加,删…

Docker了解

Docker是一种容器化技术,它可以将应用程序和其依赖项打包到一个独立的、可移植的容器中,以便在不同的环境中运行。Docker基于Linux操作系统的容器化技术,可以提供更轻量、更快速、更灵活、更一致的应用部署和管理方式。 Docker的基本概念包括…

‘conda‘ 不是内部或外部命令,也不是可运行的程序或批处理文件,Miniconda

下载了conda,但是在cmd里执行conda --version会显示’conda’ 不是内部或外部命令,也不是可运行的程序或批处理文件。 原因是环境变量里没有添加conda,无法识别路径。 需要在系统环境变量里添加如下路径: 保存之后重新打开cmd&am…

《TCP/IP网络编程》学习笔记 | Chapter 4:基于TCP的服务器端/客户端(1)

《TCP/IP网络编程》学习笔记 | Chapter 4:基于TCP的服务器端/客户端(1) 《TCP/IP网络编程》学习笔记 | Chapter 4:基于TCP的服务器端/客户端(1)理解TCP和UDPTCP/IP协议栈TCP/IP协议的诞生背景链路层网络层T…

【C语言】。末尼

用js写一个冒泡排序 当然可以。在JavaScript中&#xff0c;你可以使用递归的方式来实现冒泡排序算法。这是一个简单的示例&#xff1a; function bubbleSort(arr) { let len arr.length; if(len < 1){ return arr; // 如果数组只有一个元素或为空&#xf…

深度学习笔记12

1.神经网络的代价函数 神经网络可同时用于解决分类问题和回归问题&#xff0c;对于不同的问题会在输出层后&#xff0c;加上不同的变换函数。一般来说&#xff0c;回归问题使用恒等函数,分类问题使用sigmoid或softmax函数。而不同的变换函数&#xff0c;也对应不同的代价函数。…

RabbitMQ队列详细属性(重要)

RabbitMQ队列详细属性 1、队列的属性介绍1.1、Type&#xff1a;队列类型1.2、Name&#xff1a;队列名称1.3、Durability&#xff1a;声明队列是否持久化1.4、Auto delete&#xff1a; 是否自动删除1.5、Exclusive&#xff1a;1.6、Arguments&#xff1a;队列的其他属性&#xf…

json即json5新特性,idea使用json5,fastjson、gson、jackson对json5支持

文章目录 1.新特性1.1.JSON&#xff06;JSON5官网2.示例2.1. IntelliJ IDEA2.1.1.支持.json5文件2.1.2.md支持json5代码块 2.9. 示例源码 1.新特性 【通用】 注释尾随逗号key无需引号&#xff08;或单引号&#xff09; 【字符串】 字符串可以用单引号引起来。字符串可以通过转…

【NOIP普及组】摆花

【NOIP普及组】摆花 C语言代码C 代码Java代码Python代码 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 小明的花店新开张&#xff0c;为了吸引顾客&#xff0c;他想在花店的门口摆上一排花&#xff0c;共 m 盆。通过调 查顾客的喜好&am…

pdf转excel;pdf中表格提取

一、问题描述 在工作中或多或少会遇到&#xff1a;需要将某份pdf中的表格数据提取出来&#xff0c;以便能够“修改使用”数据 可将pdf中的表格提取出来&#xff0c;解决办法还有点复杂 尤其涉及“pdf中表格不是标准的单元格”的时候&#xff0c;提取数据到excel不太容易 比…

Qt中 QWidget 和 QMainWindow 区别

QWidget 用来构建简单窗口 QMainWindow 用来构建更复杂的窗口&#xff0c;QMainWindow 继承自QWidget&#xff0c;在QWidget 的基础上提供了菜单栏、工具栏、状态栏等功能 菜单栏&#xff08;QMenuBar&#xff09;工具栏&#xff08;QToolBar&#xff09;状态栏&#xff08;Q…

《深入浅出Apache Spark》系列③:Spark SQL解析层优化策略与案例解析

导读&#xff1a;本系列是Spark系列分享的第三期。第一期分享了Spark Core的一些基本原理和一些基本概念&#xff0c;包括一些核心组件。Spark的所有组件都围绕Spark Core来运转&#xff0c;其中最活跃的一个上层组件是Spark SQL。第二期分享则专门介绍了Spark SQL的基本架构和…

安全的时钟启动

Note&#xff1a;文章内容以 Xilinx 系列 FPGA 进行讲解 1、什么是安全启动时钟 通常情况下&#xff0c;在MMCM/PLL的LOCKED信号抬高之后&#xff08;由0变为1&#xff09;&#xff0c;MMCM/PLL就处于锁定状态&#xff0c;输出时钟已保持稳定。但在此之前&#xff0c;输出时钟会…

【mongodb】数据库的安装及连接初始化简明手册

NoSQL(NoSQL Not Only SQL )&#xff0c;意即"不仅仅是SQL"。 在现代的计算系统上每天网络上都会产生庞大的数据量。这些数据有很大一部分是由关系数据库管理系统&#xff08;RDBMS&#xff09;来处理。 通过应用实践证明&#xff0c;关系模型是非常适合于客户服务器…

丹韵红墙成红毯至美背景!冠珠华脉「雍华京韵」于M essential大秀绽放京韵时尚

东方美学代表品牌M essential近日于上海科学会堂举办十周年大秀&#xff0c;并发布品牌全新2024/25冬春系列。冠珠瓷砖作为国风新韵合作品牌&#xff0c;以高定岩板华脉「雍华京韵」系列的宫墙丹韵打造红毯背景墙&#xff0c;中国高定岩板与中国高级时装作品碰撞着“中国美”的…