librdkafka的简单使用

文章目录

    • 摘要
    • kafka是什么
    • 安装环境
    • librdkafka的简单使用
      • 生产者
      • 消费者

摘要

本文是Getting Started with Apache Kafka and C/C++的中文版, kafka的hello world程序。

本文完整代码见仓库,这里只列出producer/consumer的代码


kafka是什么

本节来源:Kafka - 维基百科,自由的百科全书、Kafka入门简介 - 知乎

首先我们得知道什么是Kafka。

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。

在这里插入图片描述

kafka有以下一些基本概念:

  • Producer - 消息生产者,就是向kafka broker发消息的客户端。
  • Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
  • Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
  • Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  • Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
  • Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消费者可以指定偏移量来指定要消费的消息。

安装环境

上一节,kafka的概念看着比较简单,发布-订阅/生产-消费的模型。

为了可以调用Kafka的C/C++ API, 需要先安装环境。

# almlinux8
# dnf search kafka
# dnf install librdkafka-devel# dnf search glib
# dnf install glib2-devel# ubuntu22
# 开发库apt install librdkafka-dev  libglib2.0-dev# 安装docker环境apt install docker.io docker-compose# 本地安装 Kafka
## ref: https://docs.confluent.io/confluent-cli/current/install.html#cpwget -qO - https://packages.confluent.io/confluent-cli/deb/archive.key | sudo apt-key add
➜ add-apt-repository "deb https://packages.confluent.io/confluent-cli/deb stable main"apt install confluent-cli## 启动kafka
## usage: https://docs.confluent.io/confluent-cli/current/command-reference/local/kafka/confluent_local_kafka_start.html
## error: https://stackoverflow.com/questions/63776518/error-2-matches-found-based-on-name-network-nameofservice-default-is-ambiguo
## error:https://stackoverflow.com/questions/77985757/kafka-in-docker-using-confluent-cli-doesnt-workwhereis confluent
confluent: /usr/bin/confluent➜ export CONFLUENT_HOME=/usr/bin/confluent# 我执行下面命令后,没有看到Plaintext Ports信息
➜ confluent local kafka start# 停止,然后重新启动,管用了
➜ confluent local kafka stop
➜ confluent local kafka startThe local commands are intended for a single-node development environment only, NOT for production usage. See more: https://docs.confluent.io/current/cli/index.htmlPulling from confluentinc/confluent-local
Digest: sha256:ad62269bf4766820c298f7581cf872a49f46a11dbaebcccb4fd2e71049288c5b
Status: Image is up to date for confluentinc/confluent-local:7.6.0
+-----------------+-------+
| Kafka REST Port | 8082  |
| Plaintext Ports | 43465 |
+-----------------+-------+
Started Confluent Local containers "8d72d911a4".
To continue your Confluent Local experience, run `confluent local kafka topic create <topic>` and `confluent local kafka topic produce <topic>`.# Create a new topic, purchases, which you will use to produce and consume events.
➜ confluent local kafka topic create purchases
Created topic "purchases".

librdkafka的简单使用

confluenceinc/librdkafka是Apache Kafka协议的 C 库实现 ,提供生产者、消费者和管理客户端。

下面运行的程序来自:Apache Kafka and C/C++ - Getting Started Tutorial

代码中kafka的API可以查询:librdkafka: librdkafka documentation

代码中使用了glib库,日常开发我不会使用这个库,因为感觉比较冷,它的API可查询:GLib – 2.0: Automatic Cleanup


生产者

总体逻辑:

  • 从配置文件中加载配置
  • 创建生产者
  • 生产者发送消息
#include <glib.h>
#include <librdkafka/rdkafka.h>#include "common.c"#define ARR_SIZE(arr) ( sizeof((arr)) / sizeof((arr[0])) )/* Optional per-message delivery callback (triggered by poll() or flush())* when a message has been successfully delivered or permanently* failed delivery (after retries).*/
static void dr_msg_cb (rd_kafka_t *kafka_handle,const rd_kafka_message_t *rkmessage,void *opaque) {if (rkmessage->err) {g_error("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));}
}int main (int argc, char **argv) {rd_kafka_t *producer;rd_kafka_conf_t *conf;char errstr[512];// Parse the command line.if (argc != 2) {g_error("Usage: %s <config.ini>", argv[0]);return 1;}// Parse the configuration.// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.mdconst char *config_file = argv[1];g_autoptr(GError) error = NULL;g_autoptr(GKeyFile) key_file = g_key_file_new();if (!g_key_file_load_from_file (key_file, config_file, G_KEY_FILE_NONE, &error)) {g_error ("Error loading config file: %s", error->message);return 1;}// Load the relevant configuration sections.conf = rd_kafka_conf_new();load_config_group(conf, key_file, "default");// Install a delivery-error callback.rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);// Create the Producer instance.producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));if (!producer) {g_error("Failed to create new producer: %s", errstr);return 1;}// Configuration object is now owned, and freed, by the rd_kafka_t instance.conf = NULL;// Produce data by selecting random values from these lists.int message_count = 10;const char *topic = "purchases";const char *user_ids[6] = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};const char *products[5] = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};for (int i = 0; i < message_count; i++) {const char *key =  user_ids[random() % ARR_SIZE(user_ids)];const char *value =  products[random() % ARR_SIZE(products)];size_t key_len = strlen(key);size_t value_len = strlen(value);rd_kafka_resp_err_t err;err = rd_kafka_producev(producer,RD_KAFKA_V_TOPIC(topic),RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),RD_KAFKA_V_KEY((void*)key, key_len),RD_KAFKA_V_VALUE((void*)value, value_len),RD_KAFKA_V_OPAQUE(NULL),RD_KAFKA_V_END);if (err) {g_error("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));return 1;} else {g_message("Produced event to topic %s: key = %12s value = %12s", topic, key, value);}rd_kafka_poll(producer, 0);}// Block until the messages are all sent.g_message("Flushing final messages..");rd_kafka_flush(producer, 10 * 1000);if (rd_kafka_outq_len(producer) > 0) {g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));}g_message("%d events were produced to topic %s.", message_count, topic);rd_kafka_destroy(producer);return 0;
}

消费者

总体逻辑:

  • 从配置文件中加载配置
  • 创建消费者
  • 订阅topic
  • 轮询消费者的消息
#include <glib.h>
#include <librdkafka/rdkafka.h>#include "common.c"static volatile sig_atomic_t run = 1;/*** @brief Signal termination of program*/
static void stop(int sig) { run = 0; }int main(int argc, char **argv) {rd_kafka_t *consumer;rd_kafka_conf_t *conf;rd_kafka_resp_err_t err;char errstr[512];// Parse the command line.if (argc != 2) {g_error("Usage: %s <config.ini>", argv[0]);return 1;}// Parse the configuration.// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.mdconst char *config_file = argv[1];g_autoptr(GError) error = NULL;g_autoptr(GKeyFile) key_file = g_key_file_new();if (!g_key_file_load_from_file(key_file, config_file, G_KEY_FILE_NONE,&error)) {g_error("Error loading config file: %s", error->message);return 1;}// Load the relevant configuration sections.conf = rd_kafka_conf_new();load_config_group(conf, key_file, "default");load_config_group(conf, key_file, "consumer");// Create the Consumer instance.consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));if (!consumer) {g_error("Failed to create new consumer: %s", errstr);return 1;}rd_kafka_poll_set_consumer(consumer);// Configuration object is now owned, and freed, by the rd_kafka_t instance.conf = NULL;// Convert the list of topics to a format suitable for librdkafka.const char *topic = "purchases";rd_kafka_topic_partition_list_t *subscription =rd_kafka_topic_partition_list_new(1);rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA);// Subscribe to the list of topics.err = rd_kafka_subscribe(consumer, subscription);if (err) {g_error("Failed to subscribe to %d topics: %s", subscription->cnt,rd_kafka_err2str(err));rd_kafka_topic_partition_list_destroy(subscription);rd_kafka_destroy(consumer);return 1;}rd_kafka_topic_partition_list_destroy(subscription);// Install a signal handler for clean shutdown.signal(SIGINT, stop);// Start polling for messages.while (run) {rd_kafka_message_t *consumer_message;consumer_message = rd_kafka_consumer_poll(consumer, 500);if (!consumer_message) {g_message("Waiting...");continue;}if (consumer_message->err) {if (consumer_message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {/* We can ignore this error - it just means we've read* everything and are waiting for more data.*/} else {g_message("Consumer error: %s",rd_kafka_message_errstr(consumer_message));return 1;}} else {g_message("Consumed event from topic %s: key = %.*s value = %s",rd_kafka_topic_name(consumer_message->rkt),(int)consumer_message->key_len, (char *)consumer_message->key,(char *)consumer_message->payload);}// Free the message when we're done.rd_kafka_message_destroy(consumer_message);}// Close the consumer: commit final offsets and leave the group.g_message("Closing consumer");rd_kafka_consumer_close(consumer);// Destroy the consumer.rd_kafka_destroy(consumer);return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/293399.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

踏入网页抓取的旅程:使用 grequests 构建 Go 视频下载器

引言 在当今数字化的世界中&#xff0c;网页抓取技术变得越来越重要。无论是获取数据、分析信息&#xff0c;还是构建自定义应用程序&#xff0c;我们都需要从互联网上抓取数据。本文将介绍如何使用 Go 编程语言和 grequests 库来构建一个简单的 Bilibili 视频下载器&#xff…

UE4_碰撞_碰撞蓝图节点——Line Trace For Objects(对象的线条检测)

一、Line Trace For Objects&#xff08;对象的线条检测&#xff09;&#xff1a;沿给定线条执行碰撞检测并返回遭遇的首个命中&#xff0c;这只会找到由Object types指定类型的对象。注意他与Line Trace By Channel(由通道检测线条&#xff09;的区别&#xff0c;一个通过Obje…

AI如何影响装饰器模式与组合模式的选择与应用

​&#x1f308; 个人主页&#xff1a;danci_ &#x1f525; 系列专栏&#xff1a;《设计模式》《MYSQL应用》 &#x1f4aa;&#x1f3fb; 制定明确可量化的目标&#xff0c;坚持默默的做事。 &#x1f680; 转载自热榜文章&#xff1a;设计模式深度解析&#xff1a;AI如何影响…

腾讯云轻量2核2G3M云服务器优惠价格61元一年,限制200GB月流量

腾讯云轻量2核2G3M云服务器优惠价格61元一年&#xff0c;配置为轻量2核2G、3M带宽、200GB月流量、40GB SSD盘&#xff0c;腾讯云优惠活动 yunfuwuqiba.com/go/txy 活动链接打开如下图&#xff1a; 腾讯云轻量2核2G云服务器优惠价格 腾讯云&#xff1a;轻量应用服务器100%CPU性能…

计算机网络—VLAN 间路由配置

目录 1.拓扑图 2.实验环境准备 3.为 R3 配置 IP 地址 4.创建 VLAN 5.配置 R2 上的子接口实现 VLAN 间路由 6.配置文件 1.拓扑图 2.实验环境准备 配置R1、R3和S1的设备名称&#xff0c;并按照拓扑图配置R1的G0/0/1接口的IP地址。 [Huawei]sysname R1 [R1]interface Giga…

Kubernetes之Projected Volume

目录 四种Projected Volume Secret 使用方法 应用场景 示例 ConfigMap 使用方法 应用场景 示例 Downward API 使用方法 应用场景 示例 ServiceAccountToken 使用方法 应用场景 示例 在 Kubernetes 中,有几类特殊的 Volume,它们存在的意义不是为了存放容器里的…

家庭网络防御系统搭建-配置流量镜像到NDR系统

由于需要将家庭网络中的全部流量送到NDR分析系统进行分析&#xff0c;因此需要一个具备流量镜像功能的交换机或者路由器。在前面文章所提及的家庭网络架构中&#xff0c;需要一台交换机即可拷贝东西向流量以及南北向流量。当然如果家庭中的路由器或者其他设备具备交换机镜像功能…

【IntelliJ IDEA】运行测试报错解决方案(附图)

IntelliJ IDEA 版本 2023.3.4 (Ultimate Edition) 测试报错信息 命令行过长。 通过 JAR 清单或通过类路径文件缩短命令行&#xff0c;然后重新运行 解决方案 修改运行配置&#xff0c;里面如果没有缩短命令行&#xff0c;需要再修改选项里面勾选缩短命令行让其显示&#x…

在宝塔面板中,为自己的云服务器安装SSL证书,为所搭建的网站启用https(主要部分攻略)

前提条件 My HTTP website is running Nginx on Debian 10&#xff08;或者11&#xff09; 时间&#xff1a;2024-3-28 16:25:52 你的网站部署在Debain 10&#xff08;或者11&#xff09;的 Nginx上 安装单域名证书&#xff08;默认&#xff09;&#xff08;非泛域名&#xf…

前端学习<二>CSS基础——15-Sass入门

Sass简介 大家都知道&#xff0c;js 中可以自定义变量&#xff0c;css 仅仅是一个标记语言&#xff0c;不是编程语言&#xff0c;因此不可以自定义变量、不可以引用等等。 面对这些问题&#xff0c;我们现在来引入 Sass&#xff0c;简单的说&#xff0c;他是 css 的升级版&am…

2024年网络安全运营体系建设方案

以下是部分WORD内容&#xff0c;请您参阅。如需下载完整WORD文件&#xff0c;请前往星球获取&#xff1a; 网络安全运营监控工作整体构想 工作目标及原则 工作目标 为进一步落实强化公司网络安全保障&#xff0c;有效支撑公司数字化转型战略&#xff0c;建立健全公司网省两级协…

说说线路巡检系统解决的那些实际问题

线路巡检系统无疑是解决实际问题的一把利剑&#xff0c;尤其在统一调配资源整合、检维修定位及沟通、人员自身安全、人员工作监督等方面展现出了显著优势。 通过线路巡检系统&#xff0c;我们能够轻松实现资源的统一调配和整合。在地图上定点&#xff0c;结合详细的GIS地理信息…

Linux速览(2)——环境基础开发工具篇(其二)

本章我们来继续介绍一些linux的常用工具 目录 三. Linux编译器-gcc/g使用 1. 代码生成可执行程序的过程 2. gcc如何完成 2.1 格式&#xff1a; 2.2 预处理(进行宏替换) 2.3 编译&#xff08;生成汇编&#xff09; 2.4 汇编&#xff08;生成机器可识别代码&#xff09; …

ZK友好代数哈希函数安全倡议

1. 引言 前序博客&#xff1a; ZKP中的哈希函数如何选择ZK-friendly 哈希函数&#xff1f;snark/stark-friendly hash函数Anemoi Permutation和Jive Compression模式&#xff1a;高效的ZK友好的哈希函数Tip5&#xff1a;针对Recursive STARK的哈希函数 随着Incrementally Ve…

Netty学习——源码篇9 Handler其他处理与异步处理

1 ChannelHandlerContext 每个ChannelHandler被添加到ChannelPipeline后&#xff0c;都会创建一个ChannelHandlerContext&#xff0c;并与ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler进行交互。ChannelHandlerContext不会改变添加…

Redis超好用可视化工具--RedisInsight工具安装

RedisInsight 保姆级安装 RedisInsight 是Redis官方出品的可视化redis管理工具&#xff0c;具有很强大的功能。接下来&#xff0c;让我们一起去完成这款炫酷工具的安装 1. RedisInsight 下载 RedisInsight 官方下载地址&#xff0c;https://redis.io/docs/connect/insight/ …

邀请媒体采访报道对企业宣传有何意义?

传媒如春雨&#xff0c;润物细无声的&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 邀请媒体采访报道对企业宣传具有多重意义&#xff1a; 提升品牌知名度和曝光度&#xff1a;媒体是信息传播的重要渠道&#xff0c;通过媒体的报道&#xff0c;企业及其活动、产品能够…

从vrrp、bfd、keepalived到openflow多控制器--理论篇

vrrp 在一个网络中&#xff0c;通常会使用vrrp技术来实现网关的高可用。 vrrp&#xff0c;即Virtual Router Redundancy Protocol&#xff0c;虚拟路由冗余协议。 应用场景 典型的如下面这个例子&#xff1a; 当Router故障后&#xff0c;将会导致HostA-C都无法连接外部的I…

C#.net8创建webapi,使用SqlSugar,仓储模式,DTO,服务层,控制层的综合应用(企业级)

本文源码地址: https://download.csdn.net/download/u012563853/89036104 源码中,也有详细的注释说明。 代码总览: 这是一个综合性比较强的文章,需要有一定的基础,没有基础的人,看了后,会全面的了解一下,有基础的人,看了后会加强认识,更加的巩固,直接在项目中去应…

自贡市第一人民医院:超融合与 SKS 承载 HIS 等核心业务应用,加速国产化与云原生转型

自贡市第一人民医院始建于 1908 年&#xff0c;现已发展成为集医疗、科研、教学、预防、公共卫生应急处置为一体的三级甲等综合公立医院。医院建有“全国综合医院中医药工作示范单位”等 8 个国家级基地&#xff0c;建成高级卒中中心、胸痛中心等 6 个国家级中心。医院日门诊量…