🌻🌻 目录
- 一、Kafka-Eagle 监控
- 1.1 MySQL 环境准备
- 1.2 Kafka 环境准备
- 1.3 Kafka-Eagle 安装
- 1.4 Kafka-Eagle 页面操作
- 二、集成 SpringBoot
- 2.1 前期准备
- 2.2 SpringBoot 生产者
- 2.3 SpringBoot 消费者
- 三、集成 Spark(拓展 Scala 语言)
- 3.1 Scala 入门
- 3.1.1 Scala 环境搭建
- 3.1.2 Scala 插件安装
- 3.2 前期准备
- 3.3 Spark 生产者
- 3.4 Spark 消费者
一、Kafka-Eagle 监控
Kafka-Eagle
框架可以监控Kafka
集群的整体运行情况,在生产环境中经常使用。
1.1 MySQL 环境准备
Kafka-Eagle
的安装依赖于MySQL
,MySQL
主要用来存储可视化展示的数据。如果你的集群中之前安装过MySQL
可以跨过该步。
安装参考 👉👉 三、 CentOS-7.5 上面安装 mysql 5.7.16
1.2 Kafka 环境准备
1)关闭
Kafka
./kafka-server-stop.sh
2)修改
/usr/local/kafka/bin/kafka-server-start.sh
命令中
vi kafka-server-start.sh
修改如下参数值:
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
注意:修改后先分发之其他节点(仅集群)
下面操作完再启动。
1.3 Kafka-Eagle 安装
- 0)官网 下载 kafka-eagle-bin-2.0.8.tar.gz
- 1)本地资源库下载
①
②
- 2)上传压缩包
kafka-eagle-bin-2.0.8.tar.gz
到集群/usr/local
下面
- 3)解压到本地
tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
- 3)进入刚才解压的目录
- 4)将
efak-web-2.0.8-bin.tar.gz
解压至/usr/local
tar -zxvf efak-web-2.0.8-bin.tar.gz -C /usr/local/cd /usr/local/ls
5)修改名称
mv efak-web-2.0.8 efak
- 6)修改配置文件
/usr/local/efak/conf/system-config.properties
vi system-config.properties
修改如下:
efak.zk.cluster.alias=cluster1
cluster1.zk.list=linux-102:2181/kafka
cluster1.efak.offset.storage=kafka
# 配置mysql连接
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://linux-102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root
7)添加环境变量
#efak
export KE_HOME=/usr/local/efak
export PATH=$PATH:$KE_HOME/binsource /etc/profile
8)启动
- (1)注意:启动之前需要先启动
ZK
以及kafka
。
(2)启动efak
cd /usr/local/efak/binls./ke.sh start
启动成功如下显示:
说明:如果停止efak,执行命令。./ke.sh stop
1.4 Kafka-Eagle 页面操作
1)登录页面查看监控数据
- 浏览器输入:
http://192.168.10.102:8048
用户名:admin
密码:123456
如下所示:
① 登录页
② 首页
③ 监控查看页面
二、集成 SpringBoot
2.1 前期准备
SpringBoot
是一个在JavaEE开发中非常常用的组件。可以用于Kafka
的生产者,也可以用于SpringBoot
的消费者。
1)在
IDEA
中安装lombok
插件
- 在
Plugins
下搜索lombok
然后在线安装即可,安装后注意重启
2)SpringBoot环境准备
- (1)创建一个
Spring Initializr
- 注意:有时候SpringBoot官方脚手架不稳定,我们切换国内地址
https://start.aliyun.com
- (2)创建工程
springboot
①
②
③ 添加项目依赖
④
⑤
自动生成的配置文件如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.9.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.gansu</groupId><artifactId>springboot</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot</name><description>Demo project for Spring Boot</description><url/><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
2.2 SpringBoot 生产者
(1)修改
SpringBoot
核心配置文件application.propeties
, 添加生产者相关信息
在创建好的类中写入 StringSerializer
,鼠标点击进去,右键复制全类名
# 指定kafka的地址
spring.kafka.bootstrap-servers=192.168.10.102:9092
#指定key和value的序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
(2)创建
producerController
从浏览器接收数据, 并写入指定的topic
package com.gansu.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class producerController {@AutowiredKafkaTemplate<String,String> kafka;@RequestMapping("/producter")public String producter(String msg){kafka.send("second",msg);return "ok";}
}
(3)在浏览器中给/atguigu接口发送数据
http://localhost:8080/producter?msg=xiaojin
2.3 SpringBoot 消费者
(1)修改
SpringBoot
核心配置文件application.propeties
# =========消费者配置开始=========
# 指定key和value的反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的group_id
spring.kafka.consumer.group-id=test
(2)创建类消费
Kafka
中指定topic
的数据ConsumerController
package com.gansu.consumer;import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;@Configuration
public class ConsumerController {@KafkaListener(topics = "second")public void consumer(String receive){System.out.println("消费者收到的数据为"+receive);}
}
(3)向first主题发送数据
三、集成 Spark(拓展 Scala 语言)
Spark
是一个在大数据开发中非常常用的组件。可以用于Kafka
的生产者,也可以用于Spark
的消费者。
3.1 Scala 入门
3.1.1 Scala 环境搭建
1)安装步骤
- (1)首先确保JDK1.8安装成功
- (2)下载对应的Scala安装文件
scala-2.12.11.zip
如下:
- scala官网 下载 👉👉 scala-2.12.11.zip
- 本地资源库获取 👉👉 链接:提取码:yyds
①
②
③
④
- (3)解压
scala-2.11.8.zip
到自己的磁盘目录
- (4)配置
Scala
的环境变量
注意1:解压路径不能有任何中文路径,最好不要有空格。
注意2:环境变量要大写SCALA_HOME
①
②
验证是否安装成功:
(1)在键盘上同时按
win+r
键,并在运行窗口输入cmd
命令,输入Scala
并按回车键,启动Scala环境。
scala
(2)定义两个变量,并计算求和。
var n:int = 10var n2:Int = 2var result:Int = n+n2
3.1.2 Scala 插件安装
默认情况下IDEA不支持Scala的开发,需要安装Scala插件。
1)插件在线安装(可选)
- (1)在搜索插件框里面输入Scala->点击Install->点击ok->点击apply。
- (2)重启IDEA,再次来到Scala插件页面,已经变成
Uninstall
。
3.2 前期准备
- (1)创建一个maven项目
spark-kafka
- (2)在项目
spark-kafka
上点击右键,Add Framework Support=》
勾选scala
- (3)在
main
下创建scala
文件夹,并右键Mark Directory as Sources Root=>
在scala
下创建包名为com.gansu.spark
- (4)在
pom.xml
添加依赖
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version></dependency>
</dependencies>
(5)将log4j.properties
文件添加到resources
里面,就能更改打印日志的级别为error
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}%5p --- [%50t] %-80c(line:%5L):%m%nlog4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L):%m%n
3.3 Spark 生产者
(1)在
com.gansu.spark
包下创建scala Object:SparkKafkaProducer
package com.gansu.sparkimport java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializerobject SparkKafkaProducer {//直接写个 main回车即可def main(args: Array[String]): Unit = {//scala 语言可以不用分号// 0 kafka配置信息val properties = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])// 1 创建kafka生产者val producer = new KafkaProducer[String,String](properties)// 2 发送数据for (elem <- 1 to 5) {producer.send(new ProducerRecord[String,String]("august","Daniel-xiaoJ"))}//关闭资源producer.close()}
}
(2)启动
Kafka
消费者
(3)执行SparkKafkaProducer程序,观察kafka消费者控制台情况
bin/kafka-topics.sh --bootstrap-server linux-102:9092 --topic august --create --partitions 1 --replication-factor 1bin/kafka-topics.sh --bootstrap-server linux-102:9092 --listbin/kafka-console-consumer.sh --bootstrap-server linux-102:9092 --topic august
3.4 Spark 消费者
(1)导入下面的依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version>
</dependency>
(2)在
com.gansu.spark
包下创建scala Object:SparkKafkaConsumer
package com.gansu.sparkimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object SparkKafkaConsumer {def main(args: Array[String]): Unit = {//1.创建SparkConfval sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")//2.创建StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))//3.定义Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.10.102:9092",ConsumerConfig.GROUP_ID_CONFIG -> "test",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])//4.读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent, //优先位置ConsumerStrategies.Subscribe[String, String](Set("august"), kafkaPara)// 消费策略:(订阅多个主题,配置参数))//5.将每条消息的KV取出val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())//6.计算WordCountvalueDStream.print()//7.开启任务ssc.start()ssc.awaitTermination()}
}
(3)启动
SparkKafkaConsumer
消费者
(4)启动
kafka
生产者
(5)观察IDEA控制台数据打印
bin/kafka-console-consumer.sh --bootstrap-server linux-102:9092 --topic august
文章源码
✌✌调优源码剖析高级后期更新
✌✌