[实战-11] FlinkSql 设置时区对TIMESTAMP和TIMESTAMP_LTZ的影响

table.local-time-zone

  • table.local-time-zone
  • DataStream-to-Table Conversion(拓展知识)
  • 代码测试
  • flinksql代码
  • 执行结果截图
    • 1. Asia/Shanghai 结果如下
    • 2. UTC结果如下

table.local-time-zone

table.local-time-zone可用于设置flinksql的时区。
flink的内置数据类型TIMESTAMP(n)或者是TIMESTAMP_LTZ(n), 我们设置水位线都是基于这两种类型,不同的是前者本质是字符串形势,后者本质是long,也因此前者不受时区影响,后者受时区影响类型。(n指的毫秒级的精度取值范围是 0~9)
原始数据库如果不是时间类型,可能要用TO_TIMESTAMP(字符串格式的时间)或者TO_TIMESTAMP_LTZ(long数字,n)

如果原始数据库是string则需要用TO_TIMESTAMP(字符串格式的时间字段)转成TIMESTAMP(n)
如果原始数据库中是long则需要用TO_TIMESTAMP_LTZ(long数字,n) 转成TIMESTAMP_LTZ(n)

DataStream-to-Table Conversion(拓展知识)

datastream API到Table Api转换的时候,是以后string的形式传递event_time, 并且这个string在DataStream Api是以UTC时区转换的,如果你的原始数据中是long, 如果不做处理展示出来的string就是UTC字符串,为了在东八区展示,则需要将long再加上8小时

        // 水位线 允许乱序WatermarkStrategy<String> waterStrategy = WatermarkStrategy.<String>forMonotonousTimestamps() //ofSeconds(20).withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {try {Mybook book= JSON.parseObject(element,Mybook.class);return boo.time+8*60*60*1000  //转成东八区}catch (Exception e){return recordTimestamp;}}}).withIdleness(Duration.ofSeconds(timeWindowIdleness));SingleOutputStreamOperator<UserSlotGame> processStream = env.fromSource(source, waterStrategy, "readKafka").process(new ProcessFunction<String, UserSlotGame>() {@Overridepublic void processElement(String value, Context ctx, Collector<UserSlotGame> out) throws Exception {// 省略}}) ;

代码测试

mysql时区是Asia/Shanghai

