SparkStreaming在实时处理的两个场景示例

简介

Spark Streaming是Apache Spark生态系统中的一个组件,用于实时流式数据处理。它提供了类似于Spark的API,使开发者可以使用相似的编程模型来处理实时数据流。

Spark Streaming的工作原理是将连续的数据流划分成小的批次,并将每个批次作为RDD(弹性分布式数据集)来处理。这样,开发者可以使用Spark的各种高级功能,如map、reduce、join等,来进行实时数据处理。Spark Streaming还提供了内置的窗口操作、状态管理、容错处理等功能,使得开发者能够轻松处理实时数据的复杂逻辑。

Spark Streaming支持多种数据源,包括Kafka、Flume、HDFS、S3等,因此可以轻松地集成到各种数据管道中。它还能够与Spark的批处理和SQL引擎进行无缝集成,从而实现流式处理与批处理的混合使用。
在这里插入图片描述

本文以 TCP、kafka场景讲解spark streaming的使用

消息队列下的信息铺抓

类似消息队列的有redis、kafka等核心组件。
本文以kafka为例,向kafka中实时抓取数据,

pom.xml中添加以下依赖

<dependencies><!-- Spark Core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.0</version></dependency><!-- Spark Streaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.2.0</version></dependency><!-- Spark SQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.2.0</version></dependency><!-- Kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Spark Streaming Kafka Connector --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.2.0</version></dependency><!-- PostgreSQL JDBC --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.24</version></dependency>
</dependencies>

创建项目编写以下代码实现功能

