设计模式:责任链实现数据流风格的数据处理

数据流风格

数据流风格是软件架构中的一种风格,主要是面向数据,用于进行流式的数据处理;数据流风格的代表有管道-过滤器风格和批处理序列风格,这里主要是指管道-过滤器风格。

管道-过滤器风格就像其名字一样,是以一个个的组件连接,数据像水一样,顺序的流向到管道中,然后逐一被组件处理,最终达到目标形式。此种风格是比较适合数据治理或者进行简单的数据接入的。

场景引入

假设需要从一个topic中实时接入数据,其中的每条数据都有五个属性,分别是data_type,source_from,source_to,detail,op_time;

下面是数据处理流程的规则:

  • 如果data_type等于“unknow”,则该条数据丢弃,流程结束,否则,继续处理;
  • 判断source_from和source_to 是否均为空,如果是,则数据丢弃,结束流程,否则继续处理;
  • 如果source_from和source_to 均不为空,则判断op_time是否大于‘2023-01-01 08:09:00’,若大于,则存储到表B,流程结束;
  • 如果source_from不为空但source_to为空,则数据存储到表A,流程结束;

该场景是典型的基于规则,对数据进行处理与处置,换算为逻辑流程应该是:

Kafka 作为一个分布式流处理平台,广泛用于构建实时数据管道和流应用。它能够处理大量的数据流,具有高吞吐量、可持久化存储、容错性和扩展性等特性。这里我们以Kafka作为流数据的开始,也就是系统的输入。SpringBoot的应用也就是消费者的角色,去接入、处理kafka中的数据。

具体关于SpringBoot集成Kafka的基础,可以参考我之前的文章👇

关于SpringBoot集成Kafka-CSDN博客icon-default.png?t=O83Ahttps://blog.csdn.net/qq_40690073/article/details/143960276

我们直接基于SpringBoot和Kafka进行基本实现:

基于@KafkaListener注解进行消息监听,定义消息处理接口及其实现类,然后进行数据处理

定义数据类:

@Data
public class MyData {@JsonProperty("data_type")private String dataType;@JsonProperty("source_from")private String sourceFrom;@JsonProperty("source_to")private String sourceTo;@JsonProperty("detail")private String detail;@JsonProperty("op_time")private String opTime;
}

定义数据处理Service及其实现类:

public interface MessageDealService {void process(MyData data);
}@Service
public class MessageDealServiceImpl implements MessageDealService {@Autowiredprivate TableARepository tableARepository;@Autowiredprivate TableBRepository tableBRepository;@Overridepublic void process(MyData data) {if ("unknow".equals(data.getDataType())) {return; }if (data.getSourceFrom() == null && data.getSourceTo() == null) {return; }if (data.getSourceFrom() != null && data.getSourceTo() != null) {if (isAfter(data.getOpTime(), "2023-01-01 08:09:00")) {TableB tableB = new TableB();tableB.setDataType(data.getDataType());tableB.setSourceFrom(data.getSourceFrom());tableB.setSourceTo(data.getSourceTo());tableB.setDetail(data.getDetail());tableB.setOpTime(data.getOpTime());tableBRepository.save(tableB);}} else if (data.getSourceFrom() != null && data.getSourceTo() == null) {TableA tableA = new TableA();tableA.setDataType(data.getDataType());tableA.setSourceFrom(data.getSourceFrom());tableA.setDetail(data.getDetail());tableA.setOpTime(data.getOpTime());tableARepository.save(tableA);}}private boolean isAfter(String opTime, String threshold) throws ParseException {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");return sdf.parse(opTime).after(sdf.parse(threshold));}
}

kafka中进行监听并调用

@Component
public class KafkaConsumer {@Autowiredprivate MessageDeal messageDeal;@KafkaListener(topics = "your-topic", groupId = "group-id")public void consume(String message) {try {// 解析消息ObjectMapper objectMapper = new ObjectMapper();MyData data = objectMapper.readValue(message, MyData.class);// 调用消息处理器messageDeal.process(data);} catch (Exception e) {e.printStackTrace();}}
}

模式改造

责任链设计模式

责任链的设计源于数据结构中的链表,从模式的定义中就能看出,它需要一串走下去,而每一个处理请求的对象,都需要记录下一个处理请求的对象,即标准的数据链表方式。

职责链模式的实现主要包含以下角色。

  • 抽象处理者(Handler)角色:定义一个处理请求的接口,包含抽象处理方法和一个后继连接。
  • 具体处理者(Concrete Handler)角色:实现抽象处理者的处理方法,判断能否处理本次请求,如果可以处理请求则处理,否则将该请求转给它的后继者。
  • 客户类(Client)角色:创建处理链,并向链头的具体处理者对象提交请求,它不关心处理细节和请求的传递过程。

责任链模式的本质是解耦请求与处理,让请求在处理链中能进行传递与被处理;理解责任链模式应当理解其模式,而不是其具体实现。责任链模式的独到之处是将其节点处理者组合成了链式结构,并允许节点自身决定是否进行请求处理或跳跃,相当于让请求流动起来。

 UML类图如下:

管道过滤器风格与责任链的结合的思路

基于责任链的模式与数据流的概念图对比:

and

  

 我们可以得出,责任链中的具体处理者(Concrete Handler)角色恰好可以充当数据流风格中的过滤器,然后基于此,我们将繁琐的if else逻辑抽象到一个个的过滤器中,然后让这些过滤器链成数据处理链,让接入的数据走入到对应的数据处理链中即可。

SpringBoot中重新实现

定义数据处理组件框架

首先,基于责任链模式进行数据处理器框架的定义:

/**
* 定义数据处理器接口
**/
interface DataStreamProcessor {void setNext(DataStreamProcessor nextProcessor);void handle(Object data);
}/*** 定义数据处理器抽象类,完成基本的责任链注册机制* 以及预留业务扩展口**/
public abstract class AbstractDataStreamProcessor implements DataStreamProcessor {private DataStreamProcessor nextProcessor;@Overridepublic void setNext(DataStreamProcessor nextProcessor) {this.nextProcessor = nextProcessor;}@Overridepublic void handle(Object data) {AtomicBoolean flag = disposeData(data);if(flag.get() && null != nextProcessor){nextProcessor.handle(data);}}/*** 处理数据* @param data 数据* @return AtomicBoolean 如果返回为true,则代表继续向下处理,否则,则终止     */abstract AtomicBoolean disposeData(Object data);}

使用时,则根据要处理的逻辑,继承 AbstractDataStreamProcessor 类即可,我们以data_type判断为例:

public class DataTypeFilterProcessor extends AbstractDataStreamProcessor {private boolean flag;@Overridevoid disposeData(Object data) {Map<String, Object> record = (Map<String, Object>) data;if ("unknow".equals(record.get("data_type"))) {//结束处理,后续不做处理return new AtomicBoolean(false);}return new AtomicBoolean(false);}}

除DataTypeFilterProcessor外,还需要根据其他的逻辑,新建其他的处理器类A、B、C,才能完成一个完整的链式。

public class DataStreamProcessorTest {public static void main(String[] args) {// 创建处理器实例UnknownTypeProcessor unknownTypeProcessor = new UnknownTypeProcessor();XXXXProcessorA  emptySourceProcessor = new XXXXProcessorA();XXXXProcessorB  sourceToProcessor = new XXXXProcessorB  ();XXXXProcessorC  sourceFromProcessor = new XXXXProcessorC  ();// 构建责任链unknownTypeProcessor.setNext(emptySourceProcessor);emptySourceProcessor.setNext(sourceToProcessor);sourceToProcessor.setNext(sourceFromProcessor);// 测试数据MyData data1 = new MyData();data1.setDataType("unknow");data1.setSourceFrom("source1");data1.setSourceTo("source2");data1.setDetail("detail1");data1.setOpTime("2023-01-02 09:10:00");//处理流程 unknownTypeProcessor.handle(data1);
}

基于SpringBoot进行处理器自动化注册

如果单纯的使用main函数调用,则是根据逻辑流程图进行一个个的链式注入,这显然无法在SpringBoot中使用,如果想在SpringBoot中使用,我们需要解决两个问题:

  • 第一,要保证我们的处理器是Spring的Bean,受Spring的上下文管理,这样才可以自由的使用@Autowired等注解完美的进行其他Service的使用;
  • 第二,最好是摒弃手动逐一注入的情况,对于所处的数据流,最好在处理器类编写的时候就可以指定。

针对以上两点需求,解决方案如下:

  • 对新建的处理器类上使用@Compoent注解即可使其成为Spring上下文管理的Bean,且可以随意依赖Spring环境中其他的Bean
  • 进行自动注入需要两个参数,一个是这个处理器需要到哪个数据处理流中,另一个是在所处的数据流中的位置,基于这两个参数就可以实现自动注册,所有需要一个注解来额外标明这两个参数
定义注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface DataStream {String dataStreamName();int order() default 0;
}
自动化注册
   private final Map<String, List<AbstractDataStreamProcessor>> dataStreamChains = new ConcurrentHashMap<>();@Autowiredpublic void setDataStreamProcessors(Map<String, AbstractDataStreamProcessor> processors) {processors.forEach((beanName, processor) -> {DataStream annotation = processor.getClass().getAnnotation(DataStream.class);if (annotation != null) {String dataStreamName = annotation.dataStreamName();int order = annotation.order();dataStreamChains.computeIfAbsent(dataStreamName, k -> new ArrayList<>()).add(processor);}});dataStreamChains.forEach((dataStreamName, processorsList) -> {Collections.sort(processorsList, (p1, p2) -> {DataStream a1 = p1.getClass().getAnnotation(DataStream.class);DataStream a2 = p2.getClass().getAnnotation(DataStream.class);return Integer.compare(a1.order(), a2.order());});// 构建责任链AbstractDataStreamProcessor current = null;for (AbstractDataStreamProcessor processor : processorsList) {if (current == null) {current = processor;} else {current.setNext(processor);current = processor;}}});}@Beanpublic BeanPostProcessor beanPostProcessor() {return new BeanPostProcessor() {@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof AbstractDataStreamProcessor) {Field field = ReflectionUtils.findField(bean.getClass(), "nextProcessor");if (field != null) {ReflectionUtils.makeAccessible(field);ReflectionUtils.setField(field, bean, getNextHandler((AbstractDataStreamProcessor) bean));}}return bean;}private AbstractDataStreamProcessor getNextHandler(AbstractDataStreamProcessor processor) {DataStream annotation = processor.getClass().getAnnotation(DataStream.class);if (annotation != null) {String dataStreamName = annotation.dataStreamName();List<AbstractDataStreamProcessor> processorsList = dataStreamChains.get(dataStreamName);if (processorsList != null) {int currentIndex = processorsList.indexOf(processor);if (currentIndex < processorsList.size() - 1) {return processorsList.get(currentIndex + 1);}}}return null;}};}@Beanpublic Map<String, AbstractDataStreamProcessor> dataStreamProcessorMap() {return dataStreamChains.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0)));}

改造后场景复现

基于以上设计,规避掉繁琐的if else嵌套,以Java类作为基础单元,进行数据组件化的思路去再次实现场景。

基于之前的封装,此处直接进行数据处理器的实现即可,我们先创建四个具体的处理器来处理这些规则:

  • DataTypeFilterProcessor:过滤掉 data_type 为 "unknow" 的数据。
  • SourceCheckProcessor:检查 source_from 和 source_to 是否均为空,如果是,则丢弃数据。
  • OpTimeFilterAndStoreBProcessor:如果 op_time 大于 '2023-01-01 08:09:00',则存储到表 B。
  • StoreAProcessor:如果 source_from 不为空但 source_to 为空,则存储到表 A。

以下为具体代码:

@Component
//在SpringBoot中只需要DataStream注解就可以自动地注册成为某条数据流地处理
@DataStream(dataStreamName = "default", order = 1)
public class DataTypeFilterProcessor extends AbstractDataStreamProcessor {@OverrideAtomicBoolean disposeData(Object data) {Map<String, Object> record = (Map<String, Object>) data;if ("unknow".equals(record.get("data_type"))) {//结束处理,后续不做处理return new AtomicBoolean(false);}return new AtomicBoolean(false);}
}@Component
@DataStream(dataStreamName = "default", order = 2)
public class SourceCheckProcessor extends AbstractDataStreamProcessor {@OverrideAtomicBoolean disposeData(Object data) {MyData record = (Mydata) data;if (record.get("source_from") == null && record.get("source_to") == null) {//相关逻辑处理return new AtomicBoolean(false);}}}@Component
@DataStream(dataStreamName = "default", order = 3)
class OpTimeFilterAndStoreBProcessor extends AbstractDataStreamProcessor {private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");@AutowiredTableBRepository repository;@OverrideAtomicBoolean disposeData(Object data) {//相关逻辑处理}private void storeToTableB(Map<String, Object> record) {// 实现存储到表B的逻辑}}@Component
@DataStream(dataStreamName = "default", order = 4)
class StoreAProcessor extends AbstractDataStreamProcessor {@OverrideAtomicBoolean disposeData(Object data) {//相关逻辑处理}}

该数据流使用:

@Component
public class KafkaMessageConsumer {@Autowiredprivate Map<String, AbstractDataStreamProcessor> dataStreamProcessorMap;@KafkaListener(topics = "default", groupId = "my-group")public void listen(@Payload String message) {AbstractDataStreamProcessor processor = dataStreamProcessorMap.get("default");processor.handle(data);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/478356.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PGSQL物化视图(Materialized View)

在 PostgreSQL 中&#xff0c;物化视图&#xff08;Materialized View&#xff09;是一种特殊的数据库对象&#xff0c;它存储了查询的结果集&#xff0c;并可以定期刷新以反映基础表中的数据变化。物化视图可以提高查询性能&#xff0c;因为它减少了每次查询时重新计算数据的需…

浏览器缓存与协商缓存

1. 强缓存&#xff08;Strong Cache&#xff09; 定义 强缓存是指在缓存的资源有效期内&#xff0c;浏览器会直接使用缓存中的数据&#xff0c;而不会发起网络请求。也就是说&#xff0c;浏览器会直接从本地缓存读取资源&#xff0c;不会与服务器进行任何交互。 如何控制强缓…

JS听到了替罪的回响

这篇还是继续写JS 这是有关函数的一些内容 函数 为什么需要函数 函数是被设计为执行指定任务的代码块 函数可以把具有相同或者相似逻辑的代码包裹起来&#xff0c;通过函数调用执行这些被包裹的代码逻辑&#xff0c;这样的优势是有利于精简代码方便复用 函数使用 这是函…

【优选算法】前缀和

目录 一、[【模板】前缀和](https://www.nowcoder.com/practice/acead2f4c28c401889915da98ecdc6bf?tpId230&tqId2021480&ru/exam/oj&qru/ta/dynamic-programming/question-ranking&sourceUrl%2Fexam%2Foj%3Fpage%3D1%26tab%3D%25E7%25AE%2597%25E6%25B3%2595…

SAP ME2L/ME2M/ME3M报表增强添加字段

SAP ME2L/ME2M/ME3M报表增强添加字段&#xff08;包含&#xff1a;LMEREPI02、SE18:ES_BADI_ME_REPORTING&#xff09; ME2L、ME2M、ME3M这三个报表的字段增强&#xff0c;核心点都在同一个结构里 SE11:MEREP_OUTTAB_PURCHDOC 在这里加字段&#xff0c;如果要加的字段是EKKO、…

破解天然气巡检挑战,构建智能运维体系

一、行业现状 天然气行业在能源领域地位举足轻重&#xff0c;其工作环境高风险&#xff0c;存在有毒有害、易爆气体及高温等情况&#xff0c;且需持续监控设备运行状态&#xff0c;人工巡检面临巨大挑战与风险。好在随着科技发展&#xff0c;防爆巡检机器人的应用为天然气管道…

TSmaster CAN/CANFD 诊断(Diagnostic_CAN)

文章目录 1、Diagnostic TP 参数配置1.1 传输层参数&#xff1a;1.2 服务层参数1.3 Seed&Key 2、基础诊断配置2.1 添加/删除 服务2.2 配置 BasicDiagnostic 服务参数 3、诊断控制台4、自动诊断流程4.1 流程用例管理4.2 配置诊断流程&#xff08;UDS Flow&#xff09;4.2.1 …

详解Servlet的使用

目录 Servlet 定义 动态页面 vs 静态页面 主要功能 Servlet的使用 创建Maven项目 引入依赖 创建目录 编写代码 打war包 部署程序 验证程序 Smart Tomcat 安装Smart Tomcat 配置Smart Tomcat插件 启动Tomcat 访问页面 路径对应关系 Servlet运行原理 Tomcat的…

mysql数据库双机互为主从设置与数据库断电无法启动处理

一、mysql数据库双机互为主从设置 前言 1.环境windows 2.数据库8.0 3.服务器1&#xff1a;192.168.12.1 4.服务器2&#xff1a;192.168.12.2 1. 设置数据库的配置文件 对文件名&#xff1a;my.ini进行修改 服务器1&#xff1a;192.168.12.1配置文件设置 [mysql] 下添加如…

strupr(arr);模拟实现(c基础)

hi , I am 36 适合对象c语言初学者 strupr(arr);函数是把arr数组变为大写字母&#xff0c;并返回arr 介绍一下strupr(arr)&#xff1b;(c基础&#xff09;-CSDN博客 现在进行My__strupr(arr);模拟实现 #include<stdio.h>//My__strupr(arr); //返回值为arr(地址),于是…

项目实战:基于深度学习的人脸表情识别系统设计与实现

大家好&#xff0c;人脸表情识别是计算机视觉领域中的一个重要研究方向&#xff0c;它涉及到对人类情感状态的理解和分析。随着深度学习技术的发展&#xff0c;基于深度学习的人脸表情识别系统因其高精度和强大的特征学习能力而受到广泛关注。本文旨在探讨基于深度学习的人脸表…

架构师思维中的人、产品和技术

架构思维主要是一种以产品和业务为驱动的顶层解决问题的思维,需要同时考虑产品、人和技术3重关系,思维点需要同时落在三维体系中。虽然架构师很多时候做的工作其实只是分和合,即所谓的系统分拆及重新组合,但综合能力要求很高,需要同时具备思维的高度和深度,在思维抽象的同…

智能显示屏插座:能否成为家庭用电安全的守护天使?

关键词&#xff1a;显示屏插座、LCD显示屏插座、LCD插座、智能计量插座、计量监测插座 最近&#xff0c;一则令人揪心的新闻在网络上疯传 在一个老旧小区里&#xff0c;由于电线老化和插座过载问题&#xff0c;引发了一场小型火灾。火势迅速蔓延&#xff0c;虽然幸运的是没有…

SAP_MM/CO模块-超详细的CK11N/CK40N取值逻辑梳理(十几种业务场景,1.76W字)

一、业务背景 财务月结完成后,对次月物料进行成本发布时,经常会提物料成本不准的问题,譬如说同一个物料,CK40N发布的成本与CK11N发布的成本对不上;再有就是因为物料有多个生产版本,多个采购价格,多个货源清单等主数据,导致CK11N发布成本的时候,跟用户理解的取数逻辑对…

今天你学C++了吗?——C++中的类与对象(第二集)

♥♥♥~~~~~~欢迎光临知星小度博客空间~~~~~~♥♥♥ ♥♥♥零星地变得优秀~也能拼凑出星河~♥♥♥ ♥♥♥我们一起努力成为更好的自己~♥♥♥ ♥♥♥如果这一篇博客对你有帮助~别忘了点赞分享哦~♥♥♥ ♥♥♥如果有什么问题可以评论区留言或者私信我哦~♥♥♥ ✨✨✨✨✨✨ 个…

部署实战(二)--修改jar中的文件并重新打包成jar文件

一.jar文件 JAR 文件就是 Java Archive &#xff08; Java 档案文件&#xff09;&#xff0c;它是 Java 的一种文档格式JAR 文件与 ZIP 文件唯一的区别就是在 JAR 文件的内容中&#xff0c;多出了一个META-INF/MANIFEST.MF 文件META-INF/MANIFEST.MF 文件在生成 JAR 文件的时候…

微信小程序+Vant-自定义选择器组件(多选

实现效果 无筛选&#xff0c;如有需要可参照单选组件中的方法.json文件配置"component": true,columns需要处理成含dictLabel和dictValue字段&#xff0c;我是这样处理的&#xff1a; let list arr.map(r > {return {...r,dictValue: r.xxxId,dictLabel: r.xxx…

.NET Core发布网站报错 HTTP Error 500.31

报错如图&#xff1a; 解决办法&#xff1a; 打开任务管理器》》服务》》找到这仨服务&#xff0c;右键启动即可&#xff0c;如果已经启动了就重启&#xff1a;

Canvas 前端艺术家

目前各种数据来看&#xff0c;前端未来在 数据可视化 和 AI 这两个领域会比较香&#xff0c;而 Canvas 是 数据可视化 在前端方面的基础技术。所以给大家唠唠Canvas这个魔幻工具。 Canvas 介绍 Canvas 中文名叫 “画布”&#xff0c;是 HTML5 新增的一个标签。Canvas 允许开发…

Leetcode142. 环形链表 II(HOT100)

链接 我的错误代码&#xff1a; class Solution { public:ListNode *detectCycle(ListNode *head) {if(!head||!head->next)return nullptr;ListNode* f head->next,*s head;while(f){f f->next,s s->next;if(!f)return nullptr;f f->next;if(fs){ListNo…