flink: 将接收到的tcp文本流写入HBase

一、依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>pulsar-demo2</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><pulsar.version>2.8.0</pulsar.version><jackson.version>2.10.5</jackson.version><!--<jackson.version>2.6.7</jackson.version>--></properties><dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-kafka</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>${pulsar.version}</version><exclusions><exclusion><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId></exclusion></exclusions></dependency><!--<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version></dependency>--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.4.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-1.4_2.12</artifactId><version>1.13.6</version></dependency></dependencies></project>

二、HBase中建表:

create 'hbasetable','family1','family2','family3','family4'

三、在一台服务器上开启nc

nc -lk 9999

四、运行,demo程序

package cn.edu.tju;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;import java.util.UUID;public class FlinkHBase3 {
//nc 服务器地址private static String HOST_NAME = "xx.xx.xx.xx";private static int PORT = 9999;private static String DELIMITER ="\n";public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> socketDataInfo =  env.socketTextStream(HOST_NAME, PORT, DELIMITER);SingleOutputStreamOperator<DataInfo> dataInfoStream = socketDataInfo.map(new MapFunction<String, DataInfo>() {@Overridepublic DataInfo map(String value) throws Exception {String[] stringList = value.split(",");DataInfo dataInfo = new DataInfo(UUID.randomUUID().toString(), Long.parseLong(stringList[0]), stringList[1], Double.parseDouble(stringList[2]));return dataInfo;}});Table dataTable = tableEnv.fromDataStream(dataInfoStream,"rowkey,ts,info,val");tableEnv.createTemporaryView("dataTable", dataTable);// 这里要配自己HBase的zookeeper地址tableEnv.executeSql("CREATE TABLE flinkTable (\n" +" rowkey STRING,\n" +" family1 ROW<ts BIGINT, info STRING, val DOUBLE>,\n" +" PRIMARY KEY (rowkey) NOT ENFORCED\n" +") WITH (\n" +" 'connector' = 'hbase-1.4',\n" +" 'table-name' = 'hbasetable',\n" +" 'zookeeper.quorum' = 'xx.xx.xx.xx:2181'\n" +")");tableEnv.executeSql("INSERT INTO flinkTable " +"SELECT rowkey, ROW(ts,info,val) FROM dataTable");env.execute("HBaseFlinkJob");}public static class DataInfo{private String rowkey;private Long ts;private String info;private double val;public String getRowkey() {return rowkey;}public void setRowkey(String rowkey) {this.rowkey = rowkey;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public String getInfo() {return info;}public void setInfo(String info) {this.info = info;}public double getVal() {return val;}public void setVal(double val) {this.val = val;}@Overridepublic String toString() {return "DataInfo{" +"ts=" + ts +", info='" + info + '\'' +", val='" + val + '\'' +'}';}public DataInfo( String rowkey, Long ts, String info, double val) {this.rowkey = rowkey;this.ts = ts;this.info = info;this.val = val;}public DataInfo() {}}}

五、在nc窗口输入:

1689999832,dong,32.45

六、在HBase检查数据是否已经写入:

scan 'hbasetable'

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/294269.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 排序的多种实现方式(经典)

一、 对数组进行排序 最常见的排序是对一个数组排序&#xff0c;比如&#xff1a; int[] aArray new int[8] { 18, 17, 21, 23, 11, 31, 27, 38 }; 1、利用冒泡排序进行排序&#xff1a; &#xff08;即每个值都和它后面的数值比较&#xff0c;每次拿出最小值&#xff09; s…

java数据结构与算法刷题-----LeetCode695. 岛屿的最大面积

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 深度优先遍历2. 广度优先 1. 深度优先遍历 这不是找最短路径&…

[ruby on rails] ruby使用vscode做开发

ruby LSP实现 ruby插件推荐用这个来实现&#xff0c;但是现在这个在加载文件索引时候&#xff0c;特别慢&#xff0c;时好时坏&#xff0c;所以现在推荐用Solargraph实现 ruby LSP要求ruby版本3以上&#xff0c;如果在旧版本中使用&#xff0c;需要指定bundleGemfile路径 旧版…

电脑win10系统更新后开机很慢,更新win10后电脑开机怎么变慢了

很多用户反映&#xff0c;更新win10后电脑开机怎么变慢了呢?现在动不动就要30几秒&#xff0c;以前都是秒开机的&#xff0c;要怎么设置才能提高开机速度?小伙伴们别着急&#xff0c;主要原因可能是关机设置中没有勾选启用快速启动&#xff0c;或者是开机启动设置的问题&…

Excel 隔几行批量插入空白行

例如如下表格&#xff0c;每隔6行插入一行数据&#xff1a; 1&#xff09;第7个单元格输入1 2&#xff09;选中6个单元格&#xff0c;然后双击填充数据&#xff1a; 3&#xff09;F5 找到常量 Ctrlshift 复制插入的数据&#xff0c;然后选中数据 按F5&#xff0c;定位到空值

两数之和-考察哈希表的运用

题目 给定一个整数数组 n u m s nums nums和一个整数目标值 t a r g e t target target&#xff0c;请你在该数组中找出和为目标值 t a r g e t target target的那 两个整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同…

网络基础——ISIS

名词 ISIS&#xff1a;中间系统到中间系统&#xff0c;优先级是15集成化ISIS&#xff1a;这是在优化后&#xff0c;可以使用在OSI模型上的NET地址&#xff1a;由区域ID、系统ID和SEL组成&#xff0c;一台设备上最多配置3个NET地址&#xff0c;条件是区域号要不一致&#xff0c;…

使用VM搭建Linux服务器局域网

最近在了解一些LAN相关的内容&#xff0c;抱着学习的心态就使用了VM安装Linux虚拟机进行组建LAN&#xff08;局域网&#xff09;的测试。 一、虚拟机网络规划 下面是我安装的虚拟机网络配置 虚拟机编号 IP地址 子网掩码 网络连接 1 192.168.164.100 255.255.255.0 NAT…

golang语言系列:Git

云原生学习路线导航页&#xff08;持续更新中&#xff09; 本文是 golang语言系列 文章&#xff0c;主要对编程通用技能Git进行学习 1.为什么使用版本控制系统 版本控制系统可以解决的问题 代码备份很重要版本控制很重要协同工作很重要责任追溯很重要 常见的版本控制系统 GitS…

ES学习日记(七)-------Kibana安装和简易使用

前言 首先明确一点&#xff0c;Kibana是一个软件&#xff0c;不是插件。 Kibana 是一款开源的数据分析和可视化平台&#xff0c;它是 Elastic stack 成员之一&#xff0c;设计用于和Elasticsearch 协作。您可以使用 Kibana 对 Elasticsearch 索引中的数据进行搜索&#xff0c;…

css酷炫边框

边框一 .leftClass {background: #000;/* -webkit-animation: twinkling 1s infinite ease-in-out; 1秒钟的开始结束都慢的无限次动画 */ } .leftClass::before {content: "";width: 104%;height: 102%;border-radius: 8px;background-image: linear-gradient(var(…

服务器设置了端口映射之后外网还是访问不了服务器

目录 排查思路参考&#xff1a; 1、确认服务是否在运行 2、确认端口映射设置是否正确 3、使用防火墙测试到服务器的连通性 4、检查服务内部的配置 5、解决办法 6、学习小分享 我们在一个完整的网络数据存储服务系统设备中都会存有业务服务器、防火墙、交换机、路由器&a…

穿山甲广告平台SDK接入效果怎么样?

广告收入是大多数开发者的应用变现收入来源&#xff0c;如何进行流流量变现是从应用设计之初就需要开发者思考的问题。 穿山甲广告平台作为国内第三方广告变现平台&#xff0c;是不少开发者选择的对接平台。 穿山甲广告平台的广告类型较多&#xff0c;有信息流&#xff0c;ba…

【单片机 5.3开关检测】

文章目录 前言一、5.3开关检测1.1没按键按下的1.2有按键按下的 二、改进1.改进 三、独立键盘3.1为什么要取反3.2 实用的按键 总结 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 课程需要&#xff1a; 提示&#xff1a;以下是本篇文章正文内容&#xf…

VScode 集成终端设置默认打开当前文件夹 mac系统

一.快捷键设置 搜索 openInIntegratedTerminal 如图&#xff1a; 二.设置cmd 默认打开位置 点击设置 搜索 ntegrated:cwd 如下图&#xff1a; 三.查看ip 快捷指令&#xff1a; ipconfig getifaddr en0

前端性能优化-Table渲染速度优化

教务系统-排课页面性能优化总结 一、前言 在公司教务系统中,排课页面慢的令人发指,在某些情况由于数据量大导致页面主进程卡死,遂组织进行一次排查优化,现记录一下 二、效果对比 以下数据均为UAT环境 Performence对比 更改前: 主进程渲染时间为 8s 教务系统-排课页面性…

HTTP和HTTPS谁传输数据更安全?

1.HTTP HTTP在传输数据时&#xff0c;通常都是明文传输&#xff0c;也就是传输的数据没有进行加密。在这种情况下&#xff0c;如果传输的是一些敏感数据&#xff0c;比如某银行卡密码&#xff0c;就很容易被别人截获到&#xff0c;这就对我们的个人利益产生了威胁。 HTTP传输数…

go优雅读取zip压缩包-进阶2

【前言】 看到这里就晓得了&#xff0c;之前那一一篇文章go优雅读取zip压缩包&#xff0c;依旧还是有些问题&#xff0c;接下来&#xff0c;我就开始描述下本文章讲述的内容&#xff1a; 面对需要多次读取多个zip压缩包里的指定文件内容&#xff0c;如何提升读取的速度&#x…

C++初阶:5.STL简介(了解)

STL简介&#xff08;了解&#xff09; 一.什么是STL STL(standard template libaray-标准模板库)&#xff1a;是C标准库的重要组成部分&#xff0c;不仅是一个可复用的组件库&#xff0c;而且是一个包罗数据结构与算法的软件框架。 二. STL的版本 原始版本 Alexander Stepan…

基于PSO优化的CNN-LSTM-Attention的时间序列回归预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1卷积神经网络&#xff08;CNN&#xff09;在时间序列中的应用 4.2 长短时记忆网络&#xff08;LSTM&#xff09;处理序列依赖关系 4.3 注意力机制&#xff08;Attention&#xff09; 5…