Kafka/Spark-01消费topic到写出到topic

1 Kafka的工具类

1.1 从kafka消费数据的方法

  1. 消费者代码
  def getKafkaDStream(ssc : StreamingContext , topic: String  , groupId:String  ) ={consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array(topic), consumerConfigs))kafkaDStream}
  1. 注意点
  • consumerConfigs是定义的可变的map的类型的,具体如下
private val consumerConfigs: mutable.Map[String, Object] = mutable.Map[String,Object](// kafka集群位置ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS),// kv反序列化器ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",// groupId// offset提交  自动 手动ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",//自动提交的时间间隔//ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG// offset重置  "latest"  "earliest"ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"// .....)
  • consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG , groupId)是为了不限制groupId特意写的传参

  • 是使用自带的kafka工具类createDirectStream方法去消费kafak 的数据,详细参数解释如下

在`KafkaUtils.createDirectStream`方法中,后续传递的参数的含义如下:1. `ssc`:这是一个`StreamingContext`对象,用于指定Spark Streaming的上下文。
2. `LocationStrategies.PreferConsistent`:这是一个位置策略,用于指定Kafka消费者的位置策略。`PreferConsistent`表示优先选择分区分布均匀的消费者。
3. `ConsumerStrategies.Subscribe[String, String]`:这是一个消费者策略,用于指定Kafka消费者的订阅策略。`Subscribe[String, String]`表示按照指定的泛型主题字符串数组订阅消息,键和值的类型都为`String`。
4. `Array(topic)`:这是一个字符串数组,用于指定要订阅的Kafka主题。
5. `consumerConfigs`:这是一个`java.util.Properties`类型的对象,其中配置了一些Kafka消费者的属性。总之,在`KafkaUtils.createDirectStream`方法中,这些参数组合被用于创建一个Kafka直连流(Direct Stream),该流可以直接从Kafka主题中消费消息,并将其转换为`InputDStream[ConsumerRecord[String, String]]`类型的DStream。

在这里插入图片描述

  • Subscribe传参需要指定泛型,这边指定string,表示指定主题的键和值的类型,即Array(topic), consumerConfigs传参是string

在这里插入图片描述

  • 最后方法返回一个kafkaDStream

1.2 kafka的生产数据的方法

  1. 生产者代码
  • 创建与配置
/*** 生产者对象*/val producer : KafkaProducer[String,String] = createProducer()/*** 创建生产者对象*/def createProducer():KafkaProducer[String,String] = {val producerConfigs: util.HashMap[String, AnyRef] = new util.HashMap[String,AnyRef]//生产者配置类 ProducerConfig//kafka集群位置//producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092")//producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils("kafka.bootstrap-servers"))producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,MyPropsUtils(MyConfig.KAFKA_BOOTSTRAP_SERVERS))//kv序列化器producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer")//acksproducerConfigs.put(ProducerConfig.ACKS_CONFIG , "all")//batch.size  16kb//linger.ms   0//retries//幂等配置producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG , "true")val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](producerConfigs)producer}
  • 生产方法
  /*** 生产(按照默认的黏性分区策略)*/def send(topic : String  , msg : String ):Unit = {producer.send(new ProducerRecord[String,String](topic , msg ))}/**或者!* 生产(按照key进行分区)*/def send(topic : String  , key : String ,  msg : String ):Unit = {producer.send(new ProducerRecord[String,String](topic , key ,  msg ))}
  • 关闭生产
/*** 关闭生产者对象*/def close():Unit = {if(producer != null ) producer.close()}/*** 刷写 ,将缓冲区的数据刷写到磁盘**/def flush(): Unit ={producer.flush()}

2 消费数据

2.1 消费到数据

单纯的使用返回的ConsumerRecord不支持序列化,没有实现序列化接口

在这里插入图片描述

因此需要转换成通用的jsonobject对象

//3. 处理数据//3.1 转换数据结构val jsonObjDStream: DStream[JSONObject] = offsetRangesDStream.map(consumerRecord => {//获取ConsumerRecord中的value,value就是日志数据val log: String = consumerRecord.value()//转换成Json对象val jsonObj: JSONObject = JSON.parseObject(log)//返回jsonObj})

2.2 数据分流发送到对应topic

  1. 提取错误数据并发送到对应的topic中
jsonObjDStream.foreachRDD(rdd => {rdd.foreachPartition(jsonObjIter => {for (jsonObj <- jsonObjIter) {//分流过程//分流错误数据val errObj: JSONObject = jsonObj.getJSONObject("err")if(errObj != null){//将错误数据发送到 DWD_ERROR_LOG_TOPICMyKafkaUtils.send(DWD_ERROR_LOG_TOPIC ,  jsonObj.toJSONString )}else{}}}}	
  1. 将公共字段和页面数据发送到DWD_PAGE_DISPLAY_TOPIC
else{// 提取公共字段val commonObj: JSONObject = jsonObj.getJSONObject("common")val ar: String = commonObj.getString("ar")val uid: String = commonObj.getString("uid")val os: String = commonObj.getString("os")val ch: String = commonObj.getString("ch")val isNew: String = commonObj.getString("is_new")val md: String = commonObj.getString("md")val mid: String = commonObj.getString("mid")val vc: String = commonObj.getString("vc")val ba: String = commonObj.getString("ba")//提取时间戳val ts: Long = jsonObj.getLong("ts")// 页面数据val pageObj: JSONObject = jsonObj.getJSONObject("page")if(pageObj != null ){//提取page字段val pageId: String = pageObj.getString("page_id")val pageItem: String = pageObj.getString("item")val pageItemType: String = pageObj.getString("item_type")val duringTime: Long = pageObj.getLong("during_time")val lastPageId: String = pageObj.getString("last_page_id")val sourceType: String = pageObj.getString("source_type")//封装成PageLog,这边还写了bean实体类去接收var pageLog =PageLog(mid,uid,ar,ch,isNew,md,os,vc,ba,pageId,lastPageId,pageItem,pageItemType,duringTime,sourceType,ts)//发送到DWD_PAGE_LOG_TOPICMyKafkaUtils.send(DWD_PAGE_LOG_TOPIC , JSON.toJSONString(pageLog , new SerializeConfig(true)))//scala中bean没有set和get方法,这边是直接操作字段}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/128593.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WebRTC中 setup:actpass、active、passive

1、先看一下整个DTLS的流程 setup:actpass、active、passive就发生在Offer sdp和Anser SDP中 Offer的SDP是setup:actpass,这个是服务方&#xff1a; v0\r o- 1478416022679383738 2 IN IP4 127.0.0.1\r s-\r t0 0\r agroup:BUNDLE 0 1\r aextmap-allow-mixed\r amsid-semanti…

在MySQL客户端使用Tab健进行命令补全

在MySQL客户端中&#xff0c;你可以使用Tab键进行命令补全&#xff0c;这将提高我们的效率&#xff0c;这与Linux命令行中的行为类似。例如&#xff0c;如果你输入SEL然后按Tab键&#xff0c;MySQL客户端会自动补全为SELECT。 然而&#xff0c;需要注意的是&#xff0c;这个功能…

Unity Asset Bundle Browser 工具

Unity Asset Bundle Browser 工具 您可以在 Unity 项目中使用 Asset Bundle Browser 工具能够查看和编辑资源包的配置。 有关更多信息&#xff0c;请参阅 Unity Asset Bundle Browser 文档。 注意&#xff1a;此工具是不受支持的实用程序。查看极大的资源包可能会导致性能下…

谁在为网络安全制造标尺?

“我们想帮助企业往后退一步&#xff0c;去全局的看一下自己的安全能力建设水平如何&#xff0c;以及在当下的阶段最应该做的安全建设是什么&#xff1f; ” 度量&#xff0c;对应的是更清晰的认知。而对企业安全而言&#xff0c;这种认知&#xff0c;也更在成为一把新的标尺…

《AI辞职信一键生成》告别凡俗套路,展现独特个性!

在这个科技日新月异的时代&#xff0c;我们的生活被各种应用软件深深地渗透。其中&#xff0c;讯飞星火AI大模型的应用无疑是一种创新和突破。最近&#xff0c;我有幸体验了一款名为《AI辞职信一键生成》的web应用&#xff0c;它以其独特的功能和出色的用户体验&#xff0c;让我…

算法-88.合并两个有序数组-⭐

给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;使合并后的数组同样按 非递减顺序 排列。 注意&#xff1a;最终&#xff0c;合并后数组…

javaee springMVC 一个案例

项目结构 pom.xml <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/P…

MongoDB简介以及安装

文章目录 1. MongoDB简介2. NoSQL简介3. MongoDB安装 1. MongoDB简介 MongoDB是一种NoSQL数据库&#xff0c;采用了文档数据库模型。它以BSON&#xff08;Binary JSON&#xff09;格式存储数据&#xff0c;支持动态模式和灵活的查询语言。MongoDB具有以下特点&#xff1a; 文…

学习机器学习需要哪些数学知识?

作为一门以数据及其模型为 研究对象的学科&#xff0c;优化模型、分析模型性能等都需要数学手段的帮助。和其他学科一样&#xff0c;数学 可以帮我们更清晰地描述和理解机器学习算法&#xff0c;也可以从理论上证明算法的有效性&#xff0c;是机器学习中必不可少的一环。 1 向…

Brief. Bioinformatics2021 | sAMP-PFPDeep+:利用三种不同的序列编码和深度神经网络预测短抗菌肽

文章标题&#xff1a;sAMP-PFPDeep: Improving accuracy of short antimicrobial peptides prediction using three different sequence encodings and deep neural networks 代码&#xff1a;https://github.com/WaqarHusain/sAMP-PFPDeep 一、问题 短抗菌肽(sAMPs)&#x…

图解 LeetCode 算法汇总——回溯

本文首发公众号&#xff1a;小码A梦 回溯算法是一种常见的算法&#xff0c;常见用于解决排列组合、排列问题、搜索问题等算法&#xff0c;在一个搜索空间中寻找所有的可能的解。通过向分支不断尝试获取所有的解&#xff0c;然后找到合适的解&#xff0c;找完一个分支后再往回搜…

NLP(4)--BERT

目录 一、自监督学习 二、BERT的两个问题 三、GLUE 四、BERT与Transformer的关系 五、BERT的训练方式 六、BERT的四个例子 1、语句分类&#xff08;情感分析&#xff09; 2、词性标注 3、立场分析 4、问答系统 七、BERT的后续 1、为什么预训练后的微调可以满足多…

北京互联网营销服务商浩希数字科技申请1350万美元纳斯达克IPO上市

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 猛兽财经获悉&#xff0c;总部位于北京的互联网营销服务商浩希数字科技&#xff08;Haoxi Health Technology Limited &#xff09;近期已向美国证券交易委员会&#xff08;SEC&#xff09;提交招股书&#xff0c;申请在纳斯…

08-JVM垃圾收集器详解

上一篇&#xff1a;07-垃圾收集算法详解 如果说收集算法是内存回收的方法论&#xff0c;那么垃圾收集器就是内存回收的具体实现。 虽然我们对各个收集器进行比较&#xff0c;但并非为了挑选出一个最好的收集器。因为直到现在为止还没有最好的垃圾收集器出现&#xff0c;更加没…

vue学习之条件渲染

条件渲染 用于控制组件显示创建 demo6.html,内容如下 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title&…

JavaScript-----jQuery

目录 前言&#xff1a; 1. jQuery介绍 2. 工厂函数 - $() jQuery通过选择器获取元素&#xff0c;$("选择器") 过滤选择器&#xff0c;需要结合其他选择器使用。 3.操作元素内容 4. 操作标签属性 5. 操作标签样式 6. 元素的创建,添加,删除 7.数据与对象遍历…

vue学习之事件绑定

事件绑定 创建 demo5.html,内容如下 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</…

jupyter 添加中文选项

文章目录 jupyter 添加中文选项1. 下载中文包2. 选择中文重新加载一下&#xff0c;页面就变成中文了 jupyter 添加中文选项 1. 下载中文包 pip install jupyterlab-language-pack-zh-CN2. 选择中文 重新加载一下&#xff0c;页面就变成中文了 这才是设置中文的正解&#xff…

目标检测笔记(十五): 使用YOLOX完成对图像的目标检测任务(从数据准备到训练测试部署的完整流程)

文章目录 一、目标检测介绍二、YOLOX介绍三、源码获取四、环境搭建4.1 环境检测 五、数据集准备六、模型训练七、模型验证八、模型测试 一、目标检测介绍 目标检测&#xff08;Object Detection&#xff09;是计算机视觉领域的一项重要技术&#xff0c;旨在识别图像或视频中的…

Jmeter进阶使用指南-使用参数化

Apache JMeter是一个广泛使用的开源负载和性能测试工具。在进行性能测试时&#xff0c;我们经常需要模拟不同的用户行为和数据&#xff0c;这时候&#xff0c;参数化就显得尤为重要。此文主要介绍如何在JMeter中使用参数化。 什么是参数化&#xff1f; 参数化是一种将静态值替…