Spark写PGSQL分区表

这里写目录标题

    • 需求
    • 碰到的问题
      • 格式问题
      • 分区问题(重点)
    • 解决
      • 完整代码
      • 效果

需求

spark程序计算后的数据需要往PGSQL中的分区表进行写入。

碰到的问题

格式问题

使用了字符串格式,导致插入报错。

val frame = df.withColumn("insert_time",current_timestamp()))
Batch entry 0 INSERT INTO t ("a","insert_time") VALUES 
(1,'2023-08-01 10:00:00') was aborted: ERROR: column 
"insert_time" is of type timestamp without time zone but 
expression is of type character varying

分区问题(重点)

一直都是spark计算完后写单表或者hive的表,都需要去手动去维护分区。但是写PGSQL空表(只有表字段,还没有数据,没有创建分区),需要手动先创建分区,否则会报错。

报错信息

Partition key of the failing row contains (insert_time) = 
(2023-08-04 21:14:09.641).  Call getNextException to see other 
errors in the batch.

插入失败的行的分区键包含的时间戳值 2023-08-04 21:14:09.641 在分区表中找不到对应的分区范围。

解决

最终的解决方案是在插入数据之前,通过代码去添加分区,添加好分区后再写入数据即可。

object WritePgSQL {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkPostgreSQLPartitionedTable").config("spark.master", "local").getOrCreate()// 设置PostgreSQL连接信息val postgresUrl = "jdbc:postgresql://192.168.160.123:5432/test"val connectionProperties = new java.util.Properties()connectionProperties.setProperty("user", "test")connectionProperties.setProperty("password", "123456")// 创建测试数据val data = Seq((1, "2023-08-01 10:00:00"),(2, "2023-08-02 12:00:00"),(3, "2023-08-03 15:00:00"))val columns = Seq("a", "insert_time1")val df = spark.createDataFrame(data).toDF(columns: _*)val frame = df.drop("insert_time1").withColumn("insert_time", current_timestamp().cast("timestamp"))// 动态创建分区范围// p1 可以换成p20230804这样的分区格式// t为表名// (TIMESTAMP '2023-08-04 00:00:00') 分区开始范围,一般通过代码生成,为计算时间的零点// (TIMESTAMP '2023-08-05 00:00:00') 分区结束范围,一般通过代码生成,为计算时间的下一天零点val createPartitionSql =s"""CREATE TABLE "p1" PARTITION OF t FOR VALUES FROM (TIMESTAMP '2023-08-04 00:00:00') TO (TIMESTAMP '2023-08-05 00:00:00') ;"""println(createPartitionSql)// 执行创建分区 SQLval connection = java.sql.DriverManager.getConnection(postgresUrl, connectionProperties)val statement = connection.createStatement()statement.executeUpdate(createPartitionSql)connection.close()// 将数据写入PostgreSQL分区表frame.write.mode("append").jdbc(postgresUrl, "t", connectionProperties)}
}

完整代码

自动生成当天日期和分区名称

object WritePgSQL {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkPostgreSQLPartitionedTable").config("spark.master", "local").getOrCreate()// 设置PostgreSQL连接信息val postgresUrl = "jdbc:postgresql://192.168.160.123:5432/test"val connectionProperties = new java.util.Properties()connectionProperties.setProperty("user", "test")connectionProperties.setProperty("password", "123456")// 创建测试数据val data = Seq((1, "2023-08-01 10:00:00"),(2, "2023-08-02 12:00:00"),(3, "2023-08-03 15:00:00"))val columns = Seq("a", "insert_time1")val df = spark.createDataFrame(data).toDF(columns: _*)val frame = df.drop("insert_time1").withColumn("insert_time", current_timestamp().cast("timestamp"))// 获取今天和明天的时间范围// 获取当前日期val currentDate = LocalDate.now()// 获取下一天的日期val nextDayDate = currentDate.plusDays(1)// 创建固定的时间部分(00:00:00)val startTime = LocalTime.of(0, 0, 0)// 组合日期和时间来得到完整的日期时间,并格式化为字符串val currentDateTimeString = LocalDateTime.of(currentDate, startTime).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))val nextDayDateTimeString = LocalDateTime.of(nextDayDate, startTime).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))// 格式化为yyyyMMdd字符串val dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd")val currentDateString = currentDate.format(dateFormatter)// 动态创建分区范围val createPartitionSql =s"""CREATE TABLE "p$currentDateString" PARTITION OF tFOR VALUES FROM (TIMESTAMP '$currentDateTimeString') TO (TIMESTAMP '$nextDayDateTimeString') ;"""// 执行创建分区 SQLval connection = java.sql.DriverManager.getConnection(postgresUrl, connectionProperties)val statement = connection.createStatement()statement.executeUpdate(createPartitionSql)connection.close()// 将数据写入PostgreSQL分区表frame.write.mode("append").jdbc(postgresUrl, "t", connectionProperties)}
}

效果

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/75654.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Go语言开发者的Apache Arrow使用指南:读写Parquet文件

Apache Arrow是一种开放的、与语言无关的列式内存格式,在本系列文章[1]的前几篇中,我们都聚焦于内存表示[2]与内存操作[3]。 但对于一个数据库系统或大数据分析平台来说,数据不能也无法一直放在内存中,虽说目前内存很大也足够便宜…

【数据挖掘竞赛】——科大讯飞:锂离子电池生产参数调控及生产温度预测挑战赛

🤵‍♂️ 个人主页:@Lingxw_w的个人主页 ✍🏻作者简介:计算机科学与技术研究生在读 🐋 希望大家多多支持,我们一起进步!😄 如果文章对你有帮助的话, 欢迎评论 💬点赞👍🏻 收藏 📂加关注+ ​ 【科大讯飞】报名链接:https://challenge.xfyun.cn?invitaC…

【ChatGLM_02】LangChain知识库+Lora微调chatglm2-6b模型+提示词Prompt的使用原则

经验沉淀 1 知识库1.1 Langchain知识库的主要功能(1) 配置知识库(2) 文档数据测试(3) 知识库测试模式(4) 模型配置 2 微调2.1 微调模型的概念2.2 微调模型的方法和步骤(1) 基于ptuning v2 的微调(2) 基于lora的微调 3 提示词3.1 Prompts的定义及原则(1) Prompts是什么&#xf…

CentOS7忘记密码如何重置

忘记密码重置步骤: 1、重启系统,当系统进入引导界面时,按e键。就可以编辑引导选项,在引导选中加入参数rd.break,如图所示: 2、编辑完引导选项后,按Ctrlx组合键引导系统进入紧急模式&#xff0c…

关于CORS的笔记

CORS目录 一、SpringBoot 跨域设置二、CORS(1)总结的图如下(2)简单请求满足的条件(3)响应头(4)请求头(5)使用XMLHttpRequest进行跨域访问1. Access-Control-A…

DEVICENET转ETHERNET/IP网关devicenet协议

捷米JM-EIP-DNT,你听说过吗?这是一款自主研发的ETHERNET/IP从站功能的通讯网关,它能够连接DEVICENET总线和ETHERNET/IP网络,从而解决生产管理系统中协议不同造成的数据交换互通问题。 这款产品在工业自动化领域可谓是一大利器&…

springboot()—— swagger

零、一张图读懂swagger 懂了,这玩意就是用swagger搞出来的! 就是一个后端开发自测的东西嘛! 一、概念 存在即合理,我们看一下swagger诞生的原因:在前后端分离的架构中,前端新增一个字段,后端就…

【FAQ】在Linux中使用curl访问EasyCVR,返回报错Unauthorized的原因排查

EasyCVR可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安防视频监控的能力,比如:视…

Vue-函数式组件

最近在开发项目的时候&#xff0c;定制了一个公司内部样式的Modal模态框组件。 Modal组件伪代码 <!-- Modal/index.vue--> <template><div class"modal-container" id"modalContainer"><!-- Modal Content --><div class&quo…

45.ubuntu Linux系统安装教程

目录 一、安装Vmware 二、Linux系统的安装 今天开始了新的学习&#xff0c;Linux,下面是今天学习的内容。 一、安装Vmware 这里是在 Vmware 虚拟机中安装 linux 系统&#xff0c;所以需要先安装 vmware 软件&#xff0c;然 后再安装 Linux 系统。 所需安装文件&#xff1a;…

Java-接口

目录 1.接口的概念 2.语法规则 3.接口使用 4.接口特性 5.实现多个接口 6.接口间的继承 7.接口使用实例 1.接口的概念 电脑的USB口上&#xff0c;可以插&#xff1a;U盘、鼠标、键盘等所有符合USB协议的设备&#xff1b;数据线的type-c口上&#xff0c;可以插手机&#xf…

Spring源码篇(九)自动配置扫描class的原理

文章目录 前言ClassLoader如何加载jar包里的class自动配置扫描class的原理spring中的加载方式源码总结 前言 spring是怎样通过ComponentScan&#xff0c;或者自动配置扫描到了依赖包里class的&#xff1f; ClassLoader 这里涉及到了class Loader的机制&#xff0c;有些复杂&…

Unity Image(RawImage) 实现按轴心放大缩小,序列化存储轴心信息,实现编译器窗口保存轴心

工作时分配给我的要实现的功能&#xff0c;写的时候遇到挺多的坑的&#xff0c;在此记录一下 效果 放大缩小的效果 2.编译器扩展窗口记录 实现点 1.Json序列化存储图片轴心位置, 放大倍率&#xff0c;放大所需要的事件 2.用了编译器扩展工具便于保存轴心信息坑点 1.Imag…

飞桨AI Studio可以玩多模态了?MiniGPT4实战演练!

MiniGPT4是基于GPT3的改进版本&#xff0c;它的参数量比GPT3少了一个数量级&#xff0c;但是在多项自然语言处理任务上的表现却不逊于GPT3。项目作者以MiniGPT4-7B作为实战演练项目。 创作者&#xff1a;衍哲 体验链接&#xff1a; https://aistudio.baidu.com/aistudio/proj…

前端如何打开钉钉(如何唤起注册表中路径与软件路径不关联的软件)

在前端唤起本地应用时&#xff0c;我查询了资料&#xff0c;在注册表中找到腾讯视频会议的注册表情况&#xff0c;如下&#xff1a; 在前端代码中加入 window.location.href"wemeet:"; 就可以直接唤起腾讯视频会议&#xff0c;但是我无法唤起钉钉 之所以会这样&…

leetcode每日一题Day2——344. 反转字符串

✨博主&#xff1a;命运之光 &#x1f984;专栏&#xff1a;算法修炼之练气篇&#xff08;C\C版&#xff09; &#x1f353;专栏&#xff1a;算法修炼之筑基篇&#xff08;C\C版&#xff09; &#x1f433;专栏&#xff1a;算法修炼之练气篇&#xff08;Python版&#xff09; …

【css】css实现一个简单的按钮

四种链接状态分别是&#xff1a; a:link - 正常的&#xff0c;未访问的链接a:visited - 用户访问过的链接a:hover - 用户将鼠标悬停在链接上时a:active - 链接被点击时 <style> a:link, a:visited {//未访问、访问过background-color: #07c160;//设置背景颜色color: wh…

【ASP.NET MVC】使用动软(五)(13)

一、问题 前文完成的用户登录后的首页如下&#xff1a; 后续账单管理、人员管理等功能页面都有相同的头部&#xff0c;左边和下边&#xff0c;唯一不同的右边内容部分&#xff0c;所以要解决重复设计的问题。 二、解决方法——使用布局页 在Views上右键添加新建项&#xff…

基于量子同态加密的改进多方量子私有比较

摘要量子同态加密在隐私保护方面具有明显的优势。本文提出了一种改进的基于量子同态加密的多方量子私钥比较协议。首先&#xff0c;引入可信密钥中心&#xff0c;安全辅助加密密钥的分发和解密密钥的更新&#xff0c;同时防止恶意服务器发布虚假结果的攻击;在保证所有参与者得到…

RPC原理与Go RPC详解

文章目录 RPC原理与Go RPC什么是RPC本地调用RPC调用HTTP调用RESTful API net/rpc基础RPC示例基于TCP协议的RPC使用JSON协议的RPCPython调用RPC RPC原理 RPC原理与Go RPC 什么是RPC RPC&#xff08;Remote Procedure Call&#xff09;&#xff0c;即远程过程调用。它允许像调用…