一、前言
在现代的分布式系统中,消息传递已成为一个非常流行的模式。它使得系统内的不同部分可以松耦合地通信,从而实现更高效、更可靠的应用程序。本博客将介绍SpringBoot如何提供简单易用的消息传递机制,并展示如何自定义消息总线以满足特定需求。
二、依赖引入
// Repositories and plugins that Gradle itself needs; this section must come first in the script.
buildscript {
    repositories {
        maven { url 'https://maven.aliyun.com/repository/public' }
        // Additional Maven repository.
        mavenCentral()
    }
    dependencies {
        // Spring Boot Gradle plugin; it provides the plugins applied below.
        classpath('org.springframework.boot:spring-boot-gradle-plugin:2.1.1.RELEASE')
    }
}

apply plugin: 'java'
apply plugin: 'idea'
// Use the Spring Boot framework.
apply plugin: 'org.springframework.boot'
// Use Spring Boot's automatic dependency management.
apply plugin: 'io.spring.dependency-management'

// Project coordinates.
group 'com.littledyf'
version '1.0-SNAPSHOT'

// Repositories used to resolve the project's dependencies.
repositories {
    maven { url 'https://maven.aliyun.com/repository/public' }
    mavenCentral()
}

// Dependencies required by the project.
dependencies {
    // JUnit Jupiter API for writing tests.
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
    // JUnit Jupiter engine for running tests.
    // NOTE(review): no version is given here while the API above is pinned to 5.6.0 —
    // confirm both resolve to the same JUnit release.
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

    // spring-boot-starter-web (required); spring-security-config is excluded, adjust to your needs.
    implementation('org.springframework.boot:spring-boot-starter-web') {
        exclude group: 'org.springframework.security', module: 'spring-security-config'
    }

    // spring-boot-starter-test, required to compile and run the tests.
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    // JUnit 4.
    testImplementation group: 'junit', name: 'junit', version: '4.11'

    // Lombok.
    // annotationProcessor: annotation processor for the code under main.
    annotationProcessor 'org.projectlombok:lombok:1.18.22'
    // testAnnotationProcessor: annotation processor for the code under test.
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
    // compileOnly: Lombok is only needed at compile time.
    compileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.22'
}

test {
    useJUnitPlatform()
}
三、代码
定义注册器实现类:
import org.springframework.context.ApplicationContext;
import org.springframework.core.GenericTypeResolver;import java.util.HashMap;
import java.util.Map;

/**
 * Registry that maps each {@link Query} / {@link Event} type to a provider of the handler
 * bean responsible for it.
 *
 * <p>The constructor looks up every bean name of type {@link QueryHandler} and
 * {@link EventHandler} in the given {@link ApplicationContext}, resolves the handler's generic
 * type arguments, and stores one provider per message type. If two handlers declare the same
 * message type, the one registered last replaces the earlier one.
 */
public class Registry {

    /** Query type to the provider of the handler bean for that query. */
    private final Map<Class<? extends Query>, QueryProvider<?>> queryProviderMap = new HashMap<>();

    /** Event type to the provider of the handler bean for that event. */
    private final Map<Class<? extends Event>, EventProvider<?>> eventProviderMap = new HashMap<>();

    /**
     * Builds the registry from all query and event handler beans known to the context.
     *
     * @param applicationContext context used to discover handler beans and, later, to fetch them
     * @throws IllegalStateException if a handler's bean type or generic arguments cannot be resolved
     */
    public Registry(ApplicationContext applicationContext) {
        for (String name : applicationContext.getBeanNamesForType(QueryHandler.class)) {
            registerQuery(applicationContext, name);
        }
        for (String name : applicationContext.getBeanNamesForType(EventHandler.class)) {
            registerEvent(applicationContext, name);
        }
    }

    /** Registers the query handler bean called {@code name} under the query type it declares. */
    @SuppressWarnings("unchecked") // the bean name was returned by a lookup for QueryHandler beans
    private void registerQuery(ApplicationContext applicationContext, String name) {
        Class<?> beanType = requireBeanType(applicationContext, name);
        Class<?>[] generics = GenericTypeResolver.resolveTypeArguments(beanType, QueryHandler.class);
        // QueryHandler<R, C extends Query<R>>: index 1 is the query type.
        if (generics == null || generics.length < 2 || !Query.class.isAssignableFrom(generics[1])) {
            throw new IllegalStateException(
                    "Cannot resolve the query type handled by bean '" + name + "' (" + beanType.getName() + ")");
        }
        Class<? extends Query> queryType = generics[1].asSubclass(Query.class);
        Class<QueryHandler<?, ?>> handlerClass = (Class<QueryHandler<?, ?>>) beanType;
        queryProviderMap.put(queryType, new QueryProvider<>(applicationContext, handlerClass));
    }

    /** Registers the event handler bean called {@code name} under the event type it declares. */
    @SuppressWarnings("unchecked") // the bean name was returned by a lookup for EventHandler beans
    private void registerEvent(ApplicationContext applicationContext, String name) {
        Class<?> beanType = requireBeanType(applicationContext, name);
        Class<?>[] generics = GenericTypeResolver.resolveTypeArguments(beanType, EventHandler.class);
        // EventHandler<E extends Event>: index 0 is the event type.
        if (generics == null || generics.length < 1 || !Event.class.isAssignableFrom(generics[0])) {
            throw new IllegalStateException(
                    "Cannot resolve the event type handled by bean '" + name + "' (" + beanType.getName() + ")");
        }
        Class<? extends Event> eventType = generics[0].asSubclass(Event.class);
        Class<EventHandler<?>> handlerClass = (Class<EventHandler<?>>) beanType;
        eventProviderMap.put(eventType, new EventProvider<>(applicationContext, handlerClass));
    }

    /** Returns the type of the named bean, failing with a descriptive error when it is unknown. */
    private static Class<?> requireBeanType(ApplicationContext applicationContext, String name) {
        Class<?> beanType = applicationContext.getType(name);
        if (beanType == null) {
            throw new IllegalStateException("Cannot determine the type of handler bean '" + name + "'");
        }
        return beanType;
    }

    /**
     * Returns the handler registered for the given query type.
     *
     * @param queryClass concrete query class
     * @param <R> result type of the query
     * @param <Q> query type
     * @return the handler bean for {@code queryClass}
     * @throws IllegalStateException if no handler is registered for {@code queryClass}
     */
    @SuppressWarnings("unchecked") // the handler was registered under the query type it declares
    <R, Q extends Query<R>> QueryHandler<R, Q> getQuery(Class<Q> queryClass) {
        QueryProvider<?> provider = queryProviderMap.get(queryClass);
        if (provider == null) {
            throw new IllegalStateException("No QueryHandler registered for " + queryClass);
        }
        QueryHandler<?, ?> handler = provider.get();
        return (QueryHandler<R, Q>) handler;
    }

    /**
     * Returns the handler registered for the given event type.
     *
     * @param eventClass concrete event class
     * @param <E> event type
     * @return the handler bean for {@code eventClass}
     * @throws IllegalStateException if no handler is registered for {@code eventClass}
     */
    @SuppressWarnings("unchecked") // the handler was registered under the event type it declares
    <E extends Event> EventHandler<E> getEvent(Class<? extends Event> eventClass) {
        EventProvider<?> provider = eventProviderMap.get(eventClass);
        if (provider == null) {
            throw new IllegalStateException("No EventHandler registered for " + eventClass);
        }
        EventHandler<?> handler = provider.get();
        return (EventHandler<E>) handler;
    }
}
消息总线接口,定义两个方法,一个执行查询,一个执行事件:
/**
 * Message bus: the single entry point for executing queries and dispatching events.
 */
public interface Bus {

    /**
     * Executes a query and returns its result.
     *
     * @param query the query to execute
     * @param <R> result type of the query
     * @param <Q> query type
     * @return the result of the query
     */
    <R, Q extends Query<R>> R executeQuery(Q query);

    /**
     * Dispatches an event.
     *
     * @param event the event to dispatch
     * @param <E> event type
     */
    <E extends Event> void dispatchEvent(E event);
}
消息总线实现类:
public class SpringBus implements Bus {private final Registry registry;public SpringBus(Registry registry) {this.registry = registry;}@Overridepublic <R, Q extends Query<R>> R executeQuery(Q query) {QueryHandler<R, Q> queryHandler = (QueryHandler<R, Q>) registry.getQuery(query.getClass());return queryHandler.handle(query);}@Overridepublic <E extends Event> void dispatchEvent(E event) {EventHandler<E> eventHandler = (EventHandler<E>) registry.getEvent(event.getClass());eventHandler.process(event);}
}
Query接口:
/**
 * Marker interface for query messages.
 *
 * @param <R> type of the result that a {@code QueryHandler} returns for this query
 */
public interface Query<R> {}
QueryHandler接口:
/**
 * Handles one kind of {@link Query} and produces its result.
 *
 * @param <R> result type
 * @param <C> query type handled by this handler
 */
public interface QueryHandler<R, C extends Query<R>> {

    /**
     * Handles the given query.
     *
     * @param query the query to handle
     * @return the result of the query
     */
    R handle(C query);
}
QueryProvider类:
import org.springframework.context.ApplicationContext;

/**
 * Provider of a query handler: holds the handler's class and fetches the bean of that class
 * from the application context each time {@link #get()} is called.
 *
 * @param <H> handler type
 */
public class QueryProvider<H extends QueryHandler<?, ?>> {

    /** Context the handler bean is fetched from. */
    private final ApplicationContext context;

    /** Class of the handler bean. */
    private final Class<H> handlerType;

    /**
     * @param applicationContext context the handler bean is fetched from
     * @param type class of the handler bean
     */
    QueryProvider(ApplicationContext applicationContext, Class<H> type) {
        this.context = applicationContext;
        this.handlerType = type;
    }

    /**
     * @return the handler bean of the configured class, as returned by the context
     */
    public H get() {
        return context.getBean(handlerType);
    }
}
Event类似,Event接口:
/**
 * Marker interface for event messages processed by an {@code EventHandler}.
 */
public interface Event {}
EventHandler接口:
/**
 * Event handler: processes one kind of {@link Event}.
 *
 * @param <E> event type handled by this handler
 */
public interface EventHandler<E extends Event> {

    /**
     * Processes the given event.
     *
     * @param event the event
     */
    void process(E event);
}
EventProvider类:
import org.springframework.context.ApplicationContext;

/**
 * Provider of an event handler: holds the handler's class and fetches the bean of that class
 * from the application context each time {@link #get()} is called.
 *
 * @param <H> handler type
 */
public class EventProvider<H extends EventHandler<?>> {

    /** Context the handler bean is fetched from. */
    private final ApplicationContext context;

    /** Class of the handler bean. */
    private final Class<H> handlerType;

    /**
     * @param applicationContext context the handler bean is fetched from
     * @param type class of the handler bean
     */
    EventProvider(ApplicationContext applicationContext, Class<H> type) {
        this.context = applicationContext;
        this.handlerType = type;
    }

    /**
     * @return the handler bean of the configured class, as returned by the context
     */
    public H get() {
        return context.getBean(handlerType);
    }
}
实体类:
import com.littledyf.cqs.Query;
import lombok.Data;import java.io.Serializable;
import java.util.List;

/**
 * Sample query object: carries a name and declares {@code List<TestVo>} as its result type.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class TestDto implements Serializable, Query<List<TestVo>> {

    /** Name carried by the query. */
    private String name;
}
import lombok.Data;

/**
 * Sample result object returned by the test query. Accessors are generated by Lombok's
 * {@code @Data}.
 */
@Data
public class TestVo {

    /** Name value of the result. */
    private String nameVo;
}
Query具体实现类:
import com.littledyf.cqs.QueryHandler;
import com.littledyf.cqs.domain.TestDto;
import com.littledyf.cqs.domain.TestVo;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;@Component
@NoArgsConstructor
public class TestQueryHandler implements QueryHandler<List<TestVo>, TestDto> {@Overridepublic List<TestVo> handle(TestDto testDto) {List<TestVo> testVos = new ArrayList<>();TestVo testVo = new TestVo();testVo.setNameVo(testDto.getName());testVos.add(testVo);return testVos;}
}
Controller层:
import com.littledyf.cqs.Bus;
import com.littledyf.cqs.domain.TestDto;
import com.littledyf.cqs.domain.TestVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;
import java.util.List;

/**
 * REST controller that exposes the sample query under {@code /my-test/cqs} and forwards it
 * to the message bus.
 */
@Slf4j
@RestController
@RequestMapping("/my-test/cqs")
public class CqsController {

    /** Message bus the request is delegated to. */
    @Resource
    private Bus bus;

    /**
     * Handles {@code POST /my-test/cqs/query-test}: executes the posted query through the bus.
     *
     * @param testDto query read from the request body
     * @return the result produced by the bus for this query
     */
    @PostMapping("/query-test")
    public List<TestVo> queryTest(@RequestBody TestDto testDto) {
        return bus.executeQuery(testDto);
    }
}
SpringBoot启动类,在启动类中将Registry和Bus声明为bean,并由Spring注入ApplicationContext:
import com.littledyf.cqs.Bus;
import com.littledyf.cqs.Registry;
import com.littledyf.cqs.SpringBus;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot entry point; also declares the {@link Registry} and {@link Bus} beans.
 */
@SpringBootApplication
public class MyTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyTestApplication.class, args);
    }

    /**
     * Registry bean.
     *
     * @param applicationContext context passed to the registry
     * @return a new {@link Registry} built from the context
     */
    @Bean
    public Registry registry(ApplicationContext applicationContext) {
        return new Registry(applicationContext);
    }

    /**
     * Message bus bean.
     *
     * @param registry registry the bus looks handlers up in
     * @return a {@link SpringBus} backed by the given registry
     */
    @Bean
    public Bus commandBus(Registry registry) {
        return new SpringBus(registry);
    }
}
yml文件配置:
# HTTP port of the embedded server.
server:
  port: 8080
# Application name.
spring:
  application:
    name: my-test-service
四、测试
这里只模拟了查询;事件与查询类似,只需实现对应的Event和EventHandler接口即可。整体实现是:SpringBoot启动时创建注册器(Registry),注册器找出容器中所有QueryHandler、EventHandler类型的bean,并根据它们的泛型参数建立“消息类型 → 处理器”的对应关系;具体调用时,消息总线根据传入消息的实际类型找到对应的处理器bean并执行。