四十二、大数据技术之Kafka3.x(5)

🌻🌻 目录

  • 一、Kafka-Eagle 监控
    • 1.1 MySQL 环境准备
    • 1.2 Kafka 环境准备
    • 1.3 Kafka-Eagle 安装
    • 1.4 Kafka-Eagle 页面操作
  • 二、集成 SpringBoot
    • 2.1 前期准备
    • 2.2 SpringBoot 生产者
    • 2.3 SpringBoot 消费者
  • 三、集成 Spark(拓展 Scala 语言)
    • 3.1 Scala 入门
      • 3.1.1 Scala 环境搭建
      • 3.1.2 Scala 插件安装
    • 3.2 前期准备
    • 3.3 Spark 生产者
    • 3.4 Spark 消费者

一、Kafka-Eagle 监控

Kafka-Eagle 框架可以监控Kafka集群的整体运行情况,在生产环境中经常使用。

1.1 MySQL 环境准备

Kafka-Eagle 的安装依赖于MySQLMySQL主要用来存储可视化展示的数据。如果你的集群中之前安装过MySQL可以跨过该步。

安装参考 👉👉 三、 CentOS-7.5 上面安装 mysql 5.7.16

1.2 Kafka 环境准备

1)关闭Kafka

在这里插入图片描述

./kafka-server-stop.sh

2)修改/usr/local/kafka/bin/kafka-server-start.sh命令中

在这里插入图片描述

vi kafka-server-start.sh

修改如下参数值:

在这里插入图片描述

export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
  • 注意:修改后先分发之其他节点(仅集群)下面操作完再启动。

1.3 Kafka-Eagle 安装

  • 0)官网 下载 kafka-eagle-bin-2.0.8.tar.gz
  • 1)本地资源库下载

在这里插入图片描述

在这里插入图片描述

  • 2)上传压缩包kafka-eagle-bin-2.0.8.tar.gz到集群/usr/local下面

在这里插入图片描述

  • 3)解压到本地

在这里插入图片描述

tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
  • 3)进入刚才解压的目录
  • 4)将efak-web-2.0.8-bin.tar.gz解压至/usr/local

在这里插入图片描述

tar -zxvf efak-web-2.0.8-bin.tar.gz -C  /usr/local/cd /usr/local/ls

在这里插入图片描述

5)修改名称

在这里插入图片描述

mv efak-web-2.0.8 efak
  • 6)修改配置文件 /usr/local/efak/conf/system-config.properties

在这里插入图片描述

vi system-config.properties

修改如下:

在这里插入图片描述

efak.zk.cluster.alias=cluster1
cluster1.zk.list=linux-102:2181/kafka

在这里插入图片描述

cluster1.efak.offset.storage=kafka

在这里插入图片描述

# 配置mysql连接
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://linux-102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root

7)添加环境变量

在这里插入图片描述

在这里插入图片描述

#efak
export KE_HOME=/usr/local/efak
export PATH=$PATH:$KE_HOME/binsource /etc/profile

8)启动

  • (1)注意:启动之前需要先启动ZK以及kafka

在这里插入图片描述

(2)启动efak

cd /usr/local/efak/binls./ke.sh start

在这里插入图片描述

启动成功如下显示:

在这里插入图片描述

说明:如果停止efak,执行命令。./ke.sh stop

1.4 Kafka-Eagle 页面操作

1)登录页面查看监控数据

  • 浏览器输入:http://192.168.10.102:8048用户名:admin 密码: 123456 如下所示:

① 登录页

在这里插入图片描述

② 首页

在这里插入图片描述

③ 监控查看页面

在这里插入图片描述

二、集成 SpringBoot

2.1 前期准备

SpringBoot是一个在JavaEE开发中非常常用的组件。可以用于Kafka的生产者,也可以用于SpringBoot的消费者。

在这里插入图片描述

1)在IDEA中安装lombok插件

  • Plugins下搜索lombok然后在线安装即可,安装后注意重启

在这里插入图片描述

2)SpringBoot环境准备

  • (1)创建一个Spring Initializr
  • 注意:有时候SpringBoot官方脚手架不稳定,我们切换国内地址https://start.aliyun.com
  • (2)创建工程 springboot

在这里插入图片描述

在这里插入图片描述

③ 添加项目依赖

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
自动生成的配置文件如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.9.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.gansu</groupId><artifactId>springboot</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot</name><description>Demo project for Spring Boot</description><url/><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

2.2 SpringBoot 生产者

(1)修改SpringBoot核心配置文件application.propeties, 添加生产者相关信息

