Delta lake with Java--使用stream同步数据

今天继续学习Delta lake Up and Running 的第8章,处理流数据,要实现的效果就是在一个delta表(名为:YellowTaxiStreamSource)插入一条数据,然后通过流的方式能同步到另外一个delta表 (名为:YellowTaxiStreamTarget)。接着在YellowTaxiStreamSource更新数据YellowTaxiStreamTarget也能更新。至于删除也尝试过了,发现删除是没有办法同步的。

一、先上代码,今天的代码分3份

第1份:用来启动流

import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;public class DeltaLakeStream {public static void main(String[] args) {SparkSession spark = SparkSession.builder().master("local[*]").appName("delta_lake").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate();String targetPath="D:\\bigdata\\detla-lake-with-java\\YellowTaxiStreamTarget";spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");//定义源数据表spark.sql("CREATE TABLE IF NOT EXISTS taxidb.YellowTaxiSource(" +"RideID INT," +"PickupTime TIMESTAMP," +"CabNumber STRING)" +"USING DELTA LOCATION 'file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxiStreamSource'");//定义目标数据表spark.sql("CREATE TABLE IF NOT EXISTS taxidb.YellowTaxiTarget(" +"RideID INT," +"PickupTime TIMESTAMP," +"CabNumber STRING)" +"USING DELTA LOCATION 'file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxiStreamTarget'");//通过流的方式读取元数据表,记得要option("ignoreChanges", "true")否则报错var stream_df=spark.readStream().option("ignoreChanges", "true").format("delta").load("file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxiStreamSource");//打开目标表,用于后面同步数据var deltaTable = DeltaTable.forPath(spark, targetPath);//   var streamQuery=stream_df.writeStream().format("delta").option("checkpointLocation", targetPath+"\\_checkpoint").start(targetPath);//定义同步流,如果目标表的记录与更新记录的RideID相等则更新,没有找到则插入新记录var streamQuery=stream_df.writeStream().format("delta").foreachBatch((batchDf,batchId)->{deltaTable.as("t").merge(batchDf.as("s"),"t.RideID==s.RideID").whenMatched().updateAll().whenNotMatched().insertAll().execute();}).outputMode("Update").start(targetPath);try {System.out.println("启动stream监听");streamQuery.awaitTermination(); //启动流} catch (StreamingQueryException e) {throw new RuntimeException(e);}}
}

第2份:用来操作源数据表

import org.apache.spark.sql.SparkSession;public class DeltaLakeStreamSource {public static void main(String[] args) {SparkSession spark = SparkSession.builder().master("local[*]").appName("delta_lake").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate();spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");spark.sql("CREATE TABLE IF NOT EXISTS taxidb.YellowTaxiSource(" +"RideID INT," +"PickupTime TIMESTAMP," +"CabNumber STRING)" +"USING DELTA LOCATION 'file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxiStreamSource'");//验证插入spark.sql("INSERT INTO taxidb.YellowTaxiSource (RideID,PickupTime,CabNumber) values (1,'2013-10-13 10:13:15','11-96')").show(false);//验证更新  //spark.sql("UPDATE taxidb.YellowTaxiSource SET CabNumber='199-99' WHERE RideID=1").show(false);//验证删除,不过无效  //spark.sql("DELETE FROM taxidb.YellowTaxiSource WHERE RideID=1").show(false);spark.sql("SELECT RideID,PickupTime,CabNumber FROM taxidb.YellowTaxiSource").show(false);spark.close();}
}

第3份:用来验证目标数据表的同步结果

import org.apache.spark.sql.SparkSession;public class DeltaLakeStreamTarget {public static void main(String[] args) {SparkSession spark = SparkSession.builder().master("local[*]").appName("delta_lake").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate();String targetPath="D:\\bigdata\\detla-lake-with-java\\YellowTaxiStreamTarget";spark.read().format("delta").load(targetPath).show();}
}

二、运行验证

1、先运行DeltaLakeStream,具体运行结果如下图:

2、验证插入数据同步

运行DeltaLakeStreamSource,插入一条RideID=1的数据,具体运行结果如下图:

接着运行 DeltaLakeStreamTarget,看一下数据是否已经通过流的方式同步到目标表,具体运行结果如下图:

3、验证更新数据同步

将DeltaLakeStreamSource的插入数据代码注释掉,同时将更新代码打开,然后运行,将RideID=1的记录的CabNumber值得从11-96修改成199-99,具体运行结果如下图:

接着运行 DeltaLakeStreamTarget,看一下数据是否已经通过流的方式同步到目标表,具体运行结果如下图:

至于删除也尝试过,没有成功,不知道是不是不支持,还望高手指教。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/319616.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode题练习与总结:分隔链表--86

一、题目描述 给你一个链表的头节点 head 和一个特定值 x ,请你对链表进行分隔,使得所有 小于 x 的节点都出现在 大于或等于 x 的节点之前。 你应当 保留 两个分区中每个节点的初始相对位置。 示例 1: 输入:head [1,4,3,2,5,2]…

神经网络的优化器

神经网络的优化器是用于训练神经网络的一类算法,它们的核心目的是通过改变神经网络的权值参数来最小化或最大化一个损失函数。优化器对损失函数的搜索过程对于神经网络性能至关重要。 作用: 参数更新:优化器通过计算损失函数相对于权重参数的…

ngrinder项目-本地调试遇到的坑

前提-maven mirrors配置 <mirrors><!--阿里公有仓库--><mirror><id>nexus-aliyun</id><mirrorOf>central</mirrorOf><name>Nexus aliyun</name><url>http://maven.aliyun.com/nexus/content/groups/public</ur…

从零开始学AI绘画,万字Stable Diffusion终极教程(二)

【第2期】关键词 欢迎来到SD的终极教程&#xff0c;这是我们的第二节课 这套课程分为六节课&#xff0c;会系统性的介绍sd的全部功能&#xff0c;让你打下坚实牢靠的基础 1.SD入门 2.关键词 3.Lora模型 4.图生图 5.controlnet 6.知识补充 在第一节课里面&#xff0c;我们…

(六)SQL系列练习题(下)#CDA学习打卡

目录 三. 查询信息 16&#xff09;检索"1"课程分数小于60&#xff0c;按分数降序排列的学生信息​ 17&#xff09;*按平均成绩从高到低显示所有学生的所有课程的成绩以及平均成绩 18&#xff09;*查询各科成绩最高分、最低分和平均分 19&#xff09;*按各科成绩…

总分420+专业140+哈工大哈尔滨工业大学803信号与系统和数字逻辑电路考研电子信息与通信工程,真题,大纲,参考书。

考研复习一路走来&#xff0c;成绩还是令人满意&#xff0c;专业803信号和数电140&#xff0c;总分420&#xff0c;顺利上岸&#xff0c;总结一下自己这一年复习经历&#xff0c;希望大家可以所有参考&#xff0c;这一年复习跌跌拌拌&#xff0c;有时面对压力也会焦虑&#xff…

【iOS】KVC

文章目录 前言一、KVC常用方法二、key与keypath区别key用法keypath用法 三、批量存值操作四、字典与模型相互转化五、KVC底层原理KVC设值底层原理KVC取值底层原理 前言 KVC的全称是Key-Value Coding&#xff0c;翻译成中文叫做键值编码 KVC提供了一种间接访问属性方法或成员变…

从零开始学AI绘画,万字Stable Diffusion终极教程(四)

【第4期】图生图 欢迎来到SD的终极教程&#xff0c;这是我们的第四节课 这套课程分为六节课&#xff0c;会系统性的介绍sd的全部功能&#xff0c;让你打下坚实牢靠的基础 1.SD入门 2.关键词 3.Lora模型 4.图生图 5.controlnet 6.知识补充 在前面的课程中&#xff0c;我…

杭电acm2018 母牛的故事 Java解法 经典递归

标准递归题 先模拟 接着找递归出口 再找递归通式 想想看 今天的母牛等于前一天的母牛数加上今天出生的母牛 而三天前的母牛所有母牛都能生一头 import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner scnew Scanner(System.in);l…

【计算机网络】计算机网络的定义和分类

一.定义 计算机网络并没有一个精确和统一的定义&#xff0c;在计算机网络发展的不同阶段&#xff0c;人们对计算机网络给出了不同的定义&#xff0c;这些定义反映了当时计算机网络技术的发展水平。 例如计算机网络早期的一个最简单定义&#xff1a;计算机网络是一些互连的、自…

云手机对出海企业有什么帮助?

近些年&#xff0c;越来越多的企业开始向海外拓展&#xff0c;意图发掘更广阔的市场。在这过程中&#xff0c;云手机作为一个新型工具为很多企业提供了助力&#xff0c;尤其在解决海外市场拓展过程中的诸多挑战方面发挥着作用。 首先&#xff0c;云手机的出现解决了企业在海外拓…

线阵相机和面阵相机简介

线阵相机 线阵相机&#xff0c;顾名思义就是所探测的物体要在一个很长的界面上。线阵相机的传感器只有一行感光像素&#xff0c;所以线阵相机一般具有非常高的扫描频率和分辨率。 线阵相机特点 线阵相机使用的线扫描传感器通常只有一行感光单元&#xff08;少数彩色线阵使用…

OpenCV 为轮廓创建边界框和圆(62)

返回:OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 上一篇:OpenCV检测凸包(61) 下一篇 :OpenCV如何为等值线创建边界旋转框和椭圆(62) ​ 目标 在本教程中&#xff0c;您将学习如何&#xff1a; 使用 OpenCV 函数 cv::boundingRect使用 OpenCV 函数 cv::mi…

数据分析--客户价值分析RFM(K-means聚类/轮廓系数)

原数据 import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from sklearn import metrics ### 数据抽取&#xff0c;读⼊数据 df pd.read_csv("customers1997.csv") #相对路径读取数据 print(df.info()) pr…

SpringCloud微服务:Eureka 和 Nacos 注册中心

共同点 都支持服务注册和服务拉取都支持服务提供者心跳方式做健康检测 不同点 Nacos 支持服务端主动检测提供者状态&#xff1a;临时实例采用心跳模式&#xff0c;非临时&#xff08;永久&#xff09;实例采用主动检测模式Nacos 临时实例心跳不正常会被剔除&#xff0c;非临时实…

LLM大语言模型原理、发展历程、训练方法、应用场景和未来趋势

LLM&#xff0c;全称Large Language Model&#xff0c;即大型语言模型。LLM是一种强大的人工智能算法&#xff0c;它通过训练大量文本数据&#xff0c;学习语言的语法、语义和上下文信息&#xff0c;从而能够对自然语言文本进行建模。这种模型在自然语言处理&#xff08;NLP&am…

VMware虚拟机中ubuntu使用记录(6)—— 如何标定单目相机的内参(张正友标定法)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、张正友相机标定法1. 工具的准备2. 标定的步骤(1) 启动相机(2) 启动标定程序(3) 标定过程的操作(5)可能的报错 3. 标定文件内容解析 前言 张正友相机标定法…

Linux进程状态

前言 上一期我们对进程的概念做了介绍并尝试理解了进程。介绍了PCB属性的pid等&#xff0c;这一期我们来介绍进程的状态&#xff01; 本期内容介绍 Linux的进程状态 僵尸进程和孤儿进程 理解进程的运行、阻塞和挂起状态 简单理解进程切换 Linux的进程状态 为了理解正在运行的…

定子的检查和包扎及转子的检查

线圈接好后 用摇表测试 线圈和外壳之间的绝缘性&#xff01; 测试通过后进行焊接&#xff01;&#xff0c;焊接的工具在后面的文章中会介绍&#xff01; 焊接好后&#xff0c;包绝缘管。 焊接完成后 进行星型连接&#xff0c;或者三角形连接&#xff01; 白扎带进行绑扎&…

Django初步了解

目录 一、什么是Django 二、Django的设计模式 三、涉及的英文缩写及其含义 四、安装&#xff08;官方教程&#xff09; 一、什么是Django Django是一个Python Web框架&#xff0c;可以快速开发网站&#xff0c;提供一站式的解决方案&#xff0c;包括缓存、数据库ORM、后台…