前言
Dubbo3 提供了一个挺有意思的 Executor,用来将提交到线程池里的任务按顺序串行执行。
需求背景:你有一个线程池,但是你不想修改它,现在你的需求是要把提交上去的任务按顺序串行执行。
在这样一个需求背景下,SerializingExecutor 诞生了。
SerializingExecutor 在 Dubbo3 的应用场景是:针对 HTTP2 上的 Stream 接收到的 Frame 要按顺序处理。
SerializingExecutor
SerializingExecutor 类图很简单,首先它实现了 Executor 接口,意味着它可以执行提交的任务。当然了,它本身不创建线程,会依赖实际的 Executor 执行任务。
它还实现了 Runnable 接口,意味着它也是一个可执行的任务,可以被提交到 Executor 里执行。
SerializingExecutor 按顺序串行执行任务的逻辑很简单,核心是:提交的任务先入队等待,而后再按顺序串行化的调度任务执行。
public final class SerializingExecutor implements Executor, Runnable {// 运行状态 CAS防止并发private final AtomicBoolean atomicBoolean = new AtomicBoolean();// 实际跑任务的线程池private final Executor executor;// 任务队列private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
}
SerializingExecutor 本身不创建线程,不具备异步执行任务的能力,它依赖一个实际干活的 Executor。
public SerializingExecutor(Executor executor) {this.executor = executor;
}
它重写了execute()
方法,避免任务被直接执行,而是先入队等待,再自己去调度执行。
@Override
public void execute(Runnable r) {// 先入队runQueue.add(r);// 调度执行schedule(r);
}
schedule()
调度任务执行,通过CAS防止并发。它把自己提交到 executor 里去执行了,所以我们重点看run()
。
private void schedule(Runnable removable) {if (atomicBoolean.compareAndSet(false, true)) {boolean success = false;try {executor.execute(this);success = true;} finally {if (!success) {if (removable != null) {runQueue.remove(removable);}atomicBoolean.set(false);}}}
}
run()
也很简单,就是循环从队列里取出任务,然后挨个执行,队列本身保证了任务的先进先出,所以任务是按顺序串行执行的。
@Override
public void run() {Runnable r;try {// 循环出队 先进先出 按顺序执行while ((r = runQueue.poll()) != null) {InternalThreadLocalMap internalThreadLocalMap = InternalThreadLocalMap.getAndRemove();try {r.run();} catch (RuntimeException e) {LOGGER.error(COMMON_ERROR_RUN_THREAD_TASK, "", "", "Exception while executing runnable " + r, e);} finally {InternalThreadLocalMap.set(internalThreadLocalMap);}}} finally {atomicBoolean.set(false);}if (!runQueue.isEmpty()) {schedule(null);}
}