spark stream入门案例:netcat准实时处理wordCount(scala 编程)

目录

案例需求

代码

结果

解析


         案例需求:

        使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

        -- 1. Spark从socket中获取数据:一行一行的获取
        -- 2. Driver程序执行时,streaming处理过程不能结束
        -- 3. 采集器在正常情况下启动后就不应该停止,除非特殊情况
        -- 4. 采集器位于一个executor中,是一个线程,执行时需要一个核,如果设定的总核数为1时,那么在运行时因为没有核数,所以不会有打印结果,所以sparkStreaming使用的核数至少为2个
        -- 5. print()方法,默认是打印10行结果
        -- 6. netcat的指令:
 

      在Windows下:nc -lp 9999在linux下: nc -lk 9999

        代码: 
package cn.olo.streamimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamDemo {def main(args: Array[String]): Unit = {// 连接SparkStreamingval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")/*1.方法:StreamingContext(形参)2.形参:形参1:conf: SparkConf:spark配置对象形参2:batchDuration: Duration:采集时间*/val ssc = new StreamingContext(sparkConf,Seconds(5))// 需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数// 1. 获取netcat工具9999端口的连接,并开始接收数据// 从socket中获取数据:一行一行的获取val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost",9999)// 2. 数据处理val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToSumDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_ + _ )// 3. 打印数据wordToSumDS.print()// 4. Driver程序执行时,streaming处理过程不能结束// 采集器在正常情况下启动后就不应该停止,除非特殊情况// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()}}

结果:

解析:

        a、采集周期时间之间,每一个采集周期生成一个RDD,按照时间的顺序依次进行
        b、在每一个采集周期内,会执行wordcount计算,最终得出:统计出每一个采集周期时间的wordcount

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/161214.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

加固数据安全:Java助力保护Excel文件,让数据无懈可击

摘要:本文由葡萄城技术团队于CSDN原创并首发。转载请注明出处:葡萄城官网,葡萄城为开发者提供专业的开发工具、解决方案和服务,赋能开发者。 前言 Excel文件保护是常用的一种功能,文件保护主要有三种: 添…

单目3D自动标注

这里介绍两种 1. 基于SAM的点云标注 Seal:是一个多功能的自监督学习框架,能够通过利用视觉基础模型的现成知识和2D-3D的时空约束分割自动驾驶数据集点云 Scalability:可拓展性强,视觉基础模型蒸馏到点云中,避免2D和…

SaaS人力资源管理系统的Bug

SaaS人力资源管理系统的Bug Bug1【18】 这里我是直接把代码复制过来的&#xff0c;然后就有一个空白 这是因为它的代码有问题&#xff0c;原本的代码如下所示 <el-table-column fixed type"index" label"序号" width"50"></el-table…

Linux性能优化--性能追踪:受CPU限制的应用程序(GIMP)

10.0 概述 本章包含了一个例子&#xff1a;如何用Linux性能工具在受CPU限制的应用程序中寻找并修复性能问题。 阅读本章后&#xff0c;你将能够&#xff1a; 在受CPU限制的应用程序中明确所有的CPU被哪些源代码行使用。用1trace和oprofile弄清楚应用程序调用各种内部与外部函…

KNN-近邻算法 及 模型的选择与调优(facebook签到地点预测)

什么是K-近邻算法&#xff08;K Nearest Neighbors&#xff09; 1、K-近邻算法(KNN) 1.1 定义 如果一个样本在特征空间中的k个最相似(即特征空间中最邻近)的样本中的大多数属于某一个类别&#xff0c;则该样本也属于这个类别。 来源&#xff1a;KNN算法最早是由Cover和Hart提…

【JVM面试】从JDK7 到 JDK8, JVM为啥用元空间替换永久代?

系列文章目录 【JVM系列】第一章 运行时数据区 【面试】第二章 从JDK7 到 JDK8, JVM为啥用元空间替换永久代&#xff1f; 大家好&#xff0c;我是青花。拥有多项发明专利&#xff08;都是关于商品、广告等推荐产品&#xff09;。对广告、Web全栈以及Java生态微服务拥有自己独到…

VSS、VDD、VBAT、VSSA

引言 在学习设计TM32时&#xff0c;发现芯片除了GPIO引脚外还会引出许多引脚&#xff0c;以STM32F407ZGT6为例除了GPIO引脚还会有以下引脚 如VSS、VDD、VBAT、VSSA、NRST、VREF、VDDA、VCAP_1、VCAP_2、PDR_ON这些引脚。他们有何作用&#xff0c;电路设计中应如何连接&#x…

迅为RK3588开发板使用RKNN-Toolkit-lite2运行测试程序

1 首先也需要部署运行环境&#xff0c;将库文件放入 RK3588 开发板上&#xff0c;我们将网盘资料“iTOP-3588 开发 板 \02_ 【 iTOP-RK3588 开 发 板 】 开 发 资 料 \12_NPU 使 用 配 套 资 料 \05_Linux_librknn_api\librknn_api\aarch64”路径下的文件通过U盘拷贝到开发板的…

LLM-RAG-WEB 大模型+文件+可视化界面

注意&#xff1a;这里只是简单实现了功能和界面&#xff0c;文件对话也暂时只支持一个文件&#xff0c;如果跳到模型对话再切换回文件对话会文件会删除重置会话&#xff0c;但模型对话切换回来时保留之前会话的。 1、代码&#xff08;使用步骤说明在链接里&#xff09; 参考下…

SQLite4Unity3d安卓 在手机上创建sqlite失败解决

总结 要在Unity上运行一次出现库&#xff0c;再打包进APK内 问题 使用示例代码的创建库 var dbPath string.Format("Assets/StreamingAssets/{0}", DatabaseName); #else// check if file exists in Application.persistentDataPathvar filepath string.Format…

Wireshark新手小白基础使用方法

一、针对IP抓取 1、过滤格式&#xff1a; &#xff08;1&#xff09;、ip.src eq x.x.x.x &#xff08;2&#xff09;、ip.dst eq x.x.x.x &#xff08;3&#xff09;ip.src eq x.x.x.x or ip.dst eq x.x.x.x 二、针对端口过滤 1、过滤格式&#xff1a; &#xff08;1&a…

百度智能云千帆大模型平台 2.0 产品技术解析

本文整理自 2023 年 9 月 5 日百度云智大会 - 智能计算&大模型技术分论坛&#xff0c;百度智能云 AI &大数据平台总经理忻舟的主题演讲《百度智能云千帆大模型平台 2.0 产品技术解析》。 这是关于技术主题的论坛&#xff0c;我首先问大家三个开发者的小问题。 第一个问…

如何使用RockPlus MES系统帮助SMT行业实现降本增效

SMT&#xff08;Surface Mount Technology&#xff09;是现代电子行业中主要的组装技术&#xff0c;广泛应用于电子产品的生产。SMT工艺涵盖了锡膏印刷、元器件贴装和回流焊接。经过这些关键工序&#xff0c;元器件被精确固定在电路板上&#xff0c;完成一个电子产品组装。 SM…

SpringCloud: feign整合sentinel实现降级

一、加依赖&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache…

excel+requests管理测试用例接口自动化框架

背景&#xff1a; 某项目有多个接口&#xff0c;之前使用的unittest框架来管理测试用例&#xff0c;将每个接口的用例封装成一个py文件&#xff0c;接口有数据或者字段变动后&#xff0c;需要去每个py文件中找出变动的接口测试用例&#xff0c;维护起来不方便&#xff0c;为了…

Spring AOP归纳与总结

前言 AOP的核心思想是面向切面编程。AOP规范定义了多种概念&#xff0c;常用的aop框架有spring aop和AspectJ&#xff0c;两者功能和性能差异较大&#xff0c;现在默认的AOP框架是AspectJ&#xff0c;下面逐渐归纳其相关概念、功能及实现原理。 1. 概念 1. 切面&#xff1a;…

从零开始学习 Java:简单易懂的入门指南之线程池(三十六)

线程池 1.1 线程状态介绍1.2 线程池-基本原理1.3 线程池-Executors默认线程池1.4 线程池-Executors创建指定上限的线程池1.5 线程池-ThreadPoolExecutor1.6 线程池-参数详解1.7 线程池-非默认任务拒绝策略 1.1 线程状态介绍 当线程被创建并启动以后&#xff0c;它既不是一启动…

linux文件权限与目录配置

用户与用户组 linux一般将文件可读写的身份分为三个类别&#xff1a;拥有者&#xff08;owner&#xff09;、所属群组&#xff08;group&#xff09;、其他人&#xff08;other&#xff09; 三种身份都有读、写、执行等权限 文件拥有者 linux是个多人多任务的系统&#xff0c…

论文解析-moETM

论文解析-moETM 参考亮点动机发展现状现存问题 功能方法Encoder改进Decoder改进 评价指标生物保守性批次效应移除 实验设置结果多组学数据整合cell-topic mixture可解释性组学翻译性能评估RNA转录本、表面蛋白、染色质可及域调控关系研究1. 验证同一主题下&#xff0c;top gene…

什么是NetApp的DQP和如何安装DQP?

首先看看什么是DQP&#xff0c;DQPDisk Qualification Package&#xff0c;文字翻译就是磁盘验证包。按照NetApp的最佳实践&#xff0c;要定期升级DQP包&#xff0c;保证对最新磁盘和磁盘扩展柜的兼容。 本文主要介绍7-mode下如何升级DQP&#xff0c;至于cluster mode另外文章…