安装RocketMQ根据上篇文章使用Docker安装RocketMQ并启动之后,有个隐患详情见下文
Spring Boot集成
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
-
rocketmq还没有支持spring boot 3。
-
rocketmq自动转配的方式是spring boot 2的方式解决方法:使用spring boot 3自动装配方式,这个需要去了解
stater的写法
通过源码发现RocketMQAutoConfiguration是需要自动装配的所以在resource文件夹下创建METAINF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
RocketMQ连接失败
RocketMQ yml配置: 隐患就是name-service这里启动会报错连接失败,如果你使用的是腾讯云服务器并且使用官方docker命令部署RocketMQ,问题就出在docker 命令使用了--net=host
此时服务器内9876端口并未开放使用firewall-cmd --add-port=7896/tcp --permanent,注意重启网卡以及腾讯云控制台端口开放
。Tip:-p
docker默认会修改防火墙规则
rocketmq:name-server: ip:9876# 生产者producer:group: test_group# 消息发送超时时间send-message-timeout: 10000# 消息最大长度4Mmax-message-size: 4096# 消息发送失败重试次数retry-times-when-send-failed: 3# 异步消息发送失败重试次数retry-times-when-send-async-failed: 2# 消费者consumer:group: test_group# 每次提取的最大消息数pull-batch-size: 5
RocketMQ Broker连接失败
接下来在启动: 发现还是报错发现是RocketMQ Broker端口连接失败问题,参照如下解决
-
如果是docker单节点部署,应该有两个容器,进入rocketmq容器中找到
broker.conf
-
修改配置文件:只增加多出的配置,然后把这个配置文件复制出去重启这个容器
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1=122.51.115.127 namesrvAddr=122.51.115.127:9876;localhost:9876 autoCreateTopicEnable=true
-
删除Broker容器,重新部署
docker run -d --net=host --name=rocketmqBroker -v type=bind,source=/home/server/rocketmq,target=/home/rocketmq/store apache/rocketmq ./mqbroker -n localhost:9876 -C 配置文件路径
配置文件路径是真实存在容器内的所以需要把配置文件放在映射的文件夹中
然后启动spring boot程序
发送/接收信息
接上文,这个类具有接收信息和发送信息,看以下类注释
@EnableAspectJAutoProxy
@EnableAsync
@EnableScheduling
@Slf4j
@Indexed
@SpringBootApplication
@EnableDiscoveryClient
@RestController
@Tag(name = "rocket测试控制器")
@RequiredArgsConstructor
//这个注解用于回调接收信息,你要接收那个TOPIC主题的信息,以及你所属哪个组
@RocketMQMessageListener(consumerGroup = "test_group",topic = TOPIC)
public class RocketMQApplication implements RocketMQListener<String> {public static final String TOPIC = "test_rocket";public static void main(String[] args) {try {//打开rockermq打印System.setProperty("rocketmq.client.logUseSlf4j", "true");SpringApplication.run(RocketMQApplication.class, args);log.info("项目启动成功(ง ˙o˙)ว");} catch (Exception e) {log.error("启动失败:",e);}}private final RocketMQTemplate rocketMQTemplate;@GetMapping("/send/msg")public String sendMsg1 (){try {Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();//发送信息SendResult sendResult = rocketMQTemplate.syncSend(TOPIC, msg);log.error("发送结果 = {}",sendResult);} catch (Exception e) {e.printStackTrace();}return "OK" ;}@Overridepublic void onMessage(String message) {log.error("我是消费者 = {}",message);}
}