部署服务
参考RocketMq入门介绍
示例
引入maven依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>
完整依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.9</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>rocketMqDemo</artifactId><version>0.0.1-SNAPSHOT</version><name>rocketMqDemo</name><description>rocketMqDemo</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
修改application.properties文件
配置文件如下:
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-grouprocketmq.consumer.topic=test-topic
所有的配置参考RocketMQProperties源码中配置。
rocketmq.name-server:服务地址
rocketmq.producer.group:生产者的组名称
rocketmq.consumer.topic:消费者的主题名称
定义生产者
生产者是 Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。
生产者通常被集成在业务系统中,将业务消息按照要求封装成 Apache RocketMQ 的消息(Message)并发送至服务端。
生产者和主题的关系为多对多关系,即同一个生产者可以向多个主题发送消息,对于平台类场景如果需要发送消息到多个主题,并不需要创建多个生产者;同一个主题也可以接收多个生产者的消息,以此可以实现生产者性能的水平扩展和容灾。
代码示例如下:
@Component
public class RocketMqProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** 发送同步消息* @param msg*/public void sendSyncMsg(String msg){rocketMQTemplate.convertAndSend("test-topic-1", msg);}/*** 发送Spring消息* @param msg*/public void sendSpringMsg(String msg){rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload(msg).build());}/*** 发送异步消息* @param msg*/public void sendAsyncMsg(String msg){rocketMQTemplate.asyncSend("test-topic-1", new MsgBean(msg), new SendCallback() {@Overridepublic void onSuccess(SendResult var1) {System.out.printf("async onSucess SendResult=%s %n", var1);}@Overridepublic void onException(Throwable var1) {System.out.printf("async onException Throwable=%s %n", var1);}});}/*** 发送有序消息* @param msg*/public void sendOrderlyMsg(String msg){rocketMQTemplate.syncSendOrderly("test-topic-1",MessageBuilder.withPayload(msg).build(),"hashkey");}
}
定义消费者
消费者是 Apache RocketMQ 中用来接收并处理消息的运行实体。 消费者通常被集成在业务系统中,从 Apache RocketMQ 服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
在消息消费端,可以定义如下传输行为:
-
消费者身份:消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消费状态。
-
消费者类型:Apache RocketMQ 面向不同的开发场景提供了多样的消费者类型,包括PushConsumer类型、SimpleConsumer类型、PullConsumer类型(仅推荐流处理场景使用)等。具体信息,请参见消费者分类。
-
消费者本地运行配置:消费者根据不同的消费者类型,控制消费者客户端本地的运行配置。例如消费者客户端的线程数,消费并发度等,实现不同的传输效果。
代码示例如下:
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-group")
@Component
public class RocketMqConsumer implements RocketMQListener<String> {public void onMessage(String message) {System.out.println("received message: "+ JSON.toJSONString(message));}
}
定义Controller调用消费者
代码示例如下:
@Controller
public class RestController {@AutowiredRocketMqProducer producer;@RequestMapping(value = "/sendSyncMsg")@ResponseBodypublic String sendSyncMsg(){producer.sendSyncMsg("hello word");return "ok"; }
}