「Kafka」监控、集成篇

Kafka-Eagle 监控

Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。

MySQL环境准备

Kafka-Eagle 的安装依赖于 MySQL,MySQL 主要用来存储可视化展示的数据。

安装步骤参考:P61 尚硅谷 kafka监控_MySQL环境准备

Kafka 环境准备

  1. 关闭 Kafka 集群

    [atguigu@hadoop102 kafka]$ kf.sh stop
    
  2. 修改 /opt/module/kafka/bin/kafka-server-start.sh

    [atguigu@hadoop102 kafka]$ vim bin/kafka-server-start.sh
    

    修改如下参数值:

    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    fi
    

    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"export JMX_PORT="9999"#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    fi
    

    初始内存只分配1G,如果要使用 Eagle 功能,我们可以将内存设置为 2G。

    注意:修改之后在启动 Kafka 之前要分发至其他节点。

    [atguigu@hadoop102 bin]$ xsync kafka-server-start.sh
    

Kafka-Eagle 安装

  1. 官网:https://www.kafka-eagle.org/

  2. 上传压缩包 kafka-eagle-bin-2.0.8.tar.gz 到集群 /opt/software 目录

  3. 解压到本地

    [atguigu@hadoop102 software]$ tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
    
  4. 进入刚才解压的目录

    [atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ ll
    
    总用量 79164
    -rw-rw-r--. 1 atguigu atguigu 81062577 1013 00:00 efak-web-2.0.8-bin.tar.gz
    
  5. efak-web-2.0.8-bin.tar.gz 解压至 /opt/module

    [atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module/
    
  6. 修改名称

    [atguigu@hadoop102 module]$ mv efak-web-2.0.8/ efak
    
  7. 修改配置文件/opt/module/efak/conf/system-config.properties

    [atguigu@hadoop102 conf]$ vim system-config.properties
    
    ######################################
    # multi zookeeper & kafka cluster list
    # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.'instead
    ######################################
    efak.zk.cluster.alias=cluster1
    cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
    ######################################
    # zookeeper enable acl
    ######################################
    cluster1.zk.acl.enable=false
    cluster1.zk.acl.schema=digest
    cluster1.zk.acl.username=test
    cluster1.zk.acl.password=test123
    ######################################
    # broker size online list
    ######################################
    cluster1.efak.broker.size=20
    ######################################
    # zk client thread limit
    ######################################
    kafka.zk.limit.size=32
    ######################################
    # EFAK webui port
    ######################################
    efak.webui.port=8048
    ######################################
    # kafka jmx acl and ssl authenticate
    ######################################
    cluster1.efak.jmx.acl=false
    cluster1.efak.jmx.user=keadmin
    cluster1.efak.jmx.password=keadmin123
    cluster1.efak.jmx.ssl=false
    cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
    cluster1.efak.jmx.truststore.password=ke123456
    ######################################
    # kafka offset storage
    ######################################
    # offset 保存在 kafka
    cluster1.efak.offset.storage=kafka
    ######################################
    # kafka jmx uri
    ######################################
    cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
    ######################################
    # kafka metrics, 15 days by default
    ######################################
    efak.metrics.charts=true
    efak.metrics.retain=15
    ######################################
    # kafka sql topic records max
    ######################################
    efak.sql.topic.records.max=5000
    efak.sql.topic.preview.records.max=10
    ######################################
    # delete kafka topic token
    ######################################
    efak.topic.token=keadmin
    ######################################
    # kafka sasl authenticate
    ######################################
    cluster1.efak.sasl.enable=false
    cluster1.efak.sasl.protocol=SASL_PLAINTEXT
    cluster1.efak.sasl.mechanism=SCRAM-SHA-256
    cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramL
    oginModule required username="kafka" password="kafka-eagle";
    cluster1.efak.sasl.client.id=
    cluster1.efak.blacklist.topics=
    cluster1.efak.sasl.cgroup.enable=false
    cluster1.efak.sasl.cgroup.topics=
    cluster2.efak.sasl.enable=false
    cluster2.efak.sasl.protocol=SASL_PLAINTEXT
    cluster2.efak.sasl.mechanism=PLAIN
    cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainL
    oginModule required username="kafka" password="kafka-eagle";
    cluster2.efak.sasl.client.id=
    cluster2.efak.blacklist.topics=
    cluster2.efak.sasl.cgroup.enable=false
    cluster2.efak.sasl.cgroup.topics=
    ######################################
    # kafka ssl authenticate
    ######################################
    cluster3.efak.ssl.enable=false
    cluster3.efak.ssl.protocol=SSL
    cluster3.efak.ssl.truststore.location=
    cluster3.efak.ssl.truststore.password=
    cluster3.efak.ssl.keystore.location=
    cluster3.efak.ssl.keystore.password=
    cluster3.efak.ssl.key.password=
    cluster3.efak.ssl.endpoint.identification.algorithm=https
    cluster3.efak.blacklist.topics=
    cluster3.efak.ssl.cgroup.enable=false
    cluster3.efak.ssl.cgroup.topics=
    ######################################
    # kafka sqlite jdbc driver address
    ######################################
    # 配置 mysql 连接
    efak.driver=com.mysql.jdbc.Driver
    efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=root
    efak.password=000000
    ######################################
    # kafka mysql jdbc driver address
    ######################################
    #efak.driver=com.mysql.cj.jdbc.Driver
    #efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    #efak.username=root
    #efak.password=123456
    
  8. 添加环境变量

    [atguigu@hadoop102 conf]$ sudo vim /etc/profile.d/my_env.sh
    
    # kafkaEFAK
    export KE_HOME=/opt/module/efak
    export PATH=$PATH:$KE_HOME/bin
    

    注意:source /etc/profile

    [atguigu@hadoop102 conf]$ source /etc/profile
    
  9. 启动

    • 注意:启动之前需要先启动 zk 以及 kafka

      [atguigu@hadoop102 kafka]$ kf.sh start
      
    • 启动 efak

      [atguigu@hadoop102 efak]$ bin/ke.sh start
      
      Version 2.0.8 -- Copyright 2016-2021
      *****************************************************************
      * EFAK Service has started success.
      * Welcome, Now you can visit 'http://192.168.10.102:8048'
      * Account:admin ,Password:123456
      *****************************************************************
      * <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
      * <Usage> https://www.kafka-eagle.org/ </Usage>
      *****************************************************************
      
    • 如果停止 efak,执行命令:

      [atguigu@hadoop102 efak]$ bin/ke.sh stop
      

Kafka-Eagle 页面操作

  • 登录页面查看监控数据
    • http://192.168.10.102:8048/

image-20240204143036443

主面板

image-20240204143050044

Brokers

image-20240218110059499

Topics

image-20240218110144531

Zookeepers

image-20240218110250816

Consumers

image-20240218110331631

大屏信息

image-20240218110432475

Kafka-Kraft 模式

Kafka-Kraft 架构

image-20240204143136303

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由 controller 进行 Kafka 集群管理。

右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。

这样做的好处有以下几个:

  • Kafka 不再依赖外部框架,而是能够独立运行;
  • controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升;
  • 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制;
  • controller 不再动态选举,而是由配置文件规定。
    • 这样我们可以有针对性的加强 controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。

Kafka-Kraft 集群部署

image-20240204143313058

image-20240204143325634

image-20240204143335374

image-20240204143618688

image-20240204143638105

Kafka-Kraft 集群启动停止脚本

  1. /home/atguigu/bin 目录下创建文件 kf2.sh 脚本文件

    [atguigu@hadoop102 bin]$ vim kf2.sh
    

    脚本如下:

    #! /bin/bashcase $1 in
    "start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka2-------"ssh $i "/opt/module/kafka2/bin/kafka-server-start.sh -daemon /opt/module/kafka2/config/kraft/server.properties"done
    };;
    "stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka2-------"ssh $i "/opt/module/kafka2/bin/kafka-server-stop.sh "done
    };;
    esac
    
  2. 添加执行权限

    [atguigu@hadoop102 bin]$ chmod +x kf2.sh
    
  3. 启动集群命令

    [atguigu@hadoop102 ~]$ kf2.sh start
    
  4. 停止集群命令

    [atguigu@hadoop102 ~]$ kf2.sh stop
    