package org.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;public class SparkStreamingKafka {public static void main(String[] args) throws InterruptedException {// 创建 Spark 配置SparkConf sparkConf = new SparkConf().setAppName("spark_kafka").setMaster("local[*]").setExecutorEnv("setLogLevel", "ERROR");//设置日志等级为ERROR,避免日志增长导致的磁盘膨胀// 创建 Spark Streaming 上下文JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次// 创建 Spark SQL 会话SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();// 设置 Kafka 相关参数Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092");kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("auto.offset.reset", "earliest");// auto.offset.reset可指定参数有// latest:从分区的最新偏移量开始读取消息。// earliest:从分区的最早偏移量开始读取消息。// none:如果没有有效的偏移量,则抛出异常。kafkaParams.put("enable.auto.commit", true);  //采用自动提交offset 的模式kafkaParams.put("auto.commit.interval.ms",2000);//每隔离两秒提交一次commited-offsetkafkaParams.put("group.id", "spark_kafka"); //消费组名称// 创建 Kafka streamCollection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)  //订阅kafka);//定义数据结构StructType schema = new StructType().add("key", DataTypes.LongType).add("value", DataTypes.StringType);kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {// 转换为 DataFrameDataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {return RowFactory.create(record.offset(), record.value());  //将偏移量和value聚合}), schema);// 写入到 PostgreSQLdf.write()//选择写入数据库的模式.mode(SaveMode.Append)//采用追加的写入模式//协议.format("jdbc")//option 参数.option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL//确定表名.option("dbtable", "public.spark_kafka")//指定表名.option("user", "postgres") // PostgreSQL 用户名.option("password", "postgres") // PostgreSQL 密码.save();});// 启动 Spark StreamingstreamingContext.start();// 等待 Spark Streaming 应用程序终止streamingContext.awaitTermination();}
}

在执行代码前,向创建名为spark_kafka的topic

kafka-topics.sh --create --topic spark_kafka --bootstrap-server 10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

向spark_kafka 主题进行随机推数

kafka-producer-perf-test.sh --topic spark_kafka --thrghput 10 --num-records 10000 --record-size 100000 --producer-props bootstrap.servers=10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092

运行过程中消费的offset会一直被提交到每一个分区
在这里插入图片描述

此时在数据库中查看,数据已经实时落地到库中
在这里插入图片描述

TCP

TCP环境下,实时监控日志的输出,可用于监控设备状态、环境变化等。当监测到异常情况时,可以实时发出警报。

package org.example;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;public class SparkStreamingKafka {public static void main(String[] args) throws InterruptedException {// 创建 Spark 配置SparkConf sparkConf = new SparkConf().setAppName("spark_kafka") // 设置应用程序名称.setMaster("local[*]") // 设置 Spark master 为本地模式,[*]表示使用所有可用核心// 设置日志等级为ERROR,避免日志增长导致的磁盘膨胀.setExecutorEnv("setLogLevel", "ERROR");// 创建 Spark Streaming 上下文JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(2000)); // 间隔两秒扑捉一次// 创建 Spark SQL 会话SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();// 设置 Kafka 相关参数Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", "10.0.0.105:9092,10.0.0.106:9092,10.0.0.107:9092"); // Kafka 服务器地址kafkaParams.put("key.deserializer", StringDeserializer.class); // key 反序列化器类kafkaParams.put("value.deserializer", StringDeserializer.class); // value 反序列化器类kafkaParams.put("auto.offset.reset", "earliest"); // 从最早的偏移量开始消费消息kafkaParams.put("enable.auto.commit", true);  // 采用自动提交 offset 的模式kafkaParams.put("auto.commit.interval.ms", 2000); // 每隔两秒提交一次 committed-offsetkafkaParams.put("group.id", "spark_kafka"); // 消费组名称// 创建 Kafka streamCollection<String> topics = Collections.singletonList("spark_kafka"); // Kafka 主题名称JavaDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)  // 订阅 Kafka);// 定义数据结构StructType schema = new StructType().add("key", DataTypes.LongType).add("value", DataTypes.StringType);kafkaStream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {// 转换为 DataFrameDataset<Row> df = sparkSession.createDataFrame(rdd.map(record -> {return RowFactory.create(record.offset(), record.value());  // 将偏移量和 value 聚合}), schema);// 写入到 PostgreSQLdf.write()// 选择写入数据库的模式.mode(SaveMode.Append) // 采用追加的写入模式// 协议.format("jdbc")// option 参数.option("url", "jdbc:postgresql://localhost:5432/postgres") // PostgreSQL 连接 URL// 确定表名.option("dbtable", "public.spark_kafka") // 指定表名.option("user", "postgres") // PostgreSQL 用户名.option("password", "postgres") // PostgreSQL 密码.save();});// 启动 Spark StreamingstreamingContext.start();// 等待 Spark Streaming 应用程序终止streamingContext.awaitTermination();}
}

在10.0.0.108 打开9999端口键入数值 ,使其被spark接收到并进行运算

nc -lk 9999

开启端口可以键入数值 此时会在IDEA的控制台显示其计算值
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/267739.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(C语言)函数详解上

&#xff08;C语言&#xff09;函数详解上 目录&#xff1a; 1. 函数的概念 2. 库函数 2.1 标准库和头文件 2.2 库函数的使用方法 2.2.1 sqrt 功能 2.2.2 头文件包含 2.2.3 实践 2.2.4 库函数文档的一般格式 3. 自定义函数 3.1 函数的语法形式 3.2 函数的举例 4. 形参和实参 4.…

Redis 命令全解析之 List类型

文章目录 命令RedisTemplate API使用场景 Redis 的 List 是一种有序、可重复、可变动的数据结构&#xff0c;它基于双向链表实现。在Redis中&#xff0c;List可以存储多个相同或不同类型的元素&#xff0c;每个元素在List中都有一个对应的索引位置。这使得List可以用来实现队列…

【计算机网络_应用层】协议定制序列化反序列化

文章目录 1. TCP协议的通信流程2. 应用层协议定制3. 通过“网络计算器”的实现来实现应用层协议定制和序列化3.1 protocol3.2 序列化和反序列化3.2.1 手写序列化和反序列化3.2.2 使用Json库 3.3 数据包读取3.4 服务端设计3.5 最后的源代码和运行结果 1. TCP协议的通信流程 在之…

oppo手机备忘录记录怎么转移到华为手机?

oppo手机备忘录记录怎么转移到华为手机?使用oppo手机已经有三四年了&#xff0c;因为平时习惯&#xff0c;在手机系统的备忘录中记录了很多重要的笔记&#xff0c;比如工作会议的要点、读书笔记、购物清单、朋友的生日提醒等。这些记录对我来说非常重要&#xff0c;我可以通过…

ElasticSearch搜索引擎使用指南

一、ES数据基础类型 1、数据类型 字符串 主要包括: text和keyword两种类型&#xff0c;keyword代表精确值不会参与分词&#xff0c;text类型的字符串会参与分词处理 数值 包括: long, integer, short, byte, double, float 布尔值 boolean 时间 date 数组 数组类型不…

独立游戏《星尘异变》UE5 C++程序开发日志1——项目与代码管理

写在前面&#xff1a;本日志系列将会向大家介绍在《星尘异变》这款模拟经营游戏&#xff0c;在开发时用到的与C相关的泛用代码与算法&#xff0c;主要记录UE5C与原生C的用法区别&#xff0c;以及遇到的问题和解决办法&#xff0c;因为这是我本人从ACM退役以后第一个从头开始的项…

【数据结构】知识点一:线性表之顺序表

内容导航 一、什么是线性表&#xff1f;二、什么是顺序表&#xff1f;1、顺序表的概念2、顺序表的结构a. 静态顺序表&#xff1a;使用定长数组存储元素。b. 动态顺序表&#xff1a;使用动态开辟的数组存储。 三、顺序表的接口实现精讲1.接口一&#xff1a;打印数据2.接口二&…

持安科技亮相张江高科895创业营,总评分第三名荣获「最具创新性企业」!

近日&#xff0c;张江高科895创业营&#xff08;第十三季&#xff09;信息安全专场Demo day&结营仪式在上海集成电路设计产业园圆满落幕。本季创业营通过多种渠道在海内外甄选优秀创业项目&#xff0c;一共择优录取了29家入营&#xff0c;最终甄选出9家代表参加Demo day路演…

Django后端开发——中间件

文章目录 参考资料中间件注册中间件settings.pymiddleware/mymiddleware.pymysite3/views.pymysite3/urls.py 练习 参考资料 B站网课&#xff1a;点击蓝色字体跳转 或复制链接至浏览器&#xff1a;https://www.bilibili.com/video/BV1vK4y1o7jH?p39&vd_source597e21cf34f…

数据可视化原理-腾讯-热力图

在做数据分析类的产品功能设计时&#xff0c;经常用到可视化方式&#xff0c;挖掘数据价值&#xff0c;表达数据的内在规律与特征展示给客户。 可是作为一个产品经理&#xff0c;&#xff08;1&#xff09;如果不能够掌握各类可视化图形的含义&#xff0c;就不知道哪类数据该用…

Mybatis plus扩展功能-Db静态工具

目录 1 前言 2 使用方法 2.1 Db静态工具拥有的部分方法 2.2 举例 1 前言 在我们的服务层中&#xff0c;有时为了实现一个方法需要引入其它的Mapper层方法&#xff0c;但是&#xff0c;这样可能出现循环依赖。虽然Spring已经给我们解决了简单的循环依赖问题&#xff0c;但是…

深入剖析k8s-Pod篇

为什么需要Pod&#xff1f; 进程是以进程组的方式组织在一起。受限制容器的“单进程模型”&#xff0c; 成组调用没有被妥善处理&#xff08;资源调用有限&#xff09;&#xff0c;使用资源囤积则导致复杂度上升。 在k8s项目中&#xff0c;Pod的实现需要使用一个中间容器——…

web组态(BY组态)接入流程

技术文档 官网网站&#xff1a;www.hcy-soft.com 体验地址&#xff1a; www.byzt.net:60/sm 一、数据流向图及嵌入原理 数据流向 嵌入原理 二、编辑器调用业务流程图 三、集成前需要了解的 1、后台Websocket端往前台监控画面端传输数据规则 后台websocket向客户端监控画面…

基于CNN-LSTM-Attention的时间序列回归预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1卷积神经网络&#xff08;CNN&#xff09;在时间序列中的应用 4.2 长短时记忆网络&#xff08;LSTM&#xff09;处理序列依赖关系 4.3 注意力机制&#xff08;Attention&#xff09; 5…

python自动化之项目架构搭建与思路讲解(第二天)

1.自动化测试的概念 自动化测试是指使用自动化工具和脚本来执行测试任务&#xff0c;以验证软件或系统的正确性和稳定性。它可以提高测试的效率和准确性&#xff0c;并节约时间和成本。 2.自动化脚本编写的思路 xmind文档如有需要&#xff0c;可在资源里自行下载 3.项目代码…

MySQL NDB Cluster 分布式架构搭建 自定义启动与重启Shell脚本

此次NDB Cluster使用三台虚拟机进行搭建&#xff0c;一台作为管理节点&#xff1b;而对于另外两台服务器&#xff0c;每一台都充当着数据节点和SQL节点的角色。注意不是MGR主从复制架构&#xff0c;而是分布式MySQL架构。 创建 /var/lib/mysql-cluster/config.ini Cluster全局…

(面试题)数据结构:链表相交

问题&#xff1a;有两个链表&#xff0c;如何判断是否相交&#xff0c;若相交&#xff0c;找出相交的起始节点 一、介绍 链表相交&#xff1a; 若两个链表相交&#xff0c;则两个链表有共同的节点&#xff0c;那从这个节点之后&#xff0c;后面的节点都会重叠&#xff0c;知道…

灯塔:CSS笔记

CSS&#xff1a;层叠样式表 所谓层叠 即叠加的意思&#xff0c;表示样式可以一层一层的层叠覆盖 css写在style标签中&#xff0c;style标签一般写在head标签里面&#xff0c;title标签下面 <!DOCTYPE html> <html lang"en"> <head><meta cha…

Vue开发实例(一)Vue环境搭建第一个项目

Vue环境搭建&第一个项目 一、环境搭建二、安装Vue脚手架三、创建Vue项目 一、环境搭建 下载方式从官网下载&#xff1a;http://nodejs.cn/download/ 建议下载v12.16.0版本以上的&#xff0c;因为版本低无法创建Vue的脚手架 检验是否安装成功 配置环境变量 新增NODE_HOME&…

chrome选项页面options page配置

options 页面用以定制Chrome浏览器扩展程序的运行参数。 通过Chrome 浏览器的“工具 ->更多工具->扩展程序”&#xff0c;打开chrome://extensions页面&#xff0c;可以看到有的Google Chrome扩展程序有“选项Options”链接&#xff0c;如下图所示。单击“选项Options”…