系列文章目录
物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建
物流实时数仓:数仓搭建(DIM)
物流实时数仓:数仓搭建(DWD)一
文章目录
- 系列文章目录
- 前言
- 一、文件编写
- 1.目录创建
- 2.bean文件
- 1.DwdOrderDetailOriginBean
- 2.DwdOrderInfoOriginBean
- 3.DwdTradeCancelDetailBean
- 4.DwdTradeOrderDetailBean
- 5.DwdTradePaySucDetailBean
- 6.DwdTransBoundFinishDetailBean
- 7.DwdTransDeliverSucDetailBean
- 8.DwdTransDispatchDetailBean
- 9.DwdTransReceiveDetailBean
- 10.DwdTransSignDetailBean
- 3.DwdOrderRelevantApp
- 二、代码测试
- 1.环境启动
- 2.kafka消费者
- 3.修改配置
- 4.测试结果
- 总结
前言
这次博客我们进行DWD层的搭建,内容比较多,一次可能写不完。
以上就是本次博客需要完成的内容,简单来说就是,从kafka读取数据,然后根据不同的关键字,将其从主流中进行分离,然后在写入各自的kafka中以便后续的操作
一、文件编写
1.目录创建
我们现在beans中创建后边需要的的bean
然后在dwd目录中创建此次需要的app
2.bean文件
1.DwdOrderDetailOriginBean
package com.atguigu.tms.realtime.beans;import lombok.Data;import java.math.BigDecimal;/***订单货物明细实体类*/@Data
public class DwdOrderDetailOriginBean {// 编号(主键)String id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumnLength;// 宽cmInteger volumnWidth;// 高cmInteger volumnHeight;// 重量 kgBigDecimal weight;// 创建时间String createTime;// 更新时间String updateTime;// 是否删除String isDeleted;
}
2.DwdOrderInfoOriginBean
package com.atguigu.tms.realtime.beans;import lombok.Data;import java.math.BigDecimal;/*** 订单实体类*/
@Data
public class DwdOrderInfoOriginBean {// 编号(主键)String id;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间Long estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 创建时间String createTime;// 更新时间String updateTime;// 是否删除String isDeleted;
}
3.DwdTradeCancelDetailBean
package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;/*** 交易域:取消运单事务事实表实体类*/
@Data
public class DwdTradeCancelDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 取消时间String cancelTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.cancelTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}
4.DwdTradeOrderDetailBean
package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;/***交易域:下单事务事实表实体类*/
@Data
public class DwdTradeOrderDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 下单时间String orderTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;this.orderTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(detailOriginBean.createTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(detailOriginBean.createTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;}
}
5.DwdTradePaySucDetailBean
package com.atguigu.tms.realtime.beans;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;/***交易域:支付成功事务事实表实体类*/
@Data
public class DwdTradePaySucDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 支付时间String payTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.payTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}
6.DwdTransBoundFinishDetailBean
package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;
/***物流域:转运完成事务事实表实体类*/
@Data
public class DwdTransBoundFinishDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 发单时间String boundFinishTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.boundFinishTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}
7.DwdTransDeliverSucDetailBean
package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;
/***物流域:派送成功事务事实表实体类*/
@Data
public class DwdTransDeliverSucDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 派送成功时间String deliverTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.deliverTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}
8.DwdTransDispatchDetailBean
package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;import lombok.Data;import java.math.BigDecimal;/***物流域:发单事务事实表实体类*/
@Data
public class DwdTransDispatchDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 发单时间String dispatchTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.dispatchTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}
9.DwdTransReceiveDetailBean
package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;/***物流域:揽收(接单)事务事实表实体类*/
@Data
public class DwdTransReceiveDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 揽收时间String receiveTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.receiveTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}
10.DwdTransSignDetailBean
package com.atguigu.tms.realtime.beans;import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;import java.math.BigDecimal;/*** 物流域:签收事务事实表实体类*/
@Data
public class DwdTransSignDetailBean {// 运单明细IDString id;// 运单idString orderId;// 货物类型String cargoType;// 长cmInteger volumeLength;// 宽cmInteger volumeWidth;// 高cmInteger volumeHeight;// 重量 kgBigDecimal weight;// 签收时间String signTime;// 运单号String orderNo;// 运单状态String status;// 取件类型,1为网点自寄,2为上门取件String collectType;// 客户idString userId;// 收件人小区idString receiverComplexId;// 收件人省份idString receiverProvinceId;// 收件人城市idString receiverCityId;// 收件人区县idString receiverDistrictId;// 收件人姓名String receiverName;// 发件人小区idString senderComplexId;// 发件人省份idString senderProvinceId;// 发件人城市idString senderCityId;// 发件人区县idString senderDistrictId;// 发件人姓名String senderName;// 支付方式String paymentType;// 货物个数Integer cargoNum;// 金额BigDecimal amount;// 预计到达时间String estimateArriveTime;// 距离,单位:公里BigDecimal distance;// 时间戳Long ts;public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {// 合并原始明细字段this.id = detailOriginBean.id;this.orderId = detailOriginBean.orderId;this.cargoType = detailOriginBean.cargoType;this.volumeLength = detailOriginBean.volumnLength;this.volumeWidth = detailOriginBean.volumnWidth;this.volumeHeight = detailOriginBean.volumnHeight;this.weight = detailOriginBean.weight;// 合并原始订单字段this.orderNo = infoOriginBean.orderNo;this.status = infoOriginBean.status;this.collectType = infoOriginBean.collectType;this.userId = infoOriginBean.userId;this.receiverComplexId = infoOriginBean.receiverComplexId;this.receiverProvinceId = infoOriginBean.receiverProvinceId;this.receiverCityId = infoOriginBean.receiverCityId;this.receiverDistrictId = infoOriginBean.receiverDistrictId;this.receiverName = infoOriginBean.receiverName;this.senderComplexId = infoOriginBean.senderComplexId;this.senderProvinceId = infoOriginBean.senderProvinceId;this.senderCityId = infoOriginBean.senderCityId;this.senderDistrictId = infoOriginBean.senderDistrictId;this.senderName = infoOriginBean.senderName;this.paymentType = infoOriginBean.paymentType;this.cargoNum = infoOriginBean.cargoNum;this.amount = infoOriginBean.amount;this.estimateArriveTime = DateFormatUtil.toYmdHms(infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);this.distance = infoOriginBean.distance;this.signTime =DateFormatUtil.toYmdHms(DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000);this.ts = DateFormatUtil.toTs(infoOriginBean.updateTime.replaceAll("T", " ").replaceAll("Z", ""), true)+ 8 * 60 * 60 * 1000;}
}
3.DwdOrderRelevantApp
package com.atguigu.tms.realtime.app.dwd;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.*;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class DwdOrderRelevantApp {public static void main(String[] args) throws Exception {// 1.环境准备StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);env.setParallelism(4);// 2.从Kafka读数据String topic = "tms_ods";String groupId = "dwd_order_relevant_group";KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source").uid("kafka_source");// 3.筛选订单和订单明细数据SingleOutputStreamOperator<String> filterDS = kafkaStrDS.filter((FilterFunction<String>) jsonStr -> {JSONObject jsonObj = JSON.parseObject(jsonStr);String tableName = jsonObj.getJSONObject("source").getString("table");return "order_info".equals(tableName) || "order_cargo".equals(tableName);});
// filterDS.print(">>>");// 4.对流中的数据类型进行转换 jsonStr->jsonObjSingleOutputStreamOperator<JSONObject> jsonObjDS = filterDS.map((MapFunction<String, JSONObject>) jsonStr -> {JSONObject jsonObj = JSON.parseObject(jsonStr);String tableName = jsonObj.getJSONObject("source").getString("table");jsonObj.put("table", tableName);jsonObj.remove("source");jsonObj.remove("transaction");return jsonObj;});// jsonObjDS.print(">>>");// 5.按照order_id进行分组KeyedStream<JSONObject, String> keyDS = jsonObjDS.keyBy((KeySelector<JSONObject, String>) jsonObj -> {String table = jsonObj.getString("table");if ("order_info".equals(table)) {return jsonObj.getJSONObject("after").getString("id");}return jsonObj.getJSONObject("after").getString("order_id");});
// keyDS.print(">>>");// 6.定义侧输出流标签 下单放到主流,支付成功、取消运单、揽收(接单)、发单 转运完成、派送成功、签收放到侧输出流// 支付成功明细流标签OutputTag<String> paySucTag = new OutputTag<String>("dwd_trade_pay_suc_detail") {};// 取消运单明细流标签OutputTag<String> cancelDetailTag = new OutputTag<String>("dwd_trade_cancel_detail") {};// 揽收明细流标签OutputTag<String> receiveDetailTag = new OutputTag<String>("dwd_trans_receive_detail") {};// 发单明细流标签OutputTag<String> dispatchDetailTag = new OutputTag<String>("dwd_trans_dispatch_detail") {};// 转运完成明细流标签OutputTag<String> boundFinishDetailTag = new OutputTag<String>("dwd_trans_bound_finish_detail") {};// 派送成功明细流标签OutputTag<String> deliverSucDetailTag = new OutputTag<String>("dwd_trans_deliver_detail") {};// 签收明细流标签OutputTag<String> signDetailTag = new OutputTag<String>("dwd_trans_sign_detail") {};// 7.分流SingleOutputStreamOperator<String> orderDetailDS = keyDS.process(new KeyedProcessFunction<String, JSONObject, String>() {private ValueState<DwdOrderInfoOriginBean> infoBeanState;private ValueState<DwdOrderDetailOriginBean> detailBeanState;@Overridepublic void open(Configuration parameters) {ValueStateDescriptor<DwdOrderInfoOriginBean> InfoOriginBeanStateDescriptor= new ValueStateDescriptor<>("infoBeanState", DwdOrderInfoOriginBean.class);InfoOriginBeanStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(5)).build());infoBeanState = getRuntimeContext().getState(InfoOriginBeanStateDescriptor);ValueStateDescriptor detailBeanStateDescriptor= new ValueStateDescriptor<>("detailBeanState", DwdOrderDetailOriginBean.class);detailBeanState = getRuntimeContext().getState(detailBeanStateDescriptor);}@Overridepublic void processElement(JSONObject jsonObj, KeyedProcessFunction<String, JSONObject, String>.Context ctx, Collector<String> out) throws Exception {String table = jsonObj.getString("table");String op = jsonObj.getString("op");JSONObject data = jsonObj.getJSONObject("after");if ("order_info".equals(table)) {//处理的是订单数据DwdOrderInfoOriginBean infoOriginBean = data.toJavaObject(DwdOrderInfoOriginBean.class);// 脱敏String senderName = infoOriginBean.getSenderName();String receiverName = infoOriginBean.getReceiverName();senderName = senderName.charAt(0) + senderName.substring(1).replaceAll(".", "\\*");receiverName = receiverName.charAt(0) + receiverName.substring(1).replaceAll(".", "\\*");infoOriginBean.setSenderName(senderName);infoOriginBean.setReceiverName(receiverName);DwdOrderDetailOriginBean detailOriginBean = detailBeanState.value();if ("c".equals(op)) {// 下单操作if (detailOriginBean == null) {// 订单数据 比明细数据先到,将订单数据放到状态中infoBeanState.update(infoOriginBean);} else {// 说明订单数据来之前,明细数据已经来到了,直接关联DwdTradeOrderDetailBean dwdTradeOrderDetailBean = new DwdTradeOrderDetailBean();dwdTradeOrderDetailBean.mergeBean(detailOriginBean, infoOriginBean);// 将下单业务过程数据 放到主流中out.collect(JSON.toJSONString(dwdTradeOrderDetailBean));}} else if ("u".equals(op) && detailOriginBean != null) {// 其它操作// 获取修改前的数据JSONObject oldData = jsonObj.getJSONObject("before");// 获取修改前的状态值String oldStatus = oldData.getString("status");String status = infoOriginBean.getStatus();if (!oldStatus.equals(status)) {// 说明修改的是status字段String changeLog = oldStatus + " -> " + status;switch (changeLog) {case "60010 -> 60020":// 处理支付成功数据DwdTradePaySucDetailBean dwdTradePaySucDetailBean = new DwdTradePaySucDetailBean();dwdTradePaySucDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(paySucTag, JSON.toJSONString(dwdTradePaySucDetailBean));break;case "60020 -> 60030":// 处理揽收明细数据DwdTransReceiveDetailBean dwdTransReceiveDetailBean = new DwdTransReceiveDetailBean();dwdTransReceiveDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(receiveDetailTag, JSON.toJSONString(dwdTransReceiveDetailBean));break;case "60040 -> 60050":// 处理发单明细数据DwdTransDispatchDetailBean dispatchDetailBean = new DwdTransDispatchDetailBean();dispatchDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(dispatchDetailTag, JSON.toJSONString(dispatchDetailBean));break;case "60050 -> 60060":// 处理转运完成明细数据DwdTransBoundFinishDetailBean boundFinishDetailBean = new DwdTransBoundFinishDetailBean();boundFinishDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(boundFinishDetailTag, JSON.toJSONString(boundFinishDetailBean));break;case "60060 -> 60070":// 处理派送成功数据DwdTransDeliverSucDetailBean dwdTransDeliverSucDetailBean = new DwdTransDeliverSucDetailBean();dwdTransDeliverSucDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(deliverSucDetailTag, JSON.toJSONString(dwdTransDeliverSucDetailBean));break;case "60070 -> 60080":// 处理签收明细数据DwdTransSignDetailBean dwdTransSignDetailBean = new DwdTransSignDetailBean();dwdTransSignDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(signDetailTag, JSON.toJSONString(dwdTransSignDetailBean));// 签收后订单数据不会再发生变化,状态可以清除detailBeanState.clear();break;default:if (status.equals("60999")) {DwdTradeCancelDetailBean dwdTradeCancelDetailBean = new DwdTradeCancelDetailBean();dwdTradeCancelDetailBean.mergeBean(detailOriginBean, infoOriginBean);ctx.output(cancelDetailTag, JSON.toJSONString(dwdTradeCancelDetailBean));// 取消后订单数据不会再发生变化,状态可以清除detailBeanState.clear();}break;}}}} else {// 处理订单明细DwdOrderDetailOriginBean detailOriginBean = data.toJavaObject(DwdOrderDetailOriginBean.class);if ("c".equals(op)) {detailBeanState.update(detailOriginBean);// 获取状态中存放的订单数据 注意:只有下单操作,并且订单数据先到,明细数据后到的情况,才会从状态中拿到订单数据DwdOrderInfoOriginBean infoOriginBean = infoBeanState.value();if (infoOriginBean != null) {//属于下单业务过程DwdTradeOrderDetailBean dwdTradeOrderDetailBean = new DwdTradeOrderDetailBean();dwdTradeOrderDetailBean.mergeBean(detailOriginBean, infoOriginBean);// 将下单业务过程数据 放到主流中out.collect(JSON.toJSONString(dwdTradeOrderDetailBean));}}}}}).uid("process_data");// 8.从主流中提取侧输出流// 支付成功明细流//8.1 支付成功明细流SideOutputDataStream<String> paySucDS = orderDetailDS.getSideOutput(paySucTag);// 8.2 取消运单明细流SideOutputDataStream<String> cancelDetailDS = orderDetailDS.getSideOutput(cancelDetailTag);// 8.3 揽收明细流SideOutputDataStream<String> receiveDetailDS = orderDetailDS.getSideOutput(receiveDetailTag);// 8.4 发单明细流SideOutputDataStream<String> dispatchDetailDS = orderDetailDS.getSideOutput(dispatchDetailTag);// 8.5 转运成功明细流SideOutputDataStream<String> boundFinishDetailDS = orderDetailDS.getSideOutput(boundFinishDetailTag);// 8.6 派送成功明细流SideOutputDataStream<String> deliverSucDetailDS = orderDetailDS.getSideOutput(deliverSucDetailTag);// 8.7 签收明细流SideOutputDataStream<String> signDetailDS = orderDetailDS.getSideOutput(signDetailTag);// 9.将不同流的数据写到kafka的不同主题中// 9.1.1 交易域下单明细主题String detailTopic = "tms_dwd_trade_order_detail";// 9.1.2 交易域支付成功明细主题String paySucDetailTopic = "tms_dwd_trade_pay_suc_detail";// 9.1.3 交易域取消运单明细主题String cancelDetailTopic = "tms_dwd_trade_cancel_detail";// 9.1.4 物流域接单(揽收)明细主题String receiveDetailTopic = "tms_dwd_trans_receive_detail";// 9.1.5 物流域发单明细主题String dispatchDetailTopic = "tms_dwd_trans_dispatch_detail";// 9.1.6 物流域转运完成明细主题String boundFinishDetailTopic = "tms_dwd_trans_bound_finish_detail";// 9.1.7 物流域派送成功明细主题String deliverSucDetailTopic = "tms_dwd_trans_deliver_detail";// 9.1.8 物流域签收明细主题String signDetailTopic = "tms_dwd_trans_sign_detail";// 9.2 发送数据到 Kafka// 9.2.1 运单明细数据KafkaSink<String> kafkaProducer = KafkaUtil.getKafkaSink(detailTopic, args);orderDetailDS.print("~~");orderDetailDS.sinkTo(kafkaProducer).uid("order_detail_sink");// 9.2.2 支付成功明细数据KafkaSink<String> paySucKafkaProducer = KafkaUtil.getKafkaSink(paySucDetailTopic, args);paySucDS.print("!!");paySucDS.sinkTo(paySucKafkaProducer).uid("pay_suc_detail_sink");// 9.2.3 取消运单明细数据KafkaSink<String> cancelKafkaProducer = KafkaUtil.getKafkaSink(cancelDetailTopic, args);cancelDetailDS.print("@@");cancelDetailDS.sinkTo(cancelKafkaProducer).uid("cancel_detail_sink");// 9.2.4 揽收明细数据KafkaSink<String> receiveKafkaProducer = KafkaUtil.getKafkaSink(receiveDetailTopic, args);receiveDetailDS.print("##");receiveDetailDS.sinkTo(receiveKafkaProducer).uid("reveive_detail_sink");// 9.2.5 发单明细数据KafkaSink<String> dispatchKafkaProducer = KafkaUtil.getKafkaSink(dispatchDetailTopic, args);dispatchDetailDS.print("$$");dispatchDetailDS.sinkTo(dispatchKafkaProducer).uid("dispatch_detail_sink");// 9.2.6 转运完成明细主题KafkaSink<String> boundFinishKafkaProducer = KafkaUtil.getKafkaSink(boundFinishDetailTopic, args);boundFinishDetailDS.print("%%");boundFinishDetailDS.sinkTo(boundFinishKafkaProducer).uid("bound_finish_detail_sink");// 9.2.7 派送成功明细数据KafkaSink<String> deliverSucKafkaProducer = KafkaUtil.getKafkaSink(deliverSucDetailTopic, args);deliverSucDetailDS.print("^^");deliverSucDetailDS.sinkTo(deliverSucKafkaProducer).uid("deliver_suc_detail_sink");// 9.2.8 签收明细数据KafkaSink<String> signKafkaProducer = KafkaUtil.getKafkaSink(signDetailTopic, args);signDetailDS.print("&&");signDetailDS.sinkTo(signKafkaProducer).uid("sign_detail_sink");env.execute();}
}
二、代码测试
1.环境启动
hadoop,zk,kf全部启动
根据流程图可以看到,流程中没有使用到dim层的内容,所以我们不需要启动hbase。
2.kafka消费者
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_order_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_pay_suc_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_cancel_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_receive_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_dispatch_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_bound_finish_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_deliver_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_sign_detail
一共需要查看8个消费者主题,你可以开8个窗口,也可以一个一个看,kafka如果没有消费者,会先将数据保存,等待消费,所以不需要8个主题同时消费。
3.修改配置
4.测试结果
先启动OdsApp和DwdOrderRelevantApp,然后生成模拟数据,之后查看kakfa消费者,有些数据可能要多生成几次才行。
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_order_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_pay_suc_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_cancel_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_receive_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_dispatch_detail
这个主题是特殊情况,正常可能没有输出。
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_bound_finish_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_deliver_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_sign_detail
总结
至此这篇博客的内容结束。