Java技术栈 —— Spark入门(二)之实时WordCount

Java技术栈 —— Spark入门(二)

  • 一、kafka
    • 1.1 创建topic
    • 1.2 准备input与查看output
  • 二、spark
    • 2.1 spark下的程序文件
    • 2.2 用spark-submit提交作业

参考文章:

参考文章或视频链接
[1] 《Kafka + Spark Stream实时WordCount》

实验环境:
假设你的用户为root,以下软件安装路径为/opt

软件版本
spark: 3.5.2 (scala 2.12)
kafka: 3.8.0 (scala 2.13)

实验结构图

在这里插入图片描述

一、kafka

1.1 创建topic

# 创建input
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test.wordcount.input --partitions 1 --replication-factor 1
# 创建output
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test.wordcount.output --partitions 1 --replication-factor 1

1.2 准备input与查看output

# 打开两个terminal终端
# 准备键盘输入作为prodcuer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test.wordcount.input
# 在屏幕上查看输出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test.wordcount.output

二、spark

2.1 spark下的程序文件

# coding=utf-8
# /opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka-wordcount.py
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as FbootstrapServers = "localhost:9092"spark = SparkSession\.builder\.appName("StructuredKafkaWordCount")\.getOrCreate()# 基于来自kafka的数据流,创建dataframe
lines = spark\.readStream\.format("kafka")\.option("kafka.bootstrap.servers", bootstrapServers)\.option("subscribe", "test.wordcount.input")\.option("failOnDataLoss", False)\.option("group.id", "wordcount-group3")\.load()\.selectExpr("CAST(value AS STRING)")# 将单行数据拆分,转成多行数据
words = lines.select(explode(split(lines.value, ' ')).alias('word')
)# 对单词进行分组,并计算总数
wordCounts = words.groupBy('word').count()# 将两列数据合并成单列数据
wordCounts = wordCounts.select(F.concat(F.col("word"), F.lit("|"), F.col("count").cast("string")).alias("value"))# 测试时,可以不将结果写入kafka,直接输出到控制台
# query = wordCounts \
#     .writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .start()# 将结果输出到 test.wordcount.output
query = wordCounts \.writeStream \.format('kafka') \.outputMode('update') \.option("kafka.bootstrap.servers", bootstrapServers) \.option('checkpointLocation', '/spark/job-checkpoint') \.option("topic", "test.wordcount.output") \.start()query.awaitTermination()

2.2 用spark-submit提交作业

# 提交Spark作业,这个过程需要保证网络畅通,会将一些依赖下载到/root/.ivy2/jars目录下
$SPARK_HOME/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,\
org.apache.kafka:kafka-clients:3.5.2 \
/opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka-wordcount.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/416385.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【AQS源码】深入理解AQS的工作原理

【AQS源码】深入理解AQS的工作原理-CSDN博客

从零开始掌握容器技术:Docker的奇妙世界

容器技术在当今的云计算和软件开发领域中扮演着越来越重要的角色。如果你是一名计算机专业的学生或从事IT行业的从业者,可能已经听说过Docker这个词。它在软件开发、部署、运维等环节中大放异彩,但对于刚接触这个概念的朋友来说,可能还是有些…

JMeter在Mac下的安装使用

前言 开发过程中需要对系统进行性能测试,可以选用jemter对接口进行压测,jemter优点如下: 开源许可证:Jmeter完全免费,允许开发者使用源代码进行开发 友好的 GUI:Jmeter 非常易于使用,不需要花…

flume 使用 exec 采集容器日志,转储磁盘

flume 使用 exec 采集容器日志,转储磁盘 在该场景下,docker 服务为superset,flume 的sources 选择 exec , sinks选择 file roll 。 任务配置 具体配置文件如下: #simple.conf: A single-node Flume configuration#…

推荐4个一键生成 PPT的AI工具,让你畅享智能办公!

对于职场人士来说,ai PPT 工具已经成为了高效办公的一大得力助手 。它可以让你从繁琐的 PPT 制作中解脱出来,把更多的时间放在其他的工作准备上面。并且它们有极大的设计能力,会让我们的PPT变的设计感十足,如果大家正在为PPT制作烦…

js逆向——RSA实战案例讲解

受害者网站:http://www.15yunmall.com/pc/login/index 检查超时,这个我们不管他 直接分析参数,有2处加密位置,分别为password和csrftoken 只要是能够跟栈的,一律先在send的位置下断 很快就跟栈找到加密数据的位置 R…

《JavaEE进阶》----4.<SpringMVC①简介、基本操作(各种postman请求)>

本篇博客讲解 MVC思想、及Spring MVC(是对MVC思想的一种实现)。 Spring MVC的基本操作、学习了六个注解 RestController注解 RequestMappering注解 RequestParam注解 RequestBody注解 PathVariable注解 RequestPart注解 MVC View(视图) 指在应⽤程序中…

我用 GPT 学占星

最近对占星赶兴趣,但是看到星盘中好多名词,不懂是什么意思?所以直接问 gpt , 发现回答的真的很棒🎉 ! 假如我想知道各个状态的具体是根据什么数据来显示的? 分分钟解决了我的问题; 我…

docker Desktop报错 error pulling image configuration 处理

问题描述 在 docker 拉数据 出现以下错误 error pulling image configurarion: 这个问题 主要是 可能应该某些原因不能网络无法连上镜像 原因分析: 1。 2024年 5月以后 国内很多IP都 。。。懂的都懂,很多 VPN 也是。。。 懂的都懂&#x…

7种常见排序

1 直接插入排序 把待排序的记录按其关键码值的大小逐个插入到一个已经排好序的有序序列中&#xff0c;直到所有的记录插入完为 止&#xff0c;得到一个新的有序序列 。 void InsertSort(int* a, int n) {for (int i 0; i < n - 1; i){//划分区间【0&#xff0c;end】int en…

Ubuntu 24.04 安装 英特尔工具包 Intel® Toolkits

目录 1.采用用户界面 GUI 安装英特尔基本工具包 Intel oneAPI Base Toolkit 1.1 下载离线英特尔基本工具包 1.2 安装英特尔基本工具包 1.3 英特尔基本工具包 Intel oneAPI Base Toolkit 环境设置 2.安装英特尔高性能计算工具包 Intel HPC Toolkit 2.1 下载离线英特尔高性…

模型从 HuggingFace 转存到 ModelScope

由于 HuggingFace 网络访问比较慢&#xff0c;国内通常会使用魔搭下载模型&#xff0c;如果魔搭上还没有&#xff0c;需要从 HuggingFace 准存一下&#xff0c;本文将通过 Colab AliyunPan 的方式下载模型并进行转存。 登录Colab 并运行一下命令 安装依赖包&#xff0c;Hugg…

最新项目管理软件排行榜,90%大厂项目经理都在用!

本文是主流的热门项目管理软件排行榜&#xff0c;助力企业选型&#xff01; 项目管理软件排行榜就如同企业管理的指南针&#xff0c;能为企业在众多项目管理工具中找到最适合的那一款。 对于企业来说&#xff0c;如果没有好用的项目管理软件&#xff0c;就像航海者失去了罗盘&…

Python 数据分析笔记— Numpy 基本操作(上)

文章目录 学习内容&#xff1a;一、什么是数组、矩阵二、创建与访问数组三、矩阵基本操作 学习内容&#xff1a; 一、什么是数组、矩阵 数组&#xff08;Array&#xff09;&#xff1a;是有序的元素序列&#xff0c;可以是一维、二维、多维。 array1 [1,2,3] 或[a, b, c, d…

智能工厂监控升级:Sovit2D大屏展示和ARM计算机的完美搭档

在当今科技飞速发展的时代&#xff0c;智能工厂和环境监测领域对于高效、精准的监控系统的需求日益增长。Sovit2D 组态软件与 ARM 工业计算机的结合&#xff0c;为这些领域带来了全新的解决方案。 走进智能工厂的监控室&#xff0c;一台台 ARM 工业计算机正稳定地运行着 Sovit2…

Echarts可视化

echarts是一个基于javascripts的开源可视化图表库 画图步骤&#xff1a; 1.引入echarts.js文件 <script src" https://cdn.jsdelivr.net/npm/echarts5.5.1/dist/echarts.min.js"></script> 也可将文件下载到本地通过src引入。 2. 准备一个呈现图表的…

828华为云征文|华为云Flexus X实例docker部署harbor镜像仓库

828华为云征文&#xff5c;华为云Flexus X实例docker部署harbor镜像仓库 华为云最近正在举办828 B2B企业节&#xff0c;Flexus X实例的促销力度非常大&#xff0c;特别适合那些对算力性能有高要求的小伙伴。如果你有自建MySQL、Redis、Nginx等服务的需求&#xff0c;一定不要错…

Django+Vue二手交易平台的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 需要的环境3.2 Django接口层3.3 实体类3.4 config.ini3.5 启动类3.5 Vue 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平台Java领域优质创作者&…

Having trouble using OpenAI API

题意&#xff1a;"使用OpenAI API遇到困难" 问题背景&#xff1a; I am having trouble with this code. I want to implement AI using OpenAI API in my React.js project but I cannot seem to get what the issue is. I ask it a question in the search bar in…

string详解

Golang详解string 文章目录 Golang详解stringGolang中为什么string是只读的&#xff1f;stirng和[]byte的转化原理[]byte转string一定需要内存拷贝吗&#xff1f;字符串拼接性能测试 Golang中为什么string是只读的&#xff1f; 在Go语言中&#xff0c;string其实就是一个结构体…