1. 在pom.xml中添加rocketmq依赖
<!-- RocketMQ Spring Boot starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>
注意:rocketmq 的版本需要与 Java 版本对应;rocketmq-spring-boot-starter 已经包含了 rocketmq-client,无需再单独引入。
2. 配置yaml文件
# Port the Spring Boot application listens on.
server:
  port: 8081

rocketmq:
  consumer:
    group: defaultGroup
  # host:port of the RocketMQ name server.
  name-server: 127.0.0.1:9876
  producer:
    group: defaultGroup
3. 生产者代码
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;

/**
 * Producer-side service that publishes messages to RocketMQ.
 *
 * <p>Relies on the {@link RocketMQTemplate} bean supplied by
 * rocketmq-spring-boot-starter; the starter needs {@code rocketmq.name-server}
 * and {@code rocketmq.producer.group} to be configured.
 */
@Service
public class ProducerService {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Creates the service with the template used for sending.
     *
     * @param rocketMQTemplate template injected by Spring
     */
    public ProducerService(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    /**
     * Sends a text message to the given topic.
     *
     * @param topic   destination topic, e.g. {@code "test-topic"}
     * @param message message body to send
     */
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }
}
4. 消费者代码
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service;@Service @RocketMQMessageListener(topic = "test-topic", consumerGroup = "defaultGroup") public class ConsumerService implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("Received message: " + s);} }
5. 测试案例
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class MQ_Test implements CommandLineRunner {private final ProducerService producer;public MQ_Test(ProducerService producerService) {this.producer = producerService;}public static void main(String[] args) {SpringApplication.run(MQ_Test.class, args);}@Overridepublic void run(String... args) throws Exception {producer.sendMessage("test-topic", "Hello RocketMQ!");}
}
结果如下: