`Disruptor`是LMAX公司开源的高性能内存消息队列,单线程处理能力可达600w订单/秒。本文将使用该框架实现生产-消费者模式。
一、框架的maven依赖
<!-- https://mvnrepository.com/artifact/com.lmax/disruptor --><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version></dependency>
二、消息事件
package com.monika.main.system.mq;import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventProcessor;import java.util.EventObject;/*** @author:whh* @date: 2024-12-04 20:27* <p></p>*/
public class MsgEvent {private String data;public String getData() {return data;}public void setData(String data) {this.data = data;}
}
三、消息事件处理器
package com.monika.main.system.mq;import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;/*** @author:whh* @date: 2024-12-04 22:28* <p>* * * </p>*/
public class MsgEventHandler implements EventHandler<MsgEvent>, WorkHandler<MsgEvent> {private String name;public MsgEventHandler(String name) {this.name = name;}@Overridepublic void onEvent(MsgEvent event, long sequence, boolean endOfBatch) throws Exception {System.out.println(name+"-----start-----"+sequence);Thread.sleep(1000*10);System.out.println("ThreadName: "+Thread.currentThread().getName());System.out.println(event.getData()+" end seq: "+sequence);}@Overridepublic void onEvent(MsgEvent event) throws Exception {System.out.println(name+"-----start-----");Thread.sleep(1000*10);System.out.println("ThreadName: "+Thread.currentThread().getName());System.out.println(event.getData());System.out.println(name+"-----end-----");}
}
该消息处理器实现了两个接口,EventHandler接口,该接口实现统一消费一个消息会被所有消费者消费;WorkHandler接口,该接口实现分组消费一个消息只能被一个消费者消费,多消费者轮询处理。
四、Disruptor配置
package com.monika.main.system.mq;import cn.hutool.core.thread.NamedThreadFactory;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author:whh* @date: 2024-12-04 20:33* <p></p>*/@Configuration
public class RingBufferConfig {@Beanpublic RingBuffer<MsgEvent> ringBuffer(){NamedThreadFactory threadFactory = new NamedThreadFactory("MsgEvent-",true);EventFactory<MsgEvent> eventFactory = new EventFactory<MsgEvent>() {@Overridepublic MsgEvent newInstance() {return new MsgEvent();}};Disruptor<MsgEvent> disruptor = new Disruptor(eventFactory,1024, threadFactory);//定义两个消费者MsgEventHandler m1 = new MsgEventHandler("m1");MsgEventHandler m2 = new MsgEventHandler("m2");//disruptor.handleEventsWith(m1,m2); //统一消费:一个消息会被所有消费者消费disruptor.handleEventsWithWorkerPool(m1,m2);//分组消费:一个消息只能被一个消费者消费,多消费者轮询处理//disruptor.handleEventsWith(m1).then(m2); //顺序消费:1、3先并行处理,然后2再处理disruptor.start();//配置多消费者,每个消费者将有单独的线程处理return disruptor.getRingBuffer();}
}
五、消息生产者MsgPublish
package com.monika.main.system.mq;import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author:whh* @date: 2024-12-04 20:45* <p></p>*/
@Component
public class MsgPublish {public static void publish(String message){/*** 返回布尔值,表示事件是否发布成功,如果失败可根据此值进行业务逻辑判断*/boolean b = ringBuffer.tryPublishEvent(TRANSLATOR, message);}private static final EventTranslatorOneArg<MsgEvent,String> TRANSLATOR = new EventTranslatorOneArg<MsgEvent,String>() {@Overridepublic void translateTo(MsgEvent event, long sequence, String arg0) {event.setData(arg0);}};private static RingBuffer<MsgEvent> ringBuffer;@Autowiredpublic void setRingBuffer(RingBuffer<MsgEvent> ringBuffer) {MsgPublish.ringBuffer = ringBuffer;}
}
六、测试
本次测试使用的是分组模式,可以发现一个消息只能被一个消费者消费,且每个消费者都由单独的线程处理。
七、总结
本次只是简单的应用disruptor框架实现生产-消费者模式,对于disruptor的原理主要是RingBuffer环形数组,这个咱们后续再进一步研究。