Kafka 集成

Kafka 集成 Flume

Flume 是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。

image-20240204143942891

Flume 环境准备

  1. 启动 kafka 集群

    [atguigu@hadoop102 ~]$ zk.sh start
    [atguigu@hadoop102 ~]$ kf.sh start
    
  2. 启动 kafka 消费者

    [atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    
  3. Flume 安装步骤

    • 参考:P66 尚硅谷 Kafka 集成 Flume 环境准备

Flume 生产者

  • 通过 Flume 实时监控 app.log 文件数据的变化
  • 使用 taildir source,支持断点续传、实时监控文件变化,并获取到数据
  • 由于我们传输的就是普通的日志,没有必要追求太高的可靠性,使用 memory channel,完全基于内存,速度非常快;断电后会丢数据,最多丢 100 条日志(因为内存大小最大上线就是 100)
  • 数据是发往到 kafka 的,所以使用 kafka sink
  • 发到 first 主题中,启动消费者消费。

image-20240204144002200

  1. 配置 Flume

    • 在 hadoop102 节点的 Flume 的 job 目录下创建 file_to_kafka.conf

      [atguigu@hadoop102 flume]$ mkdir jobs
      [atguigu@hadoop102 flume]$ vim jobs/file_to_kafka.conf 
      
    • 配置文件内容如下:

      # 1 组件定义
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1# 2 配置source
      a1.sources.r1.type = TAILDIR
      a1.sources.r1.filegroups = f1
      a1.sources.r1.filegroups.f1 = /opt/module/applog/app.* # 监控文件目录
      a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json # offset文件 支持断点续传# 3 配置channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100# 4 配置sink
      a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
      a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
      a1.sinks.k1.kafka.topic = first
      a1.sinks.k1.kafka.flumeBatchSize = 20
      a1.sinks.k1.kafka.producer.acks = 1
      a1.sinks.k1.kafka.producer.linger.ms = 1# 5 拼接组件
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      
  2. 启动 Flume

    [atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f  jobs/file_to_kafka.conf &
    
  3. /opt/module/applog/app.log 里追加数据,查看 kafka 消费者消费情况

    [atguigu@hadoop102 module]$ mkdir applog
    [atguigu@hadoop102 applog]$ echo hello >> /opt/module/applog/app.log
    
  4. 观察 kafka 消费者,能够看到消费的 hello 数据

    image-20240221151429346

Flume 消费者

  • Flume 作为消费者,首先肯定选用 kafka source
  • 通道选择 memory channel
  • 打印到控制台选择 logger sink

image-20240204144036222

  1. 配置 Flume

    • 在 hadoop102 节点的 Flume 的 /opt/module/flume/jobs 目录下创建 kafka_to_file.conf

      [atguigu@hadoop102 jobs]$ vim kafka_to_file.conf
      
    • 配置文件内容如下:

      # 1 组件定义
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1# 2 配置source
      a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
      a1.sources.r1.batchSize = 50
      a1.sources.r1.batchDurationMillis = 200
      a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
      a1.sources.r1.kafka.topics = first
      a1.sources.r1.kafka.consumer.group.id = custom.g.id# 3 配置channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100# 4 配置sink
      a1.sinks.k1.type = logger# 5 拼接组件
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      
  2. 启动 Flume

    [atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/kafka_to_file.conf -Dflume.root.logger=INFO,console
    
  3. 启动 kafka 生产者

    [atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
    

    并输入数据,例如:hello

  4. 观察控制台输出的日志

    image-20240221151300792

Kafka 集成 Flink

Flink是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。

image-20240220170317460

Flink 环境准备

  1. 创建一个 maven 项目 flink-kafka

  2. 添加配置文件

    <dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.13.0</version></dependency>
    </dependencies>
    
  3. 将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error

    log4j.rootLogger=error, stdout,R
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%nlog4j.appender.R=org.apache.log4j.RollingFileAppender
    log4j.appender.R.File=../log/agent.log
    log4j.appender.R.MaxFileSize=1024KB
    log4j.appender.R.MaxBackupIndex=1log4j.appender.R.layout=org.apache.log4j.PatternLayout
    log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n
    
  4. 在 java 文件夹下创建包名为 com.atguigu.flink

Flink 生产者

  1. com.atguigu.flink 包下创建 java 类:FlinkKafkaProducer1(系统也有一个 FlinkKafkaProducer,会重名,所以这里命名为 1)。

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;import java.util.ArrayList;
    import java.util.Properties;public class FlinkKafkaProducer1 {public static void main(String[] args) throws Exception {// 0 初始化flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3); // 3个槽 对应kafka主题题的3个分区// 1 准备数据源 读取集合中数据ArrayList<String> wordsList = new ArrayList<>();wordsList.add("hello");wordsList.add("atguigu");DataStream<String> stream = env.fromCollection(wordsList);// 2 kafka生产者配置信息Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 3 创建kafka生产者FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(), // 序列化和反序列化模板类 string类型properties);// 4 生产者和flink流关联stream.addSink(kafkaProducer);// 5 执行env.execute();}
    }
    
  2. 启动Kafka消费者

    [atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    
  3. 执行 FlinkKafkaProducer1 程序,观察 kafka 消费者控制台情况

    image-20240221174508165

    Q:

    • 为什么先接收到 atguigu,然后才是 hello 呢?

    A:

    • 在 Flink 中,对于并行度大于 1 的情况,不同的算子实例是并行运行的,也就是说当你的 env.setParallelism(3) 时,会有 3 个线程同时运行。在你的例子中,"hello""atguigu" 可能由不同的线程处理,并且处理的顺序是不确定的。
    • 如果你希望严格按照顺序处理,你可以将并行度设置为 1,即 env.setParallelism(1)。但是这样可能会影响处理速度。此外,Flink 也提供了一些方法来保证在并行处理时的顺序,可以查阅相关资料来了解更多。

Flink 消费者

  1. com.atguigu.flink 包下创建 java 类:FlinkKafkaConsumer1

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;public class FlinkKafkaConsumer1 {public static void main(String[] args) throws Exception {// 0 初始化flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);// 1 kafka消费者配置信息Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// group.id可选,不配置不会报错// 2 创建kafka消费者FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleStringSchema(),properties);// 3 消费者和flink流关联env.addSource(kafkaConsumer).print();// 4 执行env.execute();}
    }
    
  2. 启动 FlinkKafkaConsumer1 消费者

  3. 启动 kafka 生产者

    [atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
    
  4. 观察 IDEA 控制台数据打印

    image-20240222150646614

    有 3 个消费者并行消费,因为只发了两条消息,所以这里只有 1 和 3。

Kafka 集成 SpringBoot

SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 SpringBoot 的消费者。

image-20240221151944111

跟之前不太一样的是,外部数据是通过接口的方式发送到 SpringBoot 程序,然后 SpringBoot 接收到这个接口的数据,然后再发送到 kafka 集群。

SpringBoot 环境准备

  1. 在 IDEA 中安装 lombok 插件

    在 Plugins 下搜索 lombok 然后在线安装即可,安装后注意重启

    image-20240222151143427

  2. 创建一个 Spring Initializr

    image-20240222151315880

    注意:有时候SpringBoot官方脚手架不稳定,我们切换国内地址:https://start.aliyun.com

  3. 项目名称 springboot

    image-20240222151454439

  4. 添加项目依赖

    image-20240222151506918

    image-20240222151528661

    image-20240222151539501

    image-20240222151553083

  5. 检查自动生成的配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.1</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu</groupId><artifactId>springboot</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
    

SpringBoot 生产者

  1. 修改 SpringBoot 核心配置文件 application.propeties,添加生产者相关信息

    # 应用名称
    spring.application.name=atguigu_springboot_kafka# 指定kafka的地址
    spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092# 指定key和value的序列化器
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
  2. 创建 controller 从浏览器接收数据,并写入指定的 topic

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;@RestController
    public class ProducerController {// Kafka模板用来向kafka发送数据@AutowiredKafkaTemplate<String, String> kafka;@RequestMapping("/atguigu")public String data(String msg) {kafka.send("first", msg);return "ok";}
    }
    
  3. 在浏览器中给 /atguigu 接口发送数据

    http://localhost:8080/atguigu?msg=hello

  4. kafka 消费者接收到数据

    image-20240222152810440

SpringBoot 消费者

  1. 修改 SpringBoot 核心配置文件 application.propeties

    # =========消费者配置开始=========
    # 指定kafka的地址
    spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092# 指定key和value的反序列化器
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 指定消费者组的group_id
    spring.kafka.consumer.group-id=atguigu
    # =========消费者配置结束=========
    
  2. 创建类消费 Kafka 中指定 topic 的数据

    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.KafkaListener;@Configuration
    public class KafkaConsumer {// 指定要监听的topic@KafkaListener(topics = "first")public void consumeTopic(String msg) { // 参数: 收到的valueSystem.out.println("收到的信息: " + msg);}
    }
    
  3. 向 first 主题发送数据

    [atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
    > atguigu
    
  4. SpringBoot 消费者接收到数据

    image-20240222153132773

Kafka 集成 Spark

Spark 是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。

image-20240222152320221

Spark 环境准备

  1. Scala 环境准备

    • 参考:P73 尚硅谷 Kafka 集成 Spark 生产者

    Spark 的底层源码是用 Scala 编写的。

  2. 创建一个 maven 项目 spark-kafka

  3. 在项目 spark-kafka 上点击右键,Add Framework Support => 勾选 scala

  4. 在 main 下创建 scala 文件夹,并右键 Mark Directory as Sources Root => 在 scala 下创建包名为 com.atguigu.spark

  5. 添加配置文件

    <dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version></dependency>
    </dependencies>
    
  6. 将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error

    log4j.rootLogger=error, stdout,R
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%nlog4j.appender.R=org.apache.log4j.RollingFileAppender
    log4j.appender.R.File=../log/agent.log
    log4j.appender.R.MaxFileSize=1024KB
    log4j.appender.R.MaxBackupIndex=1log4j.appender.R.layout=org.apache.log4j.PatternLayout
    log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n
    

Spark 生产者

  1. 在 com.atguigu.spark 包下创建 scala ObjectSparkKafkaProducer

    image-20240222154814420

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}object SparkKafkaProducer {def main(args: Array[String]): Unit = {// 0 kafka配置信息val properties = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])// 1 创建kafka生产者var producer = new KafkaProducer[String, String](properties)// 2 发送数据for (i <- 1 to 5) {producer.send(new ProducerRecord[String, String]("first", "atguigu" + i))}// 3 关闭资源producer.close()}
    }
    
  2. 启动 Kafka 消费者

    [atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    
  3. 执行 SparkKafkaProducer 程序,观察 kafka 消费者控制台情况

    image-20240222155201137

Spark 消费者

  1. 添加配置文件

    <dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency>
    </dependencies>
    
  2. 在 com.atguigu.spark 包下创建 scala ObjectSparkKafkaConsumer

    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object SparkKafkaConsumer {def main(args: Array[String]): Unit = {// 1.创建SparkConfval sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")// 2.创建StreamingContext 初始化上下文环境// Seconds(3):时间窗口,批处理间隔,表示每隔3秒钟,Spark Streaming就会收集一次数据进行处理。val ssc = new StreamingContext(sparkConf, Seconds(3))// 3.定义Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG -> "atguiguGroup")// 4.读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, // 上下文环境LocationStrategies.PreferConsistent, // 数据存储位置 优先位置ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaPara) // 消费策略:(订阅多个主题,配置参数) )// 5.将每条消息的KV取出val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())// 6.计算WordCountvalueDStream.print()// 7.开启任务 并阻塞(使程序一直执行)ssc.start()ssc.awaitTermination()}
    }
    
  3. 启动 SparkKafkaConsumer 消费者

  4. 启动 kafka 生产者

    [atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
    
  5. 观察IDEA控制台数据打印

    image-20240222160506298


笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/262557.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何在同一个module里面集成多个数据库的多张表数据

确保本公司数据安全&#xff0c;通常对数据的管理采取很多措施进行隔离访问。 但是&#xff0c;Mendix应怎样访问散布于异地的多个数据库呢&#xff1f; 前几期我们介绍过出海跨境的大企业对于Mendix的技术、人才的诉求后&#xff0c;陆陆续续有其他客户希望更聚焦具体的实际场…

数据结构链表力扣例题AC(4)——代码以及思路记录

21. 合并两个有序链表 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 AC struct ListNode* mergeTwoLists(struct ListNode* list1, struct ListNode* list2) {if(list1 NULL){return list2;}if(list2 NULL){return l…

alibabacloud学习笔记06(小滴课堂)

讲Sentinel流量控制详细操作 基于并发线程进行限流配置实操 在浏览器打开快速刷新会报错 基于并发线程进行限流配置实操 讲解 微服务高可用利器Sentinel熔断降级规则 讲解服务调用常见的熔断状态和恢复 讲解服务调用熔断例子 我们写一个带异常的接口&#xff1a;

正交匹配追踪(Orthogonal Matching Pursuit, OMP)的MATLAB实现

压缩感知&#xff08;Compressed Sensing, CS&#xff09;是一种利用稀疏信号的先验知识&#xff0c;用远少于奈奎斯特采样定理要求的样本数目恢复整个信号的技术。正交匹配追踪&#xff08;Orthogonal Matching Pursuit, OMP&#xff09;是一种常见的贪婪算法&#xff08;Gree…

OCPP 1.6 接入实现文档

一、简介 OCPP&#xff08;Open Charge Point Protocol&#xff09;是一个开放的通信协议&#xff0c;用于充电站&#xff08;Charge Point&#xff09;与中央系统&#xff08;Central System&#xff0c;如充电站管理系统或服务提供商平台&#xff09;之间的通讯。本篇文档将…

谷歌搜索引擎关键词优化,竞价排名怎么做?大舍传媒

公司 大舍传媒成立于2005年&#xff0c;并从那时开始专注于谷歌搜索引擎优化&#xff08;SEO&#xff09;。如今&#xff0c;我们已经拥有了十八年的海外数字营销经验。我们为全球数千个国际知名品牌客户提供服务&#xff0c;是一家专注于技术的公司。 谷歌排名成果 在谷歌&…

Windows系统中定时执行python脚本

背景&#xff1a;本地Windows系统指定目录下会有文件的修改新增&#xff0c;这些变化的文件需要定时的被上传到git仓库中&#xff0c;这样不需要每次变更手动上传了。 首先编写一个检测文件夹下文件变化并且上传git仓库的python脚本(确保你已经在E:\edc_workspace\data_edc_et…

10.vue学习笔记(组件数据传递-props回调函数子传父+透传Attributes+插槽slot)

文章目录 1.组件数据传递2.透传Attributes&#xff08;了解&#xff09;禁用Attributes继承 3.插槽slot 1.组件数据传递 我们之前讲解过了组件之间的数据传递&#xff0c;props 和 自定义事件 两种方式 props&#xff1a;父传子 自定义事件&#xff1a;子传父 props通过额外方…

dell戴尔电脑灵越系列Inspiron 15 3520原厂Win11系统中文版/英文版

Dell戴尔笔记本灵越3520原装出厂Windows11系统包&#xff0c;恢复出厂开箱预装OEM系统 链接&#xff1a;https://pan.baidu.com/s/1mMOAnvXz5NCDO_KImHR5gQ?pwd3nvw 提取码&#xff1a;3nvw 原厂系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、Office办公软件、MyD…

2024.2.22 C++QT 作业

思维导图 练习题 1>完善对话框&#xff0c;点击登录对话框&#xff0c;如果账号和密码匹配&#xff0c;则弹出信息对话框&#xff0c;给出提示”登录成功“&#xff0c;提供一个Ok按钮&#xff0c;用户点击Ok后&#xff0c;关闭登录界面&#xff0c;跳转到其他界面。如果账…

Redis 工具类 与 Redis 布隆过滤器

Redis 工具类 1. 核心依赖 <!--redis--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency><groupId>com.google.guava…

leetcode(算法) 83.删除排序链表中的重复元素(python版)

需求 给定一个已排序的链表的头 head &#xff0c; 删除所有重复的元素&#xff0c;使每个元素只出现一次 。返回 已排序的链表 。 示例 1&#xff1a; 输入&#xff1a;head [1,1,2] 输出&#xff1a;[1,2] 示例 2&#xff1a; 输入&#xff1a;head [1,1,2,3,3] 输出&…

【Unity3D】ASE制作天空盒

找到官方shader并分析 下载对应资源包找到\DefaultResourcesExtra\Skybox-Cubed.shader找到\CGIncludes\UnityCG.cginc观察变量, 观察tag, 观察代码 需要注意的内容 ASE要处理的内容 核心修改 添加一个Custom Expression节点 code内容为: return DecodeHDR(In0, In1);outp…

jenkins报错:Pseudo-terminal will not be allocated because stdin is not a terminal

jenkins的流水线部分代码如下 sh ssh root192.168.2.234 << remotessh cd /var/lib/jenkins/workspace/txkc /usr/local/maven/apache-maven-3.8.6/bin/mvn clean package -U ls remotessh执行流水线出现报错&#xff1a;Pseudo-terminal will not be allocated because…

【数据结构】排序(1)

目录 一、概念&#xff1a; 二、直接插入排序&#xff1a; 三、希尔排序&#xff1a; 四、直接选择排序&#xff1a; 五、堆排序&#xff1a; 六、冒泡排序&#xff1a; 一、概念&#xff1a; 排序的概念&#xff1a; 使一串记录&#xff0c;按照其中的某个或某些关键字…

【Crypto | CTF】BUUCTF RSA2

天命&#xff1a;密码学越来越难了&#xff0c;看别人笔记都不知道写啥 天命&#xff1a;莫慌&#xff0c;虽然我不会推演法&#xff0c;但我可以用归纳法 虽然我不知道解题的推演&#xff0c;但我可以背公式啊哈哈哈 虽然我不会这题&#xff0c;但是我也能做出来 公式我不知…

百度百科词条在网络推广中的六大作用

也许很多网友都发现了&#xff0c;在网上查资料&#xff0c;百科词条往往是优先展示的。一方面因为百科是搜索引擎自身的平台&#xff0c;另一方面就是因为百科信息权威&#xff0c;网友认可度高。所以企业开展网络营销&#xff0c;百科营销是一块重要阵地。 也有的企业认为百科…

代码检测规范和git提交规范

摘要&#xff1a;之前开发的项目&#xff0c;代码检测和提交规范都是已经配置好的&#xff0c;最近自己新建的项目就记录下相关配置过程。 1. ESlint配置 2013年6月创建开源项目&#xff0c;提供一个插件化的JavaScript代码检测工具&#xff0c;创建项目是生成的eslintrc.js文…

Elasticsearch从入门到精通-01认识Elasticsearch

Elasticsearch从入门到精通-01认识Elasticsearch &#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是程序员行走的鱼 &#x1f342;博主从本篇正式开始ES学习&#xff0c;希望小伙伴可以一起探讨 &#x1f4d6; 本篇主要介绍和大家一块简单认识下ES并了解ES中的主要角色…

装饰模式(Decorator Pattern)

定义 装饰模式&#xff08;Decorator Pattern&#xff09;是一种结构型设计模式&#xff0c;它允许通过将对象包装在装饰器类的实例中来动态地添加新的行为和责任。这种模式可以在不修改现有代码的情况下&#xff0c;灵活地扩展对象的功能。 示例 考虑一个咖啡店的场景&…