物流实时数仓:数仓搭建(DWS)一

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建
物流实时数仓:数仓搭建(DIM)
物流实时数仓:数仓搭建(DWD)一
物流实时数仓:数仓搭建(DWD)二
物流实时数仓:数仓搭建(DWS)一


文章目录

  • 系列文章目录
  • 前言
  • 一、代码编写
    • 1.文件创建
      • 1.主程序
      • 2.实体类
      • 3.自定义触发器
      • 4.自定义聚合函数
      • 5.在HbaseUtil中添加查询方法
      • 6.JedisUtil工具类
      • 7.封装DimUtil工具类,使用旁路缓存优化查询维度
      • 8.修改实体类TmsConfigDimBean
      • 9.传递op
      • 10.获取线程池的工具类
      • 11.异步关联函数DimAsyncFunction
      • 12.ClickHouseUtil工具类
      • 13. DimSinkFunction
      • 14. TransientSink注解
    • 2.主程序
    • 3.开窗聚合
      • 1.MyTriggerFunction
      • 2.MyAggregationFunction
    • 4.关联维度信息
      • 1.DimAsyncFunction
      • 2.DimJoinFunction
      • 3.ThreadPoolUtil
      • 4.DimUtil
      • 5.JedisUtil
      • 6. HbaseUtil
      • 7. DwsBoundOrgSortDayBean
      • 8.补充维度字段
      • 9. MyBroadcastProcessFunction
      • 10. DimSinkFunction
    • 5.写入CK
      • 1. ClickHouseUtil
      • 2.TransientSink
  • 二、代码测试
    • 1.程序启动
    • 2.修改kafka分区
    • 3.ck建表
      • 1.建库
      • 2.建表
      • 3.物化视图
      • 4.查看结果
  • 总结


前言

这次博客,我们进行各机构分拣次数的统计。统计当日各机构的分拣次数,并补充城市、省份等维度信息,写入ClickHouse对应表。要求每十秒钟更新一次统计结果。

大体流程如图。
在这里插入图片描述


一、代码编写

1.文件创建

1.主程序

在这里插入图片描述

2.实体类

在这里插入图片描述

3.自定义触发器

在这里插入图片描述

4.自定义聚合函数

在这里插入图片描述

5.在HbaseUtil中添加查询方法

在这里插入图片描述

6.JedisUtil工具类

在这里插入图片描述

7.封装DimUtil工具类,使用旁路缓存优化查询维度

在这里插入图片描述

8.修改实体类TmsConfigDimBean

在这里插入图片描述

9.传递op

在这里插入图片描述

10.获取线程池的工具类

在这里插入图片描述

11.异步关联函数DimAsyncFunction

在这里插入图片描述

12.ClickHouseUtil工具类

在这里插入图片描述以上就是这次博客要更改或创建的java文件。

13. DimSinkFunction

当维度数据更新时,删除redis中的对应数据。
在这里插入图片描述

14. TransientSink注解

某些字段不需要写入ClickHouse,但对统计有帮助,我们可以通过添加自定义注解,在写出时获取字段的TransientSink注解,通过判断是否注解是否为空在写出时忽略指定字段。
在这里插入图片描述

2.主程序

DwsBoundOrgSortDay需要完成的任务如以下流程图。
在这里插入图片描述

package com.atguigu.tms.realtime.app.dws;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdBoundSortBean;
import com.atguigu.tms.realtime.beans.DwsBoundOrgSortDayBean;
import com.atguigu.tms.realtime.utils.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.concurrent.TimeUnit;public class DwsBoundOrgSortDay {public static void main(String[] args) throws Exception {// 环境准备StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);env.setParallelism(4);// kafka读取数据String topic = "tms_dwd_bound_sort";String groupId = "dws_tms_dwd_bound_sort";KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source").uid("kafka_source");// 对流中的数据进行类型转换 jsonStr-> 实体类SingleOutputStreamOperator<DwsBoundOrgSortDayBean> dwsBoundOrgSortDayBeanSingleOutputStreamOperator = kafkaStrDS.map(new MapFunction<String, DwsBoundOrgSortDayBean>() {@Overridepublic DwsBoundOrgSortDayBean map(String jsonStr) throws Exception {DwdBoundSortBean dwdBoundSortBean = JSON.parseObject(jsonStr, DwdBoundSortBean.class);return DwsBoundOrgSortDayBean.builder().orgId(dwdBoundSortBean.getOrgId()).sortCountBase(1L).ts(dwdBoundSortBean.getTs() + 8 * 60 * 60 * 1000L).build();}});// 指定Watermark以及提取事件事件字段SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withWatermarkDS = dwsBoundOrgSortDayBeanSingleOutputStreamOperator.assignTimestampsAndWatermarks(WatermarkStrategy.<DwsBoundOrgSortDayBean>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<DwsBoundOrgSortDayBean>() {@Overridepublic long extractTimestamp(DwsBoundOrgSortDayBean boundOrgSortDayBean, long recordTimestamp) {return boundOrgSortDayBean.getTs();}}));//        withWatermarkDS.print("###");// 按照机构id进行分组KeyedStream<DwsBoundOrgSortDayBean, String> keyedDS = withWatermarkDS.keyBy(DwsBoundOrgSortDayBean::getOrgId);// 开窗WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.days(1L)));// 指定自定义触发器WindowedStream<DwsBoundOrgSortDayBean, String, TimeWindow> triggerDS = windowDS.trigger(new MyTriggerFunction<>());// 聚合SingleOutputStreamOperator<DwsBoundOrgSortDayBean> aggregateDS = triggerDS.aggregate(new MyAggregationFunction<DwsBoundOrgSortDayBean>() {@Overridepublic DwsBoundOrgSortDayBean add(DwsBoundOrgSortDayBean value, DwsBoundOrgSortDayBean accumulator) {if (accumulator == null) {return value;}accumulator.setSortCountBase(accumulator.getSortCountBase() + 1);return accumulator;}},new ProcessWindowFunction<DwsBoundOrgSortDayBean, DwsBoundOrgSortDayBean, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<DwsBoundOrgSortDayBean> elements, Collector<DwsBoundOrgSortDayBean> out) throws Exception {for (DwsBoundOrgSortDayBean element : elements) {// 获取窗口起始时间long stt = context.window().getStart();// 将窗口时间左移8小时 并转换格式element.setCurDate(DateFormatUtil.toDate(stt - 8 * 60 * 60 * 1000L));element.setTs(System.currentTimeMillis());out.collect(element);}}});// 关联维度(城市、省份)// 关联机构维度 获取机构名称// 异步I/OSingleOutputStreamOperator<DwsBoundOrgSortDayBean> withOrgNameDS = AsyncDataStream.unorderedWait(aggregateDS,new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {@Overridepublic void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {sortDayBean.setOrgName(dimInfoJsonObj.getString("org_name"));String orgParentId = dimInfoJsonObj.getString("org_parent_id");sortDayBean.setJoinOrgId(orgParentId != null?orgParentId:sortDayBean.getOrgId());}@Overridepublic Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {return Tuple2.of("id", sortDayBean.getOrgId());}},60,TimeUnit.SECONDS);// 补充城市IDSingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityIdDS = AsyncDataStream.unorderedWait(withOrgNameDS,new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_organ") {@Overridepublic void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {sortDayBean.setCityId(dimInfoJsonObj.getString("region_id"));}@Overridepublic Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {return Tuple2.of("id", sortDayBean.getJoinOrgId());}},60,TimeUnit.SECONDS);// 关联地区维度表 根据城市的id获取城市名称以及当前城市所属的省份idSingleOutputStreamOperator<DwsBoundOrgSortDayBean> withCityNameAndProvinceIdDS = AsyncDataStream.unorderedWait(withCityIdDS,new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {@Overridepublic void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {sortDayBean.setCityName(dimInfoJsonObj.getString("name"));sortDayBean.setProvinceId(dimInfoJsonObj.getString("parent_id"));}@Overridepublic Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {return Tuple2.of("id", sortDayBean.getCityId());}},60, TimeUnit.SECONDS);// 关联地区维度表 根据省份的id获取省份的名称SingleOutputStreamOperator<DwsBoundOrgSortDayBean> withProvinceDS = AsyncDataStream.unorderedWait(withCityNameAndProvinceIdDS,new DimAsyncFunction<DwsBoundOrgSortDayBean>("dim_base_region_info") {@Overridepublic void join(DwsBoundOrgSortDayBean sortDayBean, JSONObject dimInfoJsonObj) {sortDayBean.setProvinceName(dimInfoJsonObj.getString("name"));}@Overridepublic Tuple2<String, String> getCondition(DwsBoundOrgSortDayBean sortDayBean) {return Tuple2.of("id", sortDayBean.getProvinceId());}},60, TimeUnit.SECONDS);withProvinceDS.print(">>>>");// 将关联的结果写入ck中withProvinceDS.addSink(ClickHouseUtil.getJdbcSink("insert into dws_bound_org_sort_day_base values(?,?,?,?,?,?,?,?,?)"));env.execute();}
}

现在我们就按照主程序的调用来完成其他文件的编写。

3.开窗聚合

开窗之前没有用到新的函数,所以不说了。

1.MyTriggerFunction

自定义触发器

package com.atguigu.tms.realtime.app.func;import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;//自定义触发器 每10s触发一次窗口计算
public class MyTriggerFunction<T>  extends Trigger<T, TimeWindow> {@Overridepublic TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {ValueStateDescriptor<Boolean> valueStateDescriptor= new ValueStateDescriptor<Boolean>("isFirstState",Boolean.class);ValueState<Boolean> isFirstState = ctx.getPartitionedState(valueStateDescriptor);Boolean isFirst = isFirstState.value();if(isFirst == null){//如果是窗口中的第一个元素//将状态中的值进行更新isFirstState.update(true);//注册定时器  当前事件时间向下取整后 + 10s后执行ctx.registerEventTimeTimer(timestamp -timestamp%10000L  + 2000L);}else if(isFirst){isFirstState.update(false);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}//time 表示事件时间触发器 触发时间@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {long end = window.getEnd();if(time < end){if(time + 2000L < end){ctx.registerEventTimeTimer(time + 2000L);}return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());}
}

2.MyAggregationFunction

自定义聚合函数

package com.atguigu.tms.realtime.app.func;import org.apache.flink.api.common.functions.AggregateFunction;public abstract class MyAggregationFunction<T> implements AggregateFunction<T,T,T> {@Overridepublic T createAccumulator() {return null;}@Overridepublic T getResult(T accumulator) {return accumulator;}@Overridepublic T merge(T a, T b) {return null;}
}

4.关联维度信息

1.DimAsyncFunction

异步I/O

package com.atguigu.tms.realtime.app.func;import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DimJoinFunction;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.ThreadPoolUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import java.util.Collections;
import java.util.concurrent.ExecutorService;public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {private String tableName;public DimAsyncFunction(String tableName) {this.tableName = tableName;}private ExecutorService executorService;@Overridepublic void open(Configuration parameters) throws Exception {executorService = ThreadPoolUtil.getInstance();}@Overridepublic void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {// 从线程池中获取线程,发送异步请求executorService.submit(new Runnable() {@Overridepublic void run() {// 根据流中的对象获取要作为查询条件的主键或者外键Tuple2<String, String> keyNameAndValue = getCondition(obj);// 根据查询条件获取维度对象JSONObject dimInfoJsonObj = DimUtil.getDimInfo(TmsConfig.HBASE_NAMESPACE, tableName, keyNameAndValue);// 将维度对象的属性补充到流中的对象上if (dimInfoJsonObj != null) {join(obj, dimInfoJsonObj);}// 向下游传递数据resultFuture.complete(Collections.singleton(obj));}});}
}

2.DimJoinFunction

我们将需要DimAsyncFunction中一些需要子类实现的函数写入DimJoinFunction,当作接口接入。

package com.atguigu.tms.realtime.beans;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;public interface DimJoinFunction<T> {void join(T obj, JSONObject dimInfoJsonObj);Tuple2<String, String> getCondition(T obj);
}

3.ThreadPoolUtil

异步I/O中用作创建线程池的工具类

package com.atguigu.tms.realtime.utils;import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolUtil {private static volatile ThreadPoolExecutor poolExecutor;public static synchronized ThreadPoolExecutor getInstance() {if (poolExecutor == null) {synchronized (ThreadPoolUtil.class){if (poolExecutor == null) {poolExecutor = new ThreadPoolExecutor(4,20,300,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(Integer.MAX_VALUE));}}}return poolExecutor;}
}

4.DimUtil

在维度关联时,我们需要从hbase中获取维度信息,为了为了优化查询速度,我们引入了redis,流程如图所示
在这里插入图片描述

package com.atguigu.tms.realtime.utils;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class DimUtil {public static JSONObject getDimInfo(String namespace, String tableName, Tuple2<String, String> nameAndValue) {// 获取的查询条件中的字段名以及字段值String keyName = nameAndValue.f0;String keyValue = nameAndValue.f1;// 拼接从Redis中查询数据的KeyString redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;// 操作Redis的客户端Jedis jedis = null;// 用于存放从Redis查询的维度数据String dimJsonStr = null;// 用于封装返回结果JSONObject dimJsonObj = null;//  先从缓存中查询维度数据try {jedis = JedisUtil.getJedis();dimJsonStr = jedis.get(redisKey);if (StringUtils.isNotEmpty(dimJsonStr)) {// 如果在缓存中能够找到要查询的维度dimJsonObj = JSON.parseObject(dimJsonStr);} else {// 如果在缓存中,没有找到要查询的维度数据if ("id".equals(keyName)) {dimJsonObj = HbaseUtil.getRowByPrimaryKey(namespace, tableName, nameAndValue);} else {dimJsonObj = HbaseUtil.getRowByForeignKey(namespace, tableName, nameAndValue);}if (dimJsonObj != null && jedis != null) {jedis.setex(redisKey, 3600 * 24, dimJsonObj.toJSONString());}}} catch (Exception e) {log.error("从Redis中查询维度数据发生了一场", e);} finally {if (jedis != null) {System.out.println("关闭客户端");jedis.close();}}return dimJsonObj;}// 从Redis中删除缓存的维度数据public static void delCached(String tableName, Tuple2<String, String> keyNameAndValue) {String keyName = keyNameAndValue.f0;String keyValue = keyNameAndValue.f1;String redisKey = "dim:" + tableName.toLowerCase() + ":" + keyName + "_" + keyValue;Jedis jedis = null;try {jedis = JedisUtil.getJedis();jedis.decr(redisKey);}catch (Exception e){log.error("清除Redis中缓存的维度数据时发生了异常", e);}finally {if (jedis != null) {jedis.close();}}}
}

5.JedisUtil

用于连接reids的jedis客户端。
先在pom中引入依赖

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.3.0</version>
</dependency>

在这里插入图片描述

package com.atguigu.tms.realtime.utils;import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;public class JedisUtil {private static JedisPool jedisPool;static {JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(1000);poolConfig.setMaxIdle(5);poolConfig.setMinIdle(5);poolConfig.setBlockWhenExhausted(true);poolConfig.setMaxWaitMillis(2000L);poolConfig.setTestOnBorrow(true);jedisPool=new JedisPool(poolConfig,"hadoop102",6379,10000);}public static Jedis getJedis(){System.out.println("创建Jedis客户端");Jedis jedis = jedisPool.getResource();return jedis;}public static void main(String[] args) {Jedis jedis = getJedis();String pong = jedis.ping();System.out.println(pong);}
}

6. HbaseUtil

之前我们在HbaseUtil完成了创建表和插入操作,现在来完成查询操作。

package com.atguigu.tms.realtime.utils;import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;public class HbaseUtil {private static Connection conn;static {try {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", TmsConfig.hbase_zookeeper_quorum);conn = ConnectionFactory.createConnection(conf);} catch (IOException e) {throw new RuntimeException(e);}}// 创建表public static void createTable(String nameSpace, String tableName, String... families) {Admin admin = null;try {if (families.length < 1) {System.out.println("至少需要一个列族");return;}admin = conn.getAdmin();// 判断表是否存在if (admin.tableExists(TableName.valueOf(nameSpace, tableName))) {System.out.println(nameSpace + ":" + tableName + "已存在");return;}TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));// 指定列族for (String family : families) {ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build();builder.setColumnFamily(familyDescriptor);}admin.createTable(builder.build());} catch (IOException e) {throw new RuntimeException(e);} finally {if (admin != null) {try {admin.close();} catch (IOException e) {throw new RuntimeException(e);}}}}// 向hbase插入对象public static void putPow(String namespace, String tableName, Put put) {BufferedMutator mutator = null;try {BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(namespace, tableName));params.writeBufferSize(5 * 1024 * 1024);params.setWriteBufferPeriodicFlushTimeoutMs(3000L);mutator = conn.getBufferedMutator(params);mutator.mutate(put);} catch (IOException e) {throw new RuntimeException(e);} finally {if (mutator != null) {try {mutator.close();} catch (IOException e) {throw new RuntimeException(e);}}}}// 根据主键从Hbase表中查询一行数据public static JSONObject getRowByPrimaryKey(String namespace, String tableName, Tuple2<String, String> rowKeyNameAndKey) {Table table = null;JSONObject dimJsonObj = null;String rowKeyName = rowKeyNameAndKey.f0;String rowKeyValue = rowKeyNameAndKey.f1;try {table = conn.getTable(TableName.valueOf(namespace, tableName));Result result = table.get(new Get(Bytes.toBytes(rowKeyValue)));Cell[] cells = result.rawCells();if (cells.length > 0) {dimJsonObj = new JSONObject();dimJsonObj.put(rowKeyName, rowKeyValue);for (Cell cell : cells) {dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));}} else {System.out.println("从Hbase表中没有找到对应的维度数据");}} catch (IOException e) {throw new RuntimeException(e);} finally {if (table != null) {try {table.close();} catch (IOException e) {throw new RuntimeException(e);}}}return dimJsonObj;}//根据外键从hbase表中查询一行数据public static JSONObject getRowByForeignKey(String namespace, String tableName, Tuple2<String, String> foreignKeyNameAndKey) {Table table = null;JSONObject dimJsonObj = null;try {table = conn.getTable(TableName.valueOf(namespace, tableName));Scan scan = new Scan();String foreignKeyName = foreignKeyNameAndKey.f0;String foreignKeyValue = foreignKeyNameAndKey.f1;SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes(foreignKeyName), CompareOperator.EQUAL,Bytes.toBytes(foreignKeyValue));singleColumnValueFilter.setFilterIfMissing(true);scan.setFilter(singleColumnValueFilter);ResultScanner scanner = table.getScanner(scan);Result result = scanner.next();if (result!=null){Cell[] cells = result.rawCells();if (cells.length > 0) {dimJsonObj = new JSONObject();dimJsonObj.put("id", Bytes.toString(result.getRow()));for (Cell cell : cells) {dimJsonObj.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));}} else {System.out.println("从Hbase表中没有找到对应的维度数据");}}} catch (IOException e) {throw new RuntimeException(e);} finally {if (table != null) {try {table.close();} catch (IOException e) {throw new RuntimeException(e);}}}return dimJsonObj;}
}

7. DwsBoundOrgSortDayBean

自定义的工具类,其中包含我们要写入ck的字段

package com.atguigu.tms.realtime.beans;import lombok.Builder;
import lombok.Data;@Data
@Builder
public class DwsBoundOrgSortDayBean {// 统计日期String curDate;// 机构 IDString orgId;// 机构名称String orgName;// 用于关联获取省份信息的机构 ID@TransientSinkString joinOrgId;// 城市 IDString cityId;// 城市名称String cityName;// 省份 IDString provinceId;// 省份名称String provinceName;// 分拣次数Long sortCountBase;// 时间戳Long ts;
}

8.补充维度字段

我代码编写我们需要维度表的外键字段,所以我们重新修改mysql维度表,添加外键字段。

DROP TABLE IF EXISTS `tms_config_dim`;
CREATE TABLE `tms_config_dim` (`source_table` varchar(200) NOT NULL COMMENT '数据源表',`sink_table` varchar(200) DEFAULT NULL COMMENT '目标表',`sink_family` varchar(200) DEFAULT NULL COMMENT '目标表列族',`sink_columns` varchar(200) DEFAULT NULL COMMENT '目标表列',`sink_pk` varchar(256) DEFAULT NULL COMMENT '主键字段',`foreign_keys` varchar(256) DEFAULT NULL COMMENT '外键查询字段',PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='物流实时配置表';

然后从新导入数据。
在这里插入图片描述
在这里插入图片描述

然后我们使用dimapp同步一下数据即可,具体方法可看Dim层搭建。

9. MyBroadcastProcessFunction

我们之前在DIM层的搭建中,在MyBroadcastProcessFunction的processElement函数中过滤掉了外键,但现在需要他,我们把它加上。
在传递前添加一段代码

// 清除Redis缓存的准备工作(传递操作类型、外键字段的k-v)
String op = jsonObj.getString("op");
if ("u".equals(op)) {afterJsonObj.put("op", op);// 从配置表中获取当前维度表关联的外键名String foreignKeys = tmsConfigDimBean.getForeignKeys();// 定义个json对象,用于存储当前维度表对应的外键名以及值JSONObject foreignjsonObj = new JSONObject();if (StringUtils.isNotEmpty(foreignKeys)) {String[] foreignNameArr = foreignKeys.split(",");for (String foreignName : foreignNameArr) {// 获取修改前的数据JSONObject before = jsonObj.getJSONObject("before");String foreignKeyBefore = before.getString(foreignName);String foreignKeyAfter = afterJsonObj.getString(foreignName);if (!foreignKeyBefore.equals(foreignKeyAfter)) {// 如果修改的是外键foreignjsonObj.put(foreignName, foreignKeyBefore);}else {foreignjsonObj.put(foreignName, foreignKeyBefore);}}}afterJsonObj.put("foreign_key", foreignjsonObj);
}

完成代码

package com.atguigu.tms.realtime.app.func;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.TmsConfigDimBean;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.sql.*;
import java.util.*;// 自定义类 完成主流和广播流的处理
public class MyBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {private MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor;private Map<String, TmsConfigDimBean> configMap = new HashMap<>();private String username;private String password;public MyBroadcastProcessFunction(MapStateDescriptor<String, TmsConfigDimBean> mapStateDescriptor, String[] args) {this.mapStateDescriptor = mapStateDescriptor;ParameterTool parameterTool = ParameterTool.fromArgs(args);this.username = parameterTool.get("mysql-username", "root");this.password = parameterTool.get("mysql-password", "000000");}@Overridepublic void open(Configuration parameters) throws Exception {// 将配置表中的数据进行预加载-JDBCClass.forName("com.mysql.cj.jdbc.Driver");String url = "jdbc:mysql://hadoop102:3306/tms_config?useSSL=false&useUnicode=true" +"&user=" + username + "&password=" + password +"&charset=utf8&TimeZone=Asia/Shanghai";Connection conn = DriverManager.getConnection(url);PreparedStatement ps = conn.prepareStatement("select * from tms_config.tms_config_dim");ResultSet rs = ps.executeQuery();ResultSetMetaData metaData = rs.getMetaData();while (rs.next()) {JSONObject jsonObj = new JSONObject();for (int i = 1; i <= metaData.getColumnCount(); i++) {String columnName = metaData.getColumnName(i);Object columValue = rs.getObject(i);jsonObj.put(columnName, columValue);}TmsConfigDimBean tmsConfigDimBean = jsonObj.toJavaObject(TmsConfigDimBean.class);configMap.put(tmsConfigDimBean.getSourceTable(), tmsConfigDimBean);}rs.close();ps.close();conn.close();super.open(parameters);}@Overridepublic void processElement(JSONObject jsonObj, BroadcastProcessFunction<JSONObject, String, JSONObject>.ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {// 获取操作的业务数据库的表名String table = jsonObj.getString("table");// 获取广播状态ReadOnlyBroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);// 根据操作的业务数据库的表名 到广播状态中获取对应的配置信息TmsConfigDimBean tmsConfigDimBean;if ((tmsConfigDimBean = broadcastState.get(table)) != null || (tmsConfigDimBean = configMap.get(table)) != null) {// 如果对应的配置信息不为空 是维度信息// 获取after对象,对应的是影响的业务数据表中的一条记录JSONObject afterJsonObj = jsonObj.getJSONObject("after");// 数据脱敏switch (table) {// 员工表信息脱敏case "employee_info":String empPassword = afterJsonObj.getString("password");String empRealName = afterJsonObj.getString("real_name");String idCard = afterJsonObj.getString("id_card");String phone = afterJsonObj.getString("phone");// 脱敏empPassword = DigestUtils.md5Hex(empPassword);empRealName = empRealName.charAt(0) +empRealName.substring(1).replaceAll(".", "\\*");//知道有这个操作  idCard是随机生成的,和标准的格式不一样 所以这里注释掉// idCard = idCard.matches("(^[1-9]\\d{5}(18|19|([23]\\d))\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{3}[0-9Xx]$)|(^[1-9]\\d{5}\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{2}$)")//     ? DigestUtils.md5Hex(idCard) : null;phone = phone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")? DigestUtils.md5Hex(phone) : null;afterJsonObj.put("password", empPassword);afterJsonObj.put("real_name", empRealName);afterJsonObj.put("id_card", idCard);afterJsonObj.put("phone", phone);break;// 快递员信息脱敏case "express_courier":String workingPhone = afterJsonObj.getString("working_phone");workingPhone = workingPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")? DigestUtils.md5Hex(workingPhone) : null;afterJsonObj.put("working_phone", workingPhone);break;// 卡车司机信息脱敏case "truck_driver":String licenseNo = afterJsonObj.getString("license_no");licenseNo = DigestUtils.md5Hex(licenseNo);afterJsonObj.put("license_no", licenseNo);break;// 卡车信息脱敏case "truck_info":String truckNo = afterJsonObj.getString("truck_no");String deviceGpsId = afterJsonObj.getString("device_gps_id");String engineNo = afterJsonObj.getString("engine_no");truckNo = DigestUtils.md5Hex(truckNo);deviceGpsId = DigestUtils.md5Hex(deviceGpsId);engineNo = DigestUtils.md5Hex(engineNo);afterJsonObj.put("truck_no", truckNo);afterJsonObj.put("device_gps_id", deviceGpsId);afterJsonObj.put("engine_no", engineNo);break;// 卡车型号信息脱敏case "truck_model":String modelNo = afterJsonObj.getString("model_no");modelNo = DigestUtils.md5Hex(modelNo);afterJsonObj.put("model_no", modelNo);break;// 用户地址信息脱敏case "user_address":String addressPhone = afterJsonObj.getString("phone");addressPhone = addressPhone.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")? DigestUtils.md5Hex(addressPhone) : null;afterJsonObj.put("phone", addressPhone);break;// 用户信息脱敏case "user_info":String passwd = afterJsonObj.getString("passwd");String realName = afterJsonObj.getString("real_name");String phoneNum = afterJsonObj.getString("phone_num");String email = afterJsonObj.getString("email");// 脱敏passwd = DigestUtils.md5Hex(passwd);if (StringUtils.isNotEmpty(realName)) {realName = DigestUtils.md5Hex(realName);afterJsonObj.put("real_name", realName);}phoneNum = phoneNum.matches("^(13[0-9]|14[01456879]|15[0-35-9]|16[2567]|17[0-8]|18[0-9]|19[0-35-9])\\d{8}$")? DigestUtils.md5Hex(phoneNum) : null;email = email.matches("^[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$")? DigestUtils.md5Hex(email) : null;afterJsonObj.put("birthday", DateFormatUtil.toDate(afterJsonObj.getInteger("birthday") * 24 * 60 * 60 * 1000L));afterJsonObj.put("passwd", passwd);afterJsonObj.put("phone_num", phoneNum);afterJsonObj.put("email", email);break;}// 过滤不需要的维度属性String sinkColumns = tmsConfigDimBean.getSinkColumns();filterColum(afterJsonObj, sinkColumns);// 补充输出目的的表名String sinkTable = tmsConfigDimBean.getSinkTable();afterJsonObj.put("sink_table", sinkTable);// 补充rowKeyString sinkPk = tmsConfigDimBean.getSinkPk();afterJsonObj.put("sink_pk", sinkPk);// 清除Redis缓存的准备工作(传递操作类型、外键字段的k-v)String op = jsonObj.getString("op");if ("u".equals(op)) {afterJsonObj.put("op", op);// 从配置表中获取当前维度表关联的外键名String foreignKeys = tmsConfigDimBean.getForeignKeys();// 定义个json对象,用于存储当前维度表对应的外键名以及值JSONObject foreignjsonObj = new JSONObject();if (StringUtils.isNotEmpty(foreignKeys)) {String[] foreignNameArr = foreignKeys.split(",");for (String foreignName : foreignNameArr) {// 获取修改前的数据JSONObject before = jsonObj.getJSONObject("before");String foreignKeyBefore = before.getString(foreignName);String foreignKeyAfter = afterJsonObj.getString(foreignName);if (!foreignKeyBefore.equals(foreignKeyAfter)) {// 如果修改的是外键foreignjsonObj.put(foreignName, foreignKeyBefore);}else {foreignjsonObj.put(foreignName, foreignKeyBefore);}}}afterJsonObj.put("foreign_key", foreignjsonObj);}// 将维度数据传递out.collect(afterJsonObj);}}private void filterColum(JSONObject afterJsonObj, String sinkColumns) {String[] fieldArr = sinkColumns.split(",");List<String> fieldList = Arrays.asList(fieldArr);Set<Map.Entry<String, Object>> entrySet = afterJsonObj.entrySet();entrySet.removeIf(entry -> !fieldList.contains(entry.getKey()));}@Overridepublic void processBroadcastElement(String jsonStr, BroadcastProcessFunction<JSONObject, String, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {JSONObject jsonObj = JSON.parseObject(jsonStr);// 获取广播状态BroadcastState<String, TmsConfigDimBean> broadcastState = ctx.getBroadcastState(mapStateDescriptor);// 获取对配置表的操作类型String op = jsonObj.getString("op");if ("d".equals(op)) {String sourceTable = jsonObj.getJSONObject("before").getString("source_table");broadcastState.remove(sourceTable);configMap.remove(sourceTable);} else {TmsConfigDimBean configDimBean = jsonObj.getObject("after", TmsConfigDimBean.class);String sourceTable = configDimBean.getSourceTable();broadcastState.put(sourceTable, configDimBean);configMap.put(sourceTable, configDimBean);}}
}

10. DimSinkFunction

添加清除代码

// 如果维度数据发生了变化,将Redis中缓存的维度数据清空掉if ("u".equals(op)) {// 删除当前维度数据在Redis中对应主键的缓存DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));// 删除当前维度数据在Redis中对应外键的缓存Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();for (Map.Entry<String, Object> entry : set) {DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));}}

完整代码

package com.atguigu.tms.realtime.app.func;import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.commom.TmsConfig;
import com.atguigu.tms.realtime.utils.DimUtil;
import com.atguigu.tms.realtime.utils.HbaseUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.hbase.client.Put;import java.util.Map;
import java.util.Set;public class DimSinkFunction implements SinkFunction<JSONObject> {public void invoke(JSONObject jsonObj, Context context) throws Exception {// 获取输出目的地表名和rowKeyString sinkTable = jsonObj.getString("sink_table");String sinkPk = jsonObj.getString("sink_pk");jsonObj.remove("sink_table");jsonObj.remove("sink_pk");String op = jsonObj.getString("op");jsonObj.remove("op");JSONObject foreignKeyJsonObj = jsonObj.getJSONObject("foreign_key");jsonObj.remove("foreign_key");// 获取json中的每一个键值对Set<Map.Entry<String, Object>> entrySet = jsonObj.entrySet();Put put = new Put(jsonObj.getString(sinkPk).getBytes());for (Map.Entry<String, Object> entry : entrySet) {if (!sinkPk.equals(entry.getKey())) {put.addColumn("info".getBytes(), entry.getKey().getBytes(), entry.getValue().toString().getBytes());}}System.out.println("向hbase表中插入数据");HbaseUtil.putPow(TmsConfig.HBASE_NAMESPACE, sinkTable, put);// 如果维度数据发生了变化,将Redis中缓存的维度数据清空掉if ("u".equals(op)) {// 删除当前维度数据在Redis中对应主键的缓存DimUtil.delCached(sinkTable, Tuple2.of("id", jsonObj.getString("id")));// 删除当前维度数据在Redis中对应外键的缓存Set<Map.Entry<String, Object>> set = foreignKeyJsonObj.entrySet();for (Map.Entry<String, Object> entry : set) {DimUtil.delCached(sinkTable, Tuple2.of(entry.getKey(), entry.getValue().toString()));}}}
}

5.写入CK

1. ClickHouseUtil

先导入需要的依赖。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.3.2</version><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId></exclusion></exclusions>
</dependency>

在这里插入图片描述
ClickHouseUtil

package com.atguigu.tms.realtime.utils;import com.atguigu.tms.realtime.beans.TransientSink;
import com.atguigu.tms.realtime.commom.TmsConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;public class ClickHouseUtil {// 获取SinkFunctionpublic static <T> SinkFunction<T> getJdbcSink(String sql) {SinkFunction<T> sinkFunction = JdbcSink.<T>sink(sql,new JdbcStatementBuilder<T>() {@Overridepublic void accept(PreparedStatement ps, T obj) throws SQLException {// 将流中对象的属性给问号占位符赋值// 获取单签流中对象岁数的类型 以及类中的属性Field[] fieldsArr = obj.getClass().getDeclaredFields();// 遍历所有属性int skipNum = 0;for (int i = 0; i < fieldsArr.length; i++) {Field field = fieldsArr[i];// 判断当前属性是否需要向流中保存TransientSink transientSink = field.getAnnotation(TransientSink.class);if (transientSink != null) {skipNum++;continue;}// 设置私有属性的访问权限field.setAccessible(true);try {Object fieldValue = field.get(obj);ps.setObject(i + 1 - skipNum, fieldValue);} catch (IllegalAccessException e) {throw new RuntimeException(e);}}}},new JdbcExecutionOptions.Builder().withBatchSize(5000).withBatchIntervalMs(3000L).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName(TmsConfig.CLICKHOUSE_DRIVER).withUrl(TmsConfig.CLICKHOUSE_URL).build());return sinkFunction;}
}

2.TransientSink

package com.atguigu.tms.realtime.beans;// 自定义主键 用于标记不需要向ck中保存的属性import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}

二、代码测试

1.程序启动

根据代码逻辑,我们需要启动以下程序。
hdfs、zk、kafka、hbase、redise、ck、OdsApp、DwdBoundRelevantApp、DimApp和DwsBoundOrgSortDay。其中DimApp只需启动一次完成维度数据更新即可。

2.修改kafka分区

再从kafka读取数据时,应该保证kafka有4个分区,不然聚合无法成功。

kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_bound_sort --partitions 4

3.ck建表

1.建库

DROP DATABASE IF EXISTS tms_realtime;
CREATE DATABASE IF NOT EXISTS tms_realtime;
USE tms_realtime;

2.建表

DROP TABLE IF EXISTS dws_bound_org_sort_day_base;
CREATE TABLE IF NOT EXISTS dws_bound_org_sort_day_base
(`cur_date` Date COMMENT '统计日期',`org_id` String COMMENT '机构ID',`org_name` String COMMENT '机构名称',`city_id` String COMMENT '城市ID',`city_name` String COMMENT '城市名称',`province_id` String COMMENT '省份ID',`province_name` String COMMENT '省份名称',`sort_count_base` UInt64 COMMENT '分拣次数',`ts` UInt64 COMMENT '时间戳'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);

3.物化视图

DROP VIEW IF EXISTS dws_bound_org_sort_day;
CREATE MATERIALIZED VIEW IF NOT EXISTS dws_bound_org_sort_day
(`cur_date` Date, `org_id` String, `org_name` String, `city_id` String, `city_name` String, `province_id` String, `province_name` String, `sort_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192 AS
SELECT cur_date, org_id, org_name, city_id, city_name, province_id, province_name, argMaxState(sort_count_base, ts) AS sort_count
FROM dws_bound_org_sort_day_base
GROUP BY cur_date, org_id, org_name, city_id, city_name, province_id, province_name;

4.查看结果

当运行程序后,开始生成数据,等待执行完成之后,可以在ck中使用如下代码查看。

clickhouse-client -m

-m 参数代表可以使用回车。

SELECTcur_date,org_id,org_name,city_id,city_name,province_id,province_name,argMaxMerge(sort_count) AS sort_count
FROM dws_bound_org_sort_day
GROUP BYcur_date,org_id,org_name,city_id,city_name,province_id,province_name
LIMIT 10;

在这里插入图片描述


总结

至此,Dws的部分搭建就结束了,为了方便进行文件管理,我把项目开源到了github上。
项目地址:https://github.com/lcc-666/tms-parent

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/230562.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

22款奔驰GLE450升级香氛负离子 车载香薰

相信大家都知道&#xff0c;奔驰自从研发出香氛负离子系统后&#xff0c;一直都受广大奔驰车主的追捧&#xff0c;香氛负离子不仅可以散发出清香淡雅的香气外&#xff0c;还可以对车内的空气进行过滤&#xff0c;使车内的有害气味通过负离子进行过滤&#xff0c;达到车内保持清…

神经网络:经典模型热门模型

在这里插入代码片【一】目标检测中IOU的相关概念与计算 IoU&#xff08;Intersection over Union&#xff09;即交并比&#xff0c;是目标检测任务中一个重要的模块&#xff0c;其是GT bbox与pred bbox交集的面积 / 二者并集的面积。 下面我们用坐标&#xff08;top&#xff0…

Oracle导出CSV文件

利用spool spool基本格式&#xff1a; spool 路径文件名 select col1||,||col2||,||col3||,||col4 from tablename; spool off spool常用的设置&#xff1a; set colsep ;    //域输出分隔符 set echo off;    //显示start启动的脚本中的每个sql命令&#xff0c;缺…

ROS学习记录:在ROS中用C++实现激光雷达避障

前言 本文建立在成功获取激光雷达数据的基础上&#xff0c;详细参考 在ROS中用C实现获取激光雷达的数据 一、实现思路 二、在VScode中打开之前编写好的lidar_node.cpp 三、在lidar_node.cpp中写入如下代码 #include <ros/ros.h> #include <std_msgs/String.h> …

k8s---pod的生命周期

pod的相关知识 pod是k8s中最小的资源管理组件 pod也是最小化运行容器化的应用的资源管理对象 pod是一个抽象的概念&#xff0c;可以理解为一个或者多个容器化应用的集合。 k8s中pod的两种使用方式 &#xff08;1&#xff09;一个pod中运行一个容器。"每个po中一个容器&…

知虾会员**成为知虾会员,尊享专属权益**

在当今繁忙的生活中&#xff0c;线上购物已经成为现代人们的主要消费方式之一。而作为线上购物平台的领军者之一&#xff0c;Shopee为了提供更加个性化和便利的购物体验&#xff0c;推出了知虾会员&#xff08;Shopee会员&#xff09;服务。知虾会员不仅可以享受到一系列会员专…

国产化软硬件升级之路:πDataCS 赋能工业软件创新与实践

在国产化浪潮的推动下&#xff0c;基础设施软硬件替换和升级的需求日益增长。全栈国产化软硬件升级替换已成为许多领域中的必选项&#xff0c;也引起了数据库和存储领域的广泛关注。近年来&#xff0c;虽然涌现了许多成功的替换案例&#xff0c;但仍然面临着一些问题。 数据库…

某音关键词搜索商品接口,某音关键词搜索商品列表接口,宝贝详情页接口,某音商品比价接口接入方案

要接入API接口以采集电商平台上的商品数据&#xff0c;可以按照以下步骤进行&#xff1a; 1、找到可用的API接口&#xff1a;首先&#xff0c;需要找到支持查询商品信息的API接口。这些信息通常可以在电商平台的官方文档或开发者门户网站上找到。 2、注册并获取API密钥&#x…

界面控件DevExpress Blazor Grid v23.2 - 支持全新的单元格编辑模式

DevExpress Blazor UI组件使用了C#为Blazor Server和Blazor WebAssembly创建高影响力的用户体验&#xff0c;这个UI自建库提供了一套全面的原生Blazor UI组件&#xff08;包括Pivot Grid、调度程序、图表、数据编辑器和报表等&#xff09;。 在这篇文章中&#xff0c;我们将介…

Golang 通用代码生成器仙童已发布 2.4.0 电音仙女尝鲜版三及其介绍视频,详细介绍了 Oracle 代码生成

Golang 通用代码生成器仙童已发布 2.4.0 电音仙女尝鲜版三及其介绍视频&#xff0c;详细介绍了 Oracle 代码生成 Golang 通用代码生成器仙童已发布 2.4.0 电音仙女尝鲜版三及其介绍视频。详细介绍了 Oracle 代码生成。即生成后端数据库为 Oracle 的 golang web 代码。并同时生…

解决SyntaxError: future feature annotations is not defined,可适用其他包

方法&#xff1a;对报错的包进行降级 pip install tikzplotlib0.9.8site-packages后面是使用pip install安装的包&#xff0c;根据这个找到报错的包 想法来源&#xff1a; 环境是python3.6&#xff0c;完全按照作者要求进行环境配置&#xff0c;但仍报错。 我在网上找的解决…

使用Redis进行搜索

文章目录 构建反向索引 构建反向索引 在Begin-End区域编写 tokenize(content) 函数&#xff0c;实现文本标记化的功能&#xff0c;具体参数与要求如下&#xff1a; 方法参数 content 为待标记化的文本&#xff1b; 文本标记的实现&#xff1a;使用正则表达式提取全小写化后的…

阿里云服务器Valheim端口2456、2457和2458放行设置

使用阿里云服务器搭建Valheim英灵神殿需要开启2456-2458端口&#xff0c;阿里云服务器默认只开放了22核3389端口&#xff0c;开通2456端口是在安全组中配置的&#xff0c;阿里云服务器网aliyunfuwuqi.com来详细说下阿里云服务器安全组开通端口流程&#xff1a; 阿里云服务器安…

文件管理方法:利用文件大小进行筛选,高效移动文件至目标文件夹

在日常工作中&#xff0c;文件管理是一项至关重要的任务。为了更高效地管理文件&#xff0c;可以利用文件大小进行筛选&#xff0c;并将文件快速移动至目标文件夹。接下来一起来看看云炫文件管理器如何利用文件大小进行筛选&#xff0c;以及如何高效移动文件至目标文件夹的方法…

程序媛的mac修炼手册--MacOS系统更新升级史

啊&#xff0c;我这个口罩三年从未感染过新冠的天选免疫王&#xff0c;却被支原体击倒&#x1f637;大意了&#xff0c;前几天去医院体检&#xff0c;刚检查完出医院就摘口罩了&#x1f926;大伙儿还是要注意戴口罩&#xff0c;保重身体啊&#xff01;身体欠恙&#xff0c;就闲…

论文查重降重写成大白话可以吗

大家好&#xff0c;今天来聊聊论文查重降重写成大白话可以吗&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff0c;可以借助此类工具&#xff1a; 论文查重降重&#xff1a;用大白话解析 一、引言 写论文是每个…

网络安全—SSL安全访问应用

文章目录 网络拓扑部署CA服务器颁发证书开启Web服务安装IIS服务修改Web默认网页 申请Web证书前提准备申请文件生成申请web证书开始安装web证书 客户机访问web默认网站使用HTTP使用HTTPS 为客户机安装浏览器证书 环境&#xff1a;Windows Server 2003 网络拓扑 这里使用NAT还是…

二叉树题目:删点成林

文章目录 题目标题和出处难度题目描述要求示例数据范围 解法思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;删点成林 出处&#xff1a;1110. 删点成林 难度 6 级 题目描述 要求 给定二叉树的根结点 root \texttt{root} root&#xff0c;树中每个结点的值…

阿里云ECS服务器无法访问端口(防火墙在关闭状态也启作用)

问题&#xff1a;一直用得好好的端口&#xff0c;突然在某一时间不可以访问这个端口了 &#xff0c;在服务器录入外网地址访问如下图&#xff1a; 先按正常流程检测&#xff1a; 1 先云服务商的管理网站查看防火墙端口是否开放 看了正常开放了端口&#xff0c;如下图&#xff…

Apollo自动驾驶系统:实现城市可持续交通的迈向

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 ChatGPT体验地址 文章目录 前言引言&#xff1a;1. 什么是微服务架构&#xff1f;2. 微服务架构的组成要素3. 微服务架构的挑战和解决方案4. 微服务架构的可扩展性和弹性 第二部分&#x…