Windows10配置C++版本的Kafka,并进行发布和订阅测试

配置的环境为:Release x64下的环境

完整项目:https://gitee.com/jiajingong/kafka-publisher

1、首先下载相应的库文件(.lib,.dll)

参考链接:

GitHub - eStreamSoftware/delphi-kafka

GitHub - cloader/KafkaCPP-win32-dll: KafkaCpp-win32-dll

2、新建一个新的命令行C++工程

建完工程后,选择Release x64,并在生成中执行重新生成解决方案,这样会在项目目录下生成x64/Release文件夹

3、通过VS2017配置附加库目录和附加依赖项

所有的.lib、.dll等库文件均在下图x64/Release目录下

附加依赖项加入:librdkafka.lib;librdkafkacpp.lib,如下图:

4、发布端:

将主函数的CPP文件改为:

#include <iostream>
#include <thread>
#include "rdkafkacpp.h"int main()
{std::string brokers = "172.18.4.96:9092";std::string errorStr;RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (!conf) {std::cout << "Create RdKafka Conf failed" << std::endl;return -1;}conf->set("message.max.bytes", "10240000", errorStr); //最大字节数conf->set("replica.fetch.max.bytes", "20485760", errorStr);conf->set("bootstrap.servers", brokers, errorStr);RdKafka::Producer *producer = RdKafka::Producer::create(conf, errorStr);if (!producer) {std::cout << "Create Producer failed" << std::endl;return -1;}//创建TopicRdKafka::Topic *topic = RdKafka::Topic::create(producer, "koala-stqf-03", tconf, errorStr);if (!topic) {std::cout << "Create Topic failed" << std::endl;}int count = 0;while (true){   //发送消息RdKafka::ErrorCode resCode = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, (char *)"123456789", 10, nullptr, nullptr);std::cout << "Count:" << count << ",has publish:" << (char *)"123456789" << std::endl;if (resCode != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resCode) << std::endl;}count += 1;std::this_thread::sleep_for(std::chrono::seconds(1));}delete conf;delete tconf;delete topic;delete producer;RdKafka::wait_destroyed(5000);return 0;
}

5、订阅端

新建一个同样的订阅端工程,同样将主函数的代码改为:

#include "rdkafkacpp.h"
#include <chrono>
#include <time.h>
#include <sstream>
#include <iomanip>
#include <iostream>
#include <algorithm>
#include <iterator>void consume_cb(RdKafka::Message &message, void *opaque)
{switch (message.err()) {case RdKafka::ERR__TIMED_OUT:std::cout << "RdKafka::ERR__TIMED_OUT" << std::endl;break;case RdKafka::ERR_NO_ERROR:/* Real message */RdKafka::MessageTimestamp ts;ts = message.timestamp();if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {std::string timeprefix;if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {timeprefix = "created time";}else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {timeprefix = "log append time";}unsigned long long milli = ts.timestamp + (unsigned long long)8 * 60 * 60 * 1000;//此处转化为东八区北京时间,如果是其它时区需要按需求修改auto mTime = std::chrono::milliseconds(milli);auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(mTime);auto tt = std::chrono::system_clock::to_time_t(tp);tm timeinfo;::gmtime_s(&timeinfo, &tt);//char s[60]{ 0 };//::sprintf(s, "%04d-%02d-%02d %02d:%02d:%02d", timeinfo.tm_year + 1900, timeinfo.tm_mon + 1, timeinfo.tm_mday, timeinfo.tm_hour, timeinfo.tm_min, timeinfo.tm_sec);// std::cout << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;
#if 0std::stringstream ss;std::string dateStr;ss << timeinfo.tm_year + 1900 << "-"<< timeinfo.tm_mon + 1 << "-"<< timeinfo.tm_mday;ss >> dateStr;ss.clear();ss << timeinfo.tm_hour << ":"<< timeinfo.tm_min << ":"<< timeinfo.tm_sec;std::string timeStr;ss >> timeStr;std::string dateTimeStr;dateTimeStr += dateStr;dateTimeStr.push_back(' ');dateTimeStr += timeStr;
#endif // 0//std::cout << "TimeStamp" << timeprefix << " " << s << std::endl;std::cout << "TimeStamp   " << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;}std::cout << message.topic_name() << " offset" << message.offset() << "  partion " << message.partition() << " message: " << reinterpret_cast<char*>(message.payload()) << std::endl;break;case RdKafka::ERR__PARTITION_EOF:/* Last message */std::cout << "EOF reached for" << std::endl;break;case RdKafka::ERR__UNKNOWN_TOPIC:case RdKafka::ERR__UNKNOWN_PARTITION:std::cout << "Consume failed: " << message.errstr();break;default:/* Errors */std::cout << "Consume failed: " << message.errstr();break;}
}
int main()
{std::string brokers = "172.18.4.96:9092";std::string errstr;std::vector<std::string> topics{ "koala-stqf-03","klai-seim-alert-koala-test-03"};std::string group_id = "whl-consumer-group";RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);if (conf->set("group.id", group_id, errstr)) {std::cout << errstr << std::endl;return -1;}conf->set("bootstrap.servers", brokers, errstr);conf->set("max.partition.fetch.bytes", "1024000", errstr);//conf->set("enable-auto-commit", "true", errstr);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);tconf->set("auto.offset.reset", "latest", errstr);conf->set("default_topic_conf", tconf, errstr);RdKafka::KafkaConsumer *m_consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!m_consumer) {std::cout << "failed to create consumer " << errstr << std::endl;return -1;}#if 0 //从上一次消费结束的位置开始消费RdKafka::ErrorCode err = m_consumer->subscribe(topics);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;return -1;}
#else //指定每个topic的每个分区开始消费的位置//基本思路为先获取server端的状态信息,将与订阅相关的topic找出来,根据分区,创建TopicPartion;最后使用assign消费RdKafka::Metadata *metadataMap{ nullptr };RdKafka::ErrorCode err = m_consumer->metadata(true, nullptr, &metadataMap, 2000);if (err != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(err) << std::endl;}const RdKafka::Metadata::TopicMetadataVector *topicList = metadataMap->topics();std::cout << "broker topic size: " << topicList->size() << std::endl;RdKafka::Metadata::TopicMetadataVector subTopicMetaVec;std::copy_if(topicList->begin(), topicList->end(), std::back_inserter(subTopicMetaVec), [&topics](const RdKafka::TopicMetadata* data) {return std::find_if(topics.begin(), topics.end(), [data](const std::string &tname) {return data->topic() == tname; }) != topics.end();});std::vector<RdKafka::TopicPartition*> topicpartions;std::for_each(subTopicMetaVec.begin(), subTopicMetaVec.end(), [&topicpartions](const RdKafka::TopicMetadata* data) {auto parVec = data->partitions();std::for_each(parVec->begin(), parVec->end(), [&](const RdKafka::PartitionMetadata *value) {std::cout << data->topic() << " has partion: " << value->id() << " Leader is : " << value->leader() << std::endl;topicpartions.push_back(RdKafka::TopicPartition::create(data->topic(), value->id(), RdKafka::Topic::OFFSET_END));});});m_consumer->assign(topicpartions);
#endif // 0RdKafka::ErrorCode errccc = m_consumer->subscribe(topics);if (errccc != RdKafka::ERR_NO_ERROR) {std::cout << RdKafka::err2str(errccc) << std::endl;return -1;}while (true){RdKafka::Message *msg = m_consumer->consume(6000);consume_cb(*msg, nullptr); //消息一条消息delete msg;}return 0;
}

6、发布 订阅展示:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/21900.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于云的物联网系统用于实时有害藻华监测:通过MQTT和REST API无缝集成ThingsBoard

论文标题 **英文标题&#xff1a;**Cloud-Based IoT System for Real-Time Harmful Algal Bloom Monitoring: Seamless ThingsBoard Integration via MQTT and REST API **中文标题&#xff1a;**基于云的物联网系统用于实时有害藻华监测&#xff1a;通过MQTT和REST API无缝集…

构建医疗Mini DeepSeek R1:用强化学习训练

构建 医疗迷你 DeepSeek R1&#xff1a;用强化学习训练 在当今快速发展的技术时代&#xff0c;大语言模型&#xff08;LLMs&#xff09;与医疗的结合带来了无限的机遇和独特的挑战。本文探索如何利用 Group Relative Policy Optimization&#xff08;GRPO&#xff09;——由 D…

在mfc中使用自定义三维向量类和计算多个三维向量的平均值

先添加一个普通类, Vector3.h, // Vector3.h: interface for the Vector3 class. // //#if !defined(AFX_VECTOR3_H__53D34D26_95FF_4377_BD54_57F4271918A4__INCLUDED_) #define AFX_VECTOR3_H__53D34D26_95FF_4377_BD54_57F4271918A4__INCLUDED_#if _MSC_VER > 1000 #p…

DeepSeek、微信、硅基流动、纳米搜索、秘塔搜索……十种不同方法实现DeepSeek使用自由

为了让大家实现 DeepSeek 使用自由&#xff0c;今天分享 10 个畅用 DeepSeek 的平台。 一、官方满血版&#xff1a;DeepSeek官网与APP 首推&#xff0c;肯定是 DeepSeek 的官网和 APP&#xff0c;可以使用满血版 R1 和 V3 模型&#xff0c;以及联网功能。 网址&#xff1a; htt…

Solon Cloud —— 介绍

说明 前面的章节&#xff0c;我们讲解了 Solon 的开发应用&#xff0c;接下来准备讲解 Solon Cloud 的的开发。Solon Cloud 是为微服务和云原生准备的分布式开发套件。 微服务 就像 MVC 一样&#xff0c;对于微服务的理解也是有不同的。微服务是一组协调工作的小而自治的服务…

python中的异常-模块-包

文章目录 异常异常的定义异常捕获语法捕获常规异常捕获指定异常捕获多个异常捕获所有异常异常else异常finally 异常传递总结 模块概念导入自定义模块及导入main方法all变量 总结 包自定义包定义pycharm中建包的基本步骤导入方式 第三方包 异常 异常的定义 当检测到一个错误时…

跟着柳叶刀数字健康,学习如何通过病理切片预测分子分类对预后的影响|项目复现

小罗碎碎念 项目复现 今天和大家分享一个非常具有参考价值的项目,手把手带着大家复现一篇发表在柳叶刀数字健康的文章。 花了六个小时才完成的这篇推送,信息量非常大,遇到了很多报错问题,但是解决以后的感觉是非常爽的,先给大家展示一下最终的成果——在同一张切片上,通…

Android Http-server 本地 web 服务

时间&#xff1a;2025年2月16日 地点&#xff1a;深圳.前海湾 需求 我们都知道 webview 可加载 URI&#xff0c;他有自己的协议 scheme&#xff1a; content:// 标识数据由 Content Provider 管理file:// 本地文件 http:// 网络资源 特别的&#xff0c;如果你想直接…

PyTorch 源码学习:阅读经验 代码结构

分享自己在学习 PyTorch 源码时阅读过的资料。本文重点关注阅读 PyTorch 源码的经验和 PyTorch 的代码结构。因为 PyTorch 不同版本的源码实现有所不同&#xff0c;所以笔者在整理资料时尽可能按版本号升序&#xff0c;版本号见标题前[]。最新版本的源码实现还请查看 PyTorch 仓…

Flowmix/Docx 多模态文档编辑器:新增【操作留痕】功能,让文档编辑有迹可循!...

hi, 大家好, 我是徐小夕. 最近 flowmix/docx 多模态文档编辑器新增了【操作留痕】功能&#xff1a; 体验地址: https://orange.turntip.cn/docx-react 在和大家分享更新功能之前&#xff0c;我简单介绍一下flowmix/docx 的【操作留痕】功能。 操作留痕功能就像是一位忠实的助手…

[每周一更]-(第135期):AI融合本地知识库(RAG),谁才是最强王者!

文章目录 简单看下DeepSeek满血版配置RAG是什么&#xff1f;**RAG 的核心原理**RAG的局限性**RAG 技术栈**技术实现 **RAG 的应用场景****RAG vs 传统 LLM****方法 1&#xff1a;配合 LangChain 加载知识库****方法 2&#xff1a;使用 Ollama****方法 3&#xff1a;结合 Anythi…

go orm GORM

官网&#xff1a;https://gorm.io/ docs&#xff1a;https://gorm.io/docs/ 博客&#xff1a;https://www.tizi365.com/archives/6.html import ("fmt""gorm.io/driver/mysql""gorm.io/gorm" )type Product struct {gorm.ModelCode stringPr…

毕业项目推荐:基于yolov8/yolo11的100种中药材检测识别系统(python+卷积神经网络)

文章目录 概要一、整体资源介绍技术要点功能展示&#xff1a;功能1 支持单张图片识别功能2 支持遍历文件夹识别功能3 支持识别视频文件功能4 支持摄像头识别功能5 支持结果文件导出&#xff08;xls格式&#xff09;功能6 支持切换检测到的目标查看 二、数据集三、算法介绍1. YO…

基于Python CNN和词向量的句子相似性度量

毕业设计&#xff1a;基于CNN和词向量的句子相似性度量 注意&#xff1a;因为要计算WMD距离所以需要安装依赖库pyemd 开发环境 Anaconda Pycharm 项目说明 按照老师要求复现论文(论文提出了一个新概念相似元&#xff0c;通过相似元来计算两个句子的相似度‘)&#xff0c;同…

CPU安装pytorch(别点进来)

终于&#xff01; 深度学习环境配置5——windows下的torch-cpu1.2.0环境配置_requirement怎么写torch cu-CSDN博客

Django-Vue 学习-VUE

主组件中有多个Vue组件 是指在Vue.js框架中&#xff0c;主组件是一个父组件&#xff0c;它包含了多个子组件&#xff08;Vue组件&#xff09;。这种组件嵌套的方式可以用于构建复杂的前端应用程序&#xff0c;通过拆分功能和视图&#xff0c;使代码更加模块化、可复用和易于维…

MATLAB基础学习相关知识

MATLAB安装参考&#xff1a;抖音-记录美好生活 MATLAB基础知识学习参考&#xff1a;【1小时Matlab速成教程-哔哩哔哩】 https://b23.tv/CnvHtO3 第1部分&#xff1a;变量定义和基本运算 生成矩阵&#xff1a; % 生成矩阵% 直接法% ,表示行 ;表示列 a [1,2,3;4,5,6;7,8,9];%…

TypeScript - 数据类型 - 声明变量

TypeScript 是一种强类型的 JavaScript 超集&#xff0c;它引入了静态类型检查和类型注解。基础类型是 TypeScript 中最基本的类型&#xff0c;用于定义变量的类型。 一、数据类型 常用基本类型&#xff1a;boolean 、number 、string 常用&#xff0c;都是小写 1.布尔类型&…

有序任务规划的局限性

有序任务规划的局限性&#xff08;Limitation of Ordered-Task Planning&#xff09; 1. 任务前向分解&#xff08;TFD&#xff09;的限制 TFD&#xff08;Task Forward Decomposition&#xff09;是一种 基于完全有序方法&#xff08;Totally Ordered Methods&#xff09;的任…

MATLAB学习之旅:数据插值与曲线拟合

在MATLAB的奇妙世界里,我们已经走过了一段又一段的学习旅程。从基础的语法和数据处理,到如今,我们即将踏入数据插值与曲线拟合这片充满魅力的领域。这个领域就像是魔法中的艺术创作,能够让我们根据现有的数据点,构建出更加丰富的曲线和曲面,从而更好地理解和描述数据背后…