在创建好的类中写入 StringSerializer,鼠标点击进去,右键复制全类名

在这里插入图片描述

# 指定kafka的地址
spring.kafka.bootstrap-servers=192.168.10.102:9092
#指定key和value的序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

(2)创建producerController从浏览器接收数据, 并写入指定的topic

在这里插入图片描述

package com.gansu.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class producerController {@AutowiredKafkaTemplate<String,String> kafka;@RequestMapping("/producter")public String producter(String msg){kafka.send("second",msg);return "ok";}
}

(3)在浏览器中给/atguigu接口发送数据

http://localhost:8080/producter?msg=xiaojin

在这里插入图片描述

2.3 SpringBoot 消费者

(1)修改SpringBoot核心配置文件application.propeties

在这里插入图片描述

# =========消费者配置开始=========
# 指定key和value的反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的group_id
spring.kafka.consumer.group-id=test

(2)创建类消费Kafka中指定topic的数据 ConsumerController

在这里插入图片描述

package com.gansu.consumer;import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;@Configuration
public class ConsumerController {@KafkaListener(topics = "second")public void consumer(String receive){System.out.println("消费者收到的数据为"+receive);}
}

(3)向first主题发送数据

在这里插入图片描述

三、集成 Spark(拓展 Scala 语言)

在这里插入图片描述

Spark是一个在大数据开发中非常常用的组件。可以用于Kafka的生产者,也可以用于Spark的消费者。

3.1 Scala 入门

3.1.1 Scala 环境搭建

1)安装步骤

  • (1)首先确保JDK1.8安装成功
  • (2)下载对应的Scala安装文件scala-2.12.11.zip 如下:
  • scala官网 下载 👉👉 scala-2.12.11.zip
  • 本地资源库获取 👉👉 链接:提取码:yyds

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • (3)解压scala-2.11.8.zip到自己的磁盘目录

在这里插入图片描述

  • (4)配置Scala的环境变量

注意1:解压路径不能有任何中文路径,最好不要有空格。
注意2:环境变量要大写SCALA_HOME

在这里插入图片描述

在这里插入图片描述

验证是否安装成功:

(1)在键盘上同时按win+r键,并在运行窗口输入cmd命令,输入Scala并按回车键,启动Scala环境。

在这里插入图片描述

scala

(2)定义两个变量,并计算求和。

在这里插入图片描述

var n:int = 10var n2:Int = 2var result:Int = n+n2

3.1.2 Scala 插件安装

默认情况下IDEA不支持Scala的开发,需要安装Scala插件。

1)插件在线安装(可选)

  • (1)在搜索插件框里面输入Scala->点击Install->点击ok->点击apply。

在这里插入图片描述

  • (2)重启IDEA,再次来到Scala插件页面,已经变成Uninstall

在这里插入图片描述

3.2 前期准备

  • (1)创建一个maven项目spark-kafka

在这里插入图片描述

在这里插入图片描述

  • (2)在项目spark-kafka上点击右键,Add Framework Support=》勾选scala

在这里插入图片描述

在这里插入图片描述

  • (3)在main下创建scala文件夹,并右键Mark Directory as Sources Root=>scala下创建包名为com.gansu.spark

在这里插入图片描述

在这里插入图片描述

  • (4)在pom.xml 添加依赖

在这里插入图片描述

<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version></dependency>
</dependencies>

(5)将log4j.properties文件添加到resources里面,就能更改打印日志的级别为error

在这里插入图片描述

log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}%5p --- [%50t]  %-80c(line:%5L):%m%nlog4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L):%m%n

3.3 Spark 生产者

(1)在com.gansu.spark包下创建scala Object:SparkKafkaProducer

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

package com.gansu.sparkimport java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializerobject SparkKafkaProducer {//直接写个 main回车即可def main(args: Array[String]): Unit = {//scala 语言可以不用分号// 0 kafka配置信息val properties = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])// 1 创建kafka生产者val producer = new KafkaProducer[String,String](properties)// 2 发送数据for (elem <- 1 to 5) {producer.send(new ProducerRecord[String,String]("august","Daniel-xiaoJ"))}//关闭资源producer.close()}
}

(2)启动Kafka消费者
(3)执行SparkKafkaProducer程序,观察kafka消费者控制台情况

在这里插入图片描述

bin/kafka-topics.sh --bootstrap-server linux-102:9092 --topic august --create --partitions 1 --replication-factor 1bin/kafka-topics.sh --bootstrap-server linux-102:9092 --listbin/kafka-console-consumer.sh --bootstrap-server linux-102:9092 --topic august

3.4 Spark 消费者

(1)导入下面的依赖

在这里插入图片描述

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version>
</dependency>

(2)在com.gansu.spark包下创建scala Object:SparkKafkaConsumer

在这里插入图片描述

package com.gansu.sparkimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object SparkKafkaConsumer {def main(args: Array[String]): Unit = {//1.创建SparkConfval sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")//2.创建StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))//3.定义Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.10.102:9092",ConsumerConfig.GROUP_ID_CONFIG -> "test",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])//4.读取Kafka数据创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent, //优先位置ConsumerStrategies.Subscribe[String, String](Set("august"), kafkaPara)// 消费策略:(订阅多个主题,配置参数))//5.将每条消息的KV取出val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())//6.计算WordCountvalueDStream.print()//7.开启任务ssc.start()ssc.awaitTermination()}
}

(3)启动SparkKafkaConsumer消费者

在这里插入图片描述

(4)启动kafka生产者
(5)观察IDEA控制台数据打印

bin/kafka-console-consumer.sh --bootstrap-server linux-102:9092 --topic august

文章源码

✌✌调优源码剖析高级后期更新✌✌

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/401207.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA快捷键(Ctrl + tab)非常好用 切换最近使用的编辑器选项卡

文章目录 1、为什么要使用 ctrl tab 快捷键&#xff1f;2、使用 ctrl tab 快捷键 1、为什么要使用 ctrl tab 快捷键&#xff1f; 当我们点击 ctrl alt 鼠标左键点击 进入方法的实现时&#xff0c;这个时候我们会在这个实现类中不断的点击&#xff0c;查看源码&#xff0c…

【学习笔记】Matlab和python双语言的学习(最小生成树——Kruskal算法、Prim算法)

文章目录 前言一、最小生成树树的一些概念关键特性最小生成树和最短路径的主要区别常用算法1. Kruskal算法(适合点多边少的图)2. Prim算法(适合边多点少的图) 二、示例三、代码实现----Matlab四、代码实现----python1. Kruskal算法2. Prim算法 总结 前言 通过模型算法&#xf…

【QuikGraph】TSP旅行商问题变体之不返回起点

1、问题分析 目的&#xff1a;在旅行商问题的基础上&#xff0c;无需返回起点。相当于找到一条最短路径&#xff0c;能够遍历所有的顶点。起点和终点都是动态计算出来的&#xff0c;不是提前固定的。 这个问题也称为为计算“最短的哈密尔顿路径”。 2、解决方案 出处&#…

【无标题】mysql读写分离架构+MyCAT实现读写分离

1、读写分离的目的 数据库负载均衡&#xff1a; 当数据库请求增多时&#xff0c;单例数据库不能够满足业务 需求。需要进行数据库实例的扩容。多台数据库同时相 应请求。也就是说需要对数据库的请求&#xff0c;进行负载均衡 但是由于数据库服务特殊原因&#xff0c;数据库…

【算法速刷(7/100)】LeetCode —— 200.岛屿数量

这题是典型的深搜题&#xff0c;只需要额外记录每个格子是否被搜索过&#xff0c;然后挨个进行陆地的深度搜索即可。&#xff08;如果要使用lambda进行递归&#xff0c;需要显式指出变量的模板类型&#xff0c;不能使用auto推导&#xff09; int numIslands(vector<vector&…

MATLAB基于深度学习的车辆检测系统

如今机器视觉领域深度学习算法已经大行其道&#xff0c;也让人工智能的实现不再那么遥不可及&#xff0c;但是在目标检测领域&#xff0c;让计算机超越人类还需让更多的人参与进来继续努力。如今众多的高校&#xff0c;甚至中小学已经将人工智能纳入了学习科目&#xff0c;这确…

【YOLOv5/v7改进系列】替换Neck为Gold-Yolo特征融合网络

一、导言 Gold-YOLO是一种高效的物体检测模型&#xff0c;它通过一种新的机制——Gather-and-Distribute&#xff08;GD&#xff09;机制来增强多尺度特征融合的能力&#xff0c;从而在保证实时性能的同时提高了检测精度。下面是对Gold-YOLO的主要特点和创新点的概述&#xff…

【C++ 面试 - 基础题】每日 3 题(十八)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/fYaBd &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏&…

Web开发-CSS篇-上

CSS的发展历史 CSS&#xff08;层叠样式表&#xff09;最初由万维网联盟&#xff08;W3C&#xff09;于1996年发布。CSS1是最早的版本&#xff0c;它为网页设计提供了基本的样式功能&#xff0c;如字体、颜色和间距。随着互联网的发展&#xff0c;CSS也不断演进&#xff1a; C…

【低代码开发】

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

谷粒商城实战笔记-175~177-商城业务-检索服务-检索查询接口开发

文章目录 一&#xff0c;175-商城业务-检索服务-检索查询参数模型分析抽取二&#xff0c;176-商城业务-检索服务-检索返回结果模型分析抽取三&#xff0c;177-商城业务-检索服务-检索DSL测试-查询部分四&#xff0c;178-商城业务-检索服务-检索DSL测试-聚合部分问题记录解决方案…

RCE之无参数读取文件总结

RCE漏洞(Remote Code|Command Execute)&#xff1a; 是指由于程序中预留了执行代码或者命令的接口&#xff0c;并且提供了给用户使用的界面&#xff0c;导致被黑客利用&#xff0c; 控制服务器。 代码执行漏洞原理&#xff1a; 传入php代码到执行函数的变量&#xff0c;客户端…

IIC 通信协议详解

目录 一、概述二、I2C 详解1、I2C 总线简介2、I2C 协议相关知识2.1 起始位2.2 停止位2.3 数据传输2.4 应答信号2.5 I2C 设备地址格式2.5 I2C 时序图2.5.1 I2C 写时序2.5.2 I2C 读时序2.5.3 单个/多个字节的写入/读取 3、时钟同步和仲裁3.1 时钟同步3.2 时钟仲裁 一、概述 IIC …

Fal.ai Flux 1-Pro/Viva.ai/哩布哩布AI:AI绘图部分免费工具+原图提示词Prompt

目录 #1 找软件 #2 懂提示词 #3 更难的一步&#xff0c;会英文 我个人认为&#xff0c;想要玩文生图&#xff0c;你要会3个步骤&#xff1a; #1 找软件 主流文生图软件&#xff1a;Midjourney、Stable Diffusion、Dall-E 3 巧了&#xff0c;我用的都是小众、免费的画笔工…

【Linux】守护进程:containerd的使用教程

这里写目录标题 前言一. ctr1.1 ctr CLI1.2 ctr 调试 二、 创建 container2.1 进入 NewContainer2.2 ContainerService().Create 前言 介绍了 kubelet 通过 cri 接口和 containerd 交互的过程&#xff0c;containerd 源码分析&#xff1a;启动注册流程 介绍了 containerd 作为…

赶快收藏!全网最佳Set集合详解:HashSet、TreeSet!

先赞后看&#xff0c;Java进阶马上一大半 海外geeksforgeeks网站画了这么一张Set集合的层次结构图&#xff0c;基本把Set集合涉及的常用类关系给标明了。 大家好&#xff0c;我是南哥。 一个Java学习与进阶的领路人&#xff0c;相信对你通关面试、拿下Offer进入心心念念的公司…

arthas使用

1.安装arthas 我的是windows #打开cmd&#xff0c;执行以下命令 &#xff0c;下载jar curl -O https://arthas.aliyun.com/arthas-boot.jar2.启动本地的idea项目 3.进入到jdk的bin文件夹 jdk的配置在“高级系统设置” 进入jdk的bin目录 4.启动arthas 5.arthas使用 trace 类…

Elasticsearch自动补全功能实践与Java API应用

Elasticsearch是一个强大的搜索引擎&#xff0c;它不仅支持全文搜索&#xff0c;还提供了自动补全功能&#xff0c;可以显著提升用户体验。自动补全功能允许用户在输入查询时实时显示建议项&#xff0c;帮助用户快速找到所需信息。本文将介绍如何使用Elasticsearch的RestHighLe…

探索Java Stream API:高效处理集合的利器

文章目录 一、Stream API简介1.1 什么是Stream&#xff1f;1.2 Stream的特点 二、Stream API的基本操作2.1 创建Stream2.2 中间操作2.3 终端操作 三、Stream API的高级应用3.1 并行Stream3.2 复杂数据处理3.3 Stream与Optional 四、最佳实践例子 1: 筛选和映射例子 2: 排序和收…

什么是流批一体?怎样理解流批一体?

目录 一、流式处理与批量处理概述 1.流式处理 2.批量处理 3.流批一体的定义 二、流批一体的关键特点 三、流批一体的技术实现 四、应用场景 五、实施流批一体的考虑因素 流批一体听起来很简单&#xff0c;但内涵却十分复杂。它包含了计算语义、编程模型、API、调度、执行、shuf…