科普文:JUC系列之ForkJoinPool基本使用及原理解读-CSDN博客
科普文:JUC系列之ForkJoinPool源码解读概叙-CSDN博客
科普文:JUC系列之ForkJoinPool源码解读WorkQueue-CSDN博客
科普文:JUC系列之ForkJoinPool源码解读ForkJoinTask-CSDN博客
ForkJoinWorkerThread实际上非常简单,就是结合ForkJoinPool,然后根据其需要,创建合适的线程的过程。这里面值得我们借鉴的是,如果需要创建无其他访问权限的线程,实际上这两种线程大部分内容都是相同的,因此可以通过继承来复用大部分代码。之后定义两个factory,让最终的用户根据需要选择factory。
一、类结构及其成员变量
1.1 类结构和注释
类结构代码如下:
public class ForkJoinWorkerThread extends Thread {}
ForkJoinWorkerThread继承了Thread类,ForkJoinWorkerThread是由ForkJoinPool管理的线程,该线程执行ForkJoinTask。此类仅可做为扩展功能的需要而被集成,因为没有提供可以调度或者可重新的方法。但是,你可以覆盖主任务处理循环周围初始化和终止方法。如果确实创建了这个类的子类,还需要在ForkJoinPool中提供自定义的ForkJoinWorkerThreadFactory来使用。
1.2 常量
主要有两个:
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
ForkJoinWorkerThreads由ForkJoinPools管理,并执行ForkJoinTasks。请参见ForkJoinPool的内部文档。此类仅仅维护了指向pool和WorkQueue的链接。pool字段在构造的时候直接设置。但是直到对registerWorker调用完成之后,才设置workQueue字段。这将导致可见性竞争,可以通过要求workQueue字段仅由其所属线程访问来规避这个问题。对于InnocuousForkJoinWorkerThread子类的支持,要求我们在此处和子类中破坏很多封装,通过Unsafe以访问和设置Thread字段。
这是两个final修饰的常量,智能初始化一次。
二、构造函数
2.1 ForkJoinWorkerThread(ForkJoinPool pool)
protected ForkJoinWorkerThread(ForkJoinPool pool) {// Use a placeholder until a useful name can be set in registerWorkersuper("aForkJoinWorkerThread");this.pool = pool;this.workQueue = pool.registerWorker(this);
}
其构造函数主要是创建一个在给定pool中的ForkJoinWorkerThread。构造函数中最主要的方法就是registerWorker。
2.2 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,AccessControlContext acc) {super(threadGroup, null, "aForkJoinWorkerThread");U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);eraseThreadLocals(); // clear before registeringthis.pool = pool;this.workQueue = pool.registerWorker(this);
}
这个方法需要注意的是,采用unSafe方法,在这个类的INHERITEDACCESSCONTROLCONTEXT位置处,设置传入的AccessControlContext对象。
之后调用方法eraseThreadLocals将threadLocals清除。
之后与同用的构造函数一致。
擦除ThreadLocal也是采用UnSafe来完成。通过putObject将ThreadLocals的位置设置为null。
final void eraseThreadLocals() {U.putObject(this, THREADLOCALS, null);U.putObject(this, INHERITABLETHREADLOCALS, null);
}
实际上这个方法将会提供给InnocuousForkJoinWorkerThread继承的时候使用。
三、核心方法
3.1 run
做为Thread,最重要的就是run方法,我们来看看ForkJoinWorkerThread的实现:
public void run() {//如果workQueue为空,则抛出异常if (workQueue.array == null) { // only run onceThrowable exception = null;try {//调用onStart方法onStart();//调用pool的runWorker方法,运行workQueuepool.runWorker(workQueue);} catch (Throwable ex) {exception = ex;} finally {//操作完之后处理try {//调用onTermination方法onTermination(exception);} catch (Throwable ex) {//如果出现异常,则进行异常处理if (exception == null)exception = ex;} finally {//最终需要指向deregister方法pool.deregisterWorker(this, exception);}}}
}
runWorker方法实际上是对workQueue结合随机魔数,选择一个workQueue进行遍历,调用scan方法,如果不为空则执行,反之则wait。
其中registerWorker与deregisterWorker方法 ,我们可以参考前面的ForkJoinPool源码解读。
3.2 定义的可扩展方法
由于ForkJoinWorkerThread还支持继承扩展,因此在此定义了两个扩展的方法:
protected void onStart() {
}protected void onTermination(Throwable exception) {
}
这两个方法用于执行之前和之后,onStart用于run实际执行之前,执行一些初始化操作。onTermination用于run实际执行之后,执行一些清理操作。
四、内部类InnocuousForkJoinWorkerThread
这个类就是继承了ForkJoinWorkerThread的一个实现类。此类定义了一个没有任何权限、也非用户定义的任何线程组的线程。这个线程在运行完每个top的task之后,会擦除所有的ThreadLocals。
源码如下:
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {/** The ThreadGroup for all InnocuousForkJoinWorkerThreads */private static final ThreadGroup innocuousThreadGroup =createThreadGroup();/** An AccessControlContext supporting no privileges */private static final AccessControlContext INNOCUOUS_ACC =new AccessControlContext(new ProtectionDomain[] {new ProtectionDomain(null, null)});InnocuousForkJoinWorkerThread(ForkJoinPool pool) {super(pool, innocuousThreadGroup, INNOCUOUS_ACC);}@Override // to erase ThreadLocalsvoid afterTopLevelExec() {eraseThreadLocals();}@Override // to always report system loaderpublic ClassLoader getContextClassLoader() {return ClassLoader.getSystemClassLoader();}@Override // to silently failpublic void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }@Override // paranoicallypublic void setContextClassLoader(ClassLoader cl) {throw new SecurityException("setContextClassLoader");}private static ThreadGroup createThreadGroup() {try {sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();Class<?> tk = Thread.class;Class<?> gk = ThreadGroup.class;long tg = u.objectFieldOffset(tk.getDeclaredField("group"));long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));ThreadGroup group = (ThreadGroup)u.getObject(Thread.currentThread(), tg);while (group != null) {ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);if (parent == null)return new ThreadGroup(group,"InnocuousForkJoinWorkerThreadGroup");group = parent;}} catch (Exception e) {throw new Error(e);}// fall through if null as cannot-happen safeguardthrow new Error("Cannot create ThreadGroup");}
}
AccessControlContext INNOCUOUS_ACC 定义了一个不支持任何特权访问的AccessControlContext。这个类会创建一个单独的threadGroup,以确保其不属于任何一个用户创建的ThreadGroup。
五、ForkJoinPool中创建工作线程的过程
此时再来结合ForkJoinPool中的ForkJoinWorkerThreadFactory,就能明白ForkJoinThread的创建意义了。ForkJoinPool根据访问权限的需要,定义了采用默认的创建方法,还是创建InnocuousForkJoinWorkerThread。
5.1 makeCommonPool创建过程
再ForkJoinPool重的makeCommPool,有如下代码:
if (factory == null) {if (System.getSecurityManager() == null)factory = defaultForkJoinWorkerThreadFactory;else // use security-managed defaultfactory = new InnocuousForkJoinWorkerThreadFactory();
}
这里也就是说,如果System.getSecurityManager()为null,则返回默认的ThreadFactory,而不为null,则说, 使用了默认的安全管理级别,因此将创建InnocuousForkJoinWorkerThreadFactory。
5.2 createWorker创建过程
ForkJoinWorkerThreadFactory fac = factory;
try {if (fac != null && (wt = fac.newThread(this)) != null) {wt.start();return true;}
} catch (Throwable rex) {ex = rex;
}
也就是说,createWorker根据ForkJoinWorkerThreadFactory的实现类来创建。
5.3 InnocuousForkJoinWorkerThreadFactory
static final class InnocuousForkJoinWorkerThreadFactoryimplements ForkJoinWorkerThreadFactory {private static final AccessControlContext innocuousAcc;static {Permissions innocuousPerms = new Permissions();innocuousPerms.add(modifyThreadPermission);innocuousPerms.add(new RuntimePermission("enableContextClassLoaderOverride"));innocuousPerms.add(new RuntimePermission("modifyThreadGroup"));innocuousAcc = new AccessControlContext(new ProtectionDomain[] {new ProtectionDomain(null, innocuousPerms)});}public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)java.security.AccessController.doPrivileged(new java.security.PrivilegedAction<ForkJoinWorkerThread>() {public ForkJoinWorkerThread run() {return new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool);}}, innocuousAcc);}
}
5.4 DefaultForkJoinWorkerThreadFactory
static final class DefaultForkJoinWorkerThreadFactoryimplements ForkJoinWorkerThreadFactory {public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {return new ForkJoinWorkerThread(pool);}
}