CREATE TABLE `versioned_rates` (`operation_code` int DEFAULT NULL,`update_time` varchar(255) DEFAULT NULL, -- 注意这是字符串`product_id` varchar(255) DEFAULT NULL,`product_name` varchar(255) DEFAULT NULL,`price` float DEFAULT NULL,`time_long` bigint NOT NULL DEFAULT '0' -- 注意这是long
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ciINSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(1, '2024-01-01 00:01:00', 'p_001', 'scooter', 11.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(1, '2024-01-01 00:02:00', 'p_002', 'basketball', 23.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(2, '2024-01-01 12:00:00', 'p_001', 'scooter', 11.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(3, '2024-01-01 12:00:00', 'p_001', 'scooter', 12.99, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(2, '2024-01-01 12:00:00', 'p_002', 'basketball', 23.11, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(3, '2024-01-01 12:00:00', 'p_002', 'basketball', 19.99, 1730346179000);
INSERT INTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)
VALUES(4, '2024-01-01 18:00:00', 'p_001', 'scooter', 12.99, 1730346179000);

flinksql代码

package com.pg.TableAndDataStreamApi;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;/*
*
* */
public class version_table {private static final String SOURCE="CREATE TABLE source_table(\n" +"\toperation_code int,\n" +"\tupdate_time string,\n" +"\tup_t AS TO_TIMESTAMP(update_time),\n" +"\ttime_long bigint,\n" +"\tbbb AS TO_TIMESTAMP_LTZ(time_long,3) \n" +"    ) WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://ip:3306/flink',\n" +"   'driver'='com.mysql.cj.jdbc.Driver',\n "+"   'username'='用户名',\n"+"   'password'='密码',\n"+"   'table-name' = 'versioned_rates'\n" +")";public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.executeSql(SOURCE);Configuration configuration = new Configuration();
//        configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "Asia/Shanghai");tableEnv.getConfig().addConfiguration(configuration);// 从 MySQL 表中选择所有行Table t = tableEnv.sqlQuery("select * from source_table");t.execute().print();}
}

执行结果截图

TO_TIMESTAMP_LTZ 受时区影响
而TO_TIMESTAMP()意味着原始数据中本就是string, 是不会受到时区影响的

  1. 下方第一个红色列不管是UTC还是 Asia/Shanghai 我们看大的string都是一样的
  2. 下方第一个红色列UTC比 Asia/Shanghai 少了8个小时

1. Asia/Shanghai 结果如下

在这里插入图片描述

2. UTC结果如下

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/464077.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

rnn/lstm 项目实战

tip:本项目用到的数据和代码在https://pan.baidu.com/s/1Cw6OSSWJevSv7T1ouk4B6Q?pwdz6w2 1. RNN : 预测股价 任务&#xff1a;基于zgpa_train.csv数据,建立RNN模型,预测股价 1.完成数据预处理&#xff0c;将序列数据转化为可用于RNN输入的数据 2.对新数据zgpa_test.csv进…

MySQL超大分页怎么优化处理?limit 1000000,10 和 limit 10区别?覆盖索引、面试题

1. limit 100000,10 和 limit 10区别 LIMIT 100000, 10&#xff1a; 这个语句的意思是&#xff0c;从查询结果中跳过前100000条记录&#xff0c;然后返回接下来的10条记录。这通常用于分页查询中&#xff0c;当你需要跳过大量的记录以获取后续的记录时。例如&#xff0c;如果你…

规范:项目、目录、文件、样式、事件、变量、方法、url参数、注释、git提交 命名规范及考证

一、规范命名的重要性 易懂、通用、规范、标准、专业性、是经验积累的体现 1.1、常见命名方法 序号命名方法解释1全小写2全大写3驼峰&#xff1a;小驼峰命名法4驼峰&#xff1a;大驼峰命名法5烤串命名法 / 脊柱命名法6下划线分隔法 二、项目名 采用小写字母和中划线&#…

NumPy Ndarray学习

1.NumPy Ndarray 对象简介 NumPy 最重要的特点是其 N 维数组对象 ndarray&#xff0c;它是一系列同类型数据的集合&#xff0c;以 0 下标为开始进行集合中元素的索引。ndarray 对象是用于存放同类型元素的多维数组。ndarray 中的每个元素在内存中都有相同存储大小的区域。 2.N…

二:MySQL基础---查询专项练习

目录 表结构 1. 数据月表&#xff08;zbr_data_monthly_data_YYYYMM&#xff09; 2. 分类表&#xff08;zbr_category&#xff09; 3. 用户表&#xff08;zbr_user&#xff09; 4. 交易表&#xff08;zbr_transaction&#xff09; 查询知识点 1. 基本查询 2. 连接查询 …

C++线程异步

本文内容来自&#xff1a; 智谱清言 《深入应用C11 代码优化与工程级应用》 std::future std::future作为异步结果的传输通道&#xff0c;可以很方便地获取线程函数的返回值。 std::future_status Ready (std::future_status::ready): 当与 std::future 对象关联的异步操作…

Python小游戏19——滑雪小游戏

运行效果 python代码 import pygame import random # 初始化Pygame pygame.init() # 设置屏幕尺寸 screen_width 800 screen_height 600 screen pygame.display.set_mode((screen_width, screen_height)) pygame.display.set_caption("滑雪小游戏") # 定义颜色 WH…

批量删除redis数据【亲测可用】

文章目录 引言I redis客户端基础操作key的命名规则批量查询keyII 批量删除key使用连接工具进行分组shell脚本示例其他方法III 知识扩展:控制短信验证码获取频率引言 批量删除redis数据的应用: 例如缓存数据使用了新的key存储,需要删除废弃的key。RedisTemplate的key序列化采…

04字符串算法/代码随想录

四、字符串 反转字符串 力扣344 遇到数组双指针真是太好用了&#xff0c;左右指针不断逼近即可&#xff0c;代码也很简单 class Solution {public void reverseString(char[] s) {int fast s.length - 1;int slow 0;while (slow < fast) {char temp s[fast];s[fast] s[…

conda找不到对应版本的pytorch,就会自动下载cpu版本的

踩坑一&#xff1a; conda install pytorch2.0.1 torchvision0.15.2 torchaudio2.0.2 pytorch-cuda11.7 -c pytorch -c nvidia (本人的服务器支持的 且python3.8.20) 先nvidia-smi查看自己cuda支持的最高版本&#xff0c;然后去pytorch官网寻找对应的torch、torchaudio、to…

信息学科平台设计与实现:Spring Boot技术详解

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…

二、应用层,《计算机网络(自顶向下方法 第7版,James F.Kurose,Keith W.Ross)》

文章目录 零、前言一、应用层协议原理1.1 网络应用的体系结构1.1.1 客户-服务器(C/S)体系结构1.1.2 对等体&#xff08;P2P&#xff09;体系结构1.1.3 C/S 和 P2P体系结构的混合体 1.2 进程通信1.2.1 问题1&#xff1a;对进程进行编址&#xff08;addressing&#xff09;&#…

Java面向对象 C语言字符串常量

1. &#xff08;1&#xff09;. package liujiawei;public class Phone {String brand;double price;public void call(){System.out.println("手机打电话");}public void play(){System.out.println("手机打游戏");} } public class phonetest {public…

【逆向基础】十八、PE文件格式(三)

一、简介 文本章主要讲结构体IMAGE_DATA_DIRECTORY数组。它制定了各种数据目录的地址与大&#xff1b;PE装载器可以通过这些信息准确加载PE文件所需的函数&#xff0c;资源等&#xff1b;此外&#xff0c;数据目录表也是设置钩子&#xff0c;注入等逆向的理论基础。所以学习这…

Session条件竞争--理论

条件竞争 多个线程同时访问一个共享变量或文件时&#xff0c;由于线程的执行顺序不符合预期而导致最后的执行结果不符合开发者的预期。 session session,被称为“会话控制”。Session对象存储特定用户会话所需的属性及配置信息。这样&#xff0c;当用户在应用程序的Web页之间…

Centos8安装软件失败更换镜像源

问题 在Centos 8上安装git&#xff0c;报错如下&#xff1a; sudo dnf install git -y Repository extras is listed more than once in the configuration CentOS Linux 8 - AppStream 0.0 B/s …

如何让网页中的图片不可下载,让文字不可选中/复制

使用css中的伪属性来完成这个操作. 效果展示 文字不可复制: 图中这几个中文字符无法被选中,双击前面这几个字也只能选中后面的英文内容,无法选中也就无法复制. 既然常规方式无法选中,那打开浏览器开发者工具总能复制吧! 我经常这样干, 但是很遗憾,页面检查中根本就没那些内容…

Linux 之 信号概念、进程、进程间通信、线程、线程同步

学习任务&#xff1a; 1、 信号&#xff1a;信号的分类、进程对信号的处理、向进程发送信号、信号掩码 2、 进程&#xff1a;进程与程序的概念、进程的内存布局、进程的虚拟地址空间、fork创建子进程、wait监视子进程 3、 学习进程间通信&#xff08;管道和FIFO、信号、消息队列…

Jmeter——结合Allure展示测试报告

在平时用jmeter做测试时&#xff0c;生成报告的模板&#xff0c;不是特别好。大家应该也知道allure报告&#xff0c;页面美观。 先来看效果图&#xff0c;报告首页&#xff0c;如下所示&#xff1a; 报告详情信息&#xff0c;如下所示&#xff1a; 运行run.py文件&#xff0c;…

ElasticSearch - Bucket Script 使用指南

文章目录 官方文档Bucket Script 官文1. 什么是 ElasticSearch 中的 Bucket Script&#xff1f;2. 适用场景3. Bucket Script 的基本结构4. 关键参数详解5. 示例官方示例&#xff1a;计算每月 T 恤销售额占总销售额的比率百分比示例计算&#xff1a;点击率 (CTR) 6. 注意事项与…