【优化】XXLJOB修改为使用虚拟线程

【优化】XXLJOB修改为使用虚拟线程

新建这几个目录 类, 去找项目对应的xxljob的源码

主要是将 new Thread  改为  虚拟线程

Thread.ofVirtual().name("VT").unstarted

以下代码是 xxljob   2.3.0版本  举一反三 去修改对应版本的代码

<!--        定时任务--><dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.3.0</version></dependency>

XxlJobExecutor
package com.xxl.job.core.executor;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.server.EmbedServer;
import com.xxl.job.core.thread.JobLogFileCleanThread;
import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** Created by xuxueli on 2016/3/2 21:14.*/
public class XxlJobExecutor  {private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);// ---------------------- param ----------------------private String adminAddresses;private String accessToken;private String appname;private String address;private String ip;private int port;private String logPath;private int logRetentionDays;public void setAdminAddresses(String adminAddresses) {this.adminAddresses = adminAddresses;}public void setAccessToken(String accessToken) {this.accessToken = accessToken;}public void setAppname(String appname) {this.appname = appname;}public void setAddress(String address) {this.address = address;}public void setIp(String ip) {this.ip = ip;}public void setPort(int port) {this.port = port;}public void setLogPath(String logPath) {this.logPath = logPath;}public void setLogRetentionDays(int logRetentionDays) {this.logRetentionDays = logRetentionDays;}// ---------------------- start + stop ----------------------public void start() throws Exception {// init logpathXxlJobFileAppender.initLogPath(logPath);// init invoker, admin-clientinitAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThreadJobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThreadTriggerCallbackThread.getInstance().start();// init executor-serverinitEmbedServer(address, ip, port, appname, accessToken);}public void destroy(){// destory executor-serverstopEmbedServer();// destory jobThreadRepositoryif (jobThreadRepository.size() > 0) {for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");// wait for job thread push result to callback queueif (oldJobThread != null) {try {oldJobThread.join();} catch (InterruptedException e) {logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);}}}jobThreadRepository.clear();}jobHandlerRepository.clear();// destory JobLogFileCleanThreadJobLogFileCleanThread.getInstance().toStop();// destory TriggerCallbackThreadTriggerCallbackThread.getInstance().toStop();}// ---------------------- admin-client (rpc invoker) ----------------------private static List<AdminBiz> adminBizList;private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {if (adminAddresses!=null && adminAddresses.trim().length()>0) {for (String address: adminAddresses.trim().split(",")) {if (address!=null && address.trim().length()>0) {AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);if (adminBizList == null) {adminBizList = new ArrayList<AdminBiz>();}adminBizList.add(adminBiz);}}}}public static List<AdminBiz> getAdminBizList(){return adminBizList;}// ---------------------- executor-server (rpc provider) ----------------------private EmbedServer embedServer = null;private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip portport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate addressif (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is nulladdress = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// accessTokenif (accessToken==null || accessToken.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}// startembedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);}private void stopEmbedServer() {// stop provider factoryif (embedServer != null) {try {embedServer.stop();} catch (Exception e) {logger.error(e.getMessage(), e);}}}// ---------------------- job handler repository ----------------------private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}// ---------------------- job thread repository ----------------------private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);Thread.ofVirtual().name("VT_XXLJOB").start(newJobThread);
//        newJobThread.start();
//        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}public static JobThread removeJobThread(int jobId, String removeOldReason){JobThread oldJobThread = jobThreadRepository.remove(jobId);if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();return oldJobThread;}return null;}public static JobThread loadJobThread(int jobId){JobThread jobThread = jobThreadRepository.get(jobId);return jobThread;}}

EmbedServer
package com.xxl.job.core.server;import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.biz.model.*;
import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.util.GsonTool;
import com.xxl.job.core.util.ThrowableUtil;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** Copy from : https://github.com/xuxueli/xxl-rpc** @author xuxueli 2020-04-11 21:25*/
public class EmbedServer {private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);private ExecutorBiz executorBiz;private Thread thread;public void start(final String address, final int port, final String appname, final String accessToken) {executorBiz = new ExecutorBizImpl();thread = Thread.ofVirtual().name("VT_EmbedServer").unstarted(new Runnable() {@Overridepublic void run() {// paramEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ExecutorService bizThreadPool = Executors.newVirtualThreadPerTaskExecutor();//                ThreadPoolExecutor bizThreadPool2 = new ThreadPoolExecutor(
//                        0,
//                        200,
//                        60L,
//                        TimeUnit.SECONDS,
//                        new LinkedBlockingQueue<Runnable>(2000),
//                        new ThreadFactory() {
//                            @Override
//                            public Thread newThread(Runnable r) {
//                                return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
//                            }
//                        },
//                        new RejectedExecutionHandler() {
//                            @Override
//                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
//                            }
//                        });try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registrystartRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();} catch (InterruptedException e) {if (e instanceof InterruptedException) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} else {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);}} finally {// stoptry {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}}}});thread.setDaemon(true);	// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leavethread.start();}public void stop() throws Exception {// destroy server threadif (thread!=null && thread.isAlive()) {thread.interrupt();}// stop registrystopRegistry();logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");}// ---------------------- registry ----------------------/*** netty_http** Copy from : https://github.com/xuxueli/xxl-rpc** @author xuxueli 2015-11-24 22:25:15*/public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);private ExecutorBiz executorBiz;private String accessToken;
//        private ThreadPoolExecutor bizThreadPool;private ExecutorService bizThreadPool;public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ExecutorService bizThreadPool) {this.executorBiz = executorBiz;this.accessToken = accessToken;this.bizThreadPool = bizThreadPool;}@Overrideprotected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {// request parse//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);String requestData = msg.content().toString(CharsetUtil.UTF_8);String uri = msg.uri();HttpMethod httpMethod = msg.method();boolean keepAlive = HttpUtil.isKeepAlive(msg);String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);// invokebizThreadPool.execute(new Runnable() {@Overridepublic void run() {// do invokeObject responseObj = process(httpMethod, uri, requestData, accessTokenReq);// to jsonString responseJson = GsonTool.toJson(responseObj);// write responsewriteResponse(ctx, keepAlive, responseJson);}});}private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// validif (HttpMethod.POST != httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri==null || uri.trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (accessToken!=null&& accessToken.trim().length()>0&& !accessToken.equals(accessTokenReq)) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingtry {if ("/beat".equals(uri)) {return executorBiz.beat();} else if ("/idleBeat".equals(uri)) {IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);} else if ("/run".equals(uri)) {TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);} else if ("/kill".equals(uri)) {KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);} else if ("/log".equals(uri)) {LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}}/*** write response*/private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {// write responseFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());if (keepAlive) {response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);}ctx.writeAndFlush(response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);ctx.close();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {ctx.channel().close();      // beat 3N, close if idlelogger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");} else {super.userEventTriggered(ctx, evt);}}}// ---------------------- registry ----------------------public void startRegistry(final String appname, final String address) {// start registryExecutorRegistryThread.getInstance().start(appname, address);}public void stopRegistry() {// stop registryExecutorRegistryThread.getInstance().toStop();}}

ExecutorRegistryThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.TimeUnit;/*** Created by xuxueli on 17/3/2.*/
public class ExecutorRegistryThread {private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);private static ExecutorRegistryThread instance = new ExecutorRegistryThread();public static ExecutorRegistryThread getInstance(){return instance;}private Thread registryThread;private volatile boolean toStop = false;public void start(final String appname, final String address){// validif (appname==null || appname.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");return;}if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");return;}registryThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {// registrywhile (!toStop) {try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registry(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}}// registry removetry {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");}});registryThread.setDaemon(true);registryThread.setName("VT_xxl-job, executor ExecutorRegistryThread");registryThread.start();}public void toStop() {toStop = true;// interrupt and waitif (registryThread != null) {registryThread.interrupt();try {registryThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}}

JobLogFileCleanThread
package com.xxl.job.core.thread;import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;/*** job file clean thread** @author xuxueli 2017-12-29 16:23:43*/
public class JobLogFileCleanThread {private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);private static JobLogFileCleanThread instance = new JobLogFileCleanThread();public static JobLogFileCleanThread getInstance(){return instance;}private Thread localThread;private volatile boolean toStop = false;public void start(final long logRetentionDays){// limit min valueif (logRetentionDays < 3 ) {return;}localThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// clean log dir, over logRetentionDaysFile[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();if (childDirs!=null && childDirs.length>0) {// todayCalendar todayCal = Calendar.getInstance();todayCal.set(Calendar.HOUR_OF_DAY,0);todayCal.set(Calendar.MINUTE,0);todayCal.set(Calendar.SECOND,0);todayCal.set(Calendar.MILLISECOND,0);Date todayDate = todayCal.getTime();for (File childFile: childDirs) {// validif (!childFile.isDirectory()) {continue;}if (childFile.getName().indexOf("-") == -1) {continue;}// file create dateDate logFileCreateDate = null;try {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");logFileCreateDate = simpleDateFormat.parse(childFile.getName());} catch (ParseException e) {logger.error(e.getMessage(), e);}if (logFileCreateDate == null) {continue;}if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {FileUtil.deleteRecursively(childFile);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {TimeUnit.DAYS.sleep(1);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");}});localThread.setDaemon(true);localThread.setName("VT_xxl-job, executor JobLogFileCleanThread");localThread.start();}public void toStop() {toStop = true;if (localThread == null) {return;}// interrupt and waitlocalThread.interrupt();try {localThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}

JobThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;/*** handler thread* @author xuxueli 2016-1-16 19:52:47*/
public class JobThread extends Thread{private static Logger logger = LoggerFactory.getLogger(JobThread.class);private int jobId;private IJobHandler handler;private LinkedBlockingQueue<TriggerParam> triggerQueue;private Set<Long> triggerLogIdSet;		// avoid repeat trigger for the same TRIGGER_LOG_IDprivate volatile boolean toStop = false;private String stopReason;private boolean running = false;    // if running jobprivate int idleTimes = 0;			// idel timespublic JobThread(int jobId, IJobHandler handler) {this.jobId = jobId;this.handler = handler;this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());}public IJobHandler getHandler() {return handler;}/*** new trigger to queue** @param triggerParam* @return*/public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.debug(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}/*** kill job thread** @param stopReason*/public void toStop(String stopReason) {/*** Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;*/this.toStop = true;this.stopReason = stopReason;}/*** is running job* @return*/public boolean isRunningOrHasQueue() {return running || triggerQueue.size()>0;}@Overridepublic void run() {// inittry {handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// executewhile(!toStop){running = false;idleTimes++;TriggerParam triggerParam = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 0;triggerLogIdSet.remove(triggerParam.getLogId());// log filename, like "logPath/yyyy-MM-dd/9999.log"String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());if (triggerParam.getExecutorTimeout() > 0) {// limit timeoutThread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);handler.execute();return true;}});futureThread = Thread.ofVirtual().name("VT_XXL").start(futureTask);
//                            futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {// just executehandler.execute();}// valid execute handle dataif (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {XxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {if (idleTimes > 30) {if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}// handle resultStringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();XxlJobHelper.handleFail(errorMsg);XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {if(triggerParam != null) {// callback handler infoif (!toStop) {// commonmTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_COCE_FAIL,stopReason + " [job running, killed]" ));}}}}// callback trigger request in queuewhile(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_COCE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}// destroytry {handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.debug(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());}
}

TriggerCallbackThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import com.xxl.job.core.util.JdkSerializeTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;/*** Created by xuxueli on 16/7/22.*/
public class TriggerCallbackThread {private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);private static TriggerCallbackThread instance = new TriggerCallbackThread();public static TriggerCallbackThread getInstance(){return instance;}/*** job results callback queue*/private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();public static void pushCallBack(HandleCallbackParam callback){getInstance().callBackQueue.add(callback);logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());}/*** callback thread*/private Thread triggerCallbackThread;private Thread triggerRetryCallbackThread;private volatile boolean toStop = false;public void start() {// validif (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");return;}// callbacktriggerCallbackThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {// normal callbackwhile(!toStop){try {HandleCallbackParam callback = getInstance().callBackQueue.take();if (callback != null) {// callback list paramList<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);callbackParamList.add(callback);// callback, will retry if errorif (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}// last callbacktry {List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");}});triggerCallbackThread.setDaemon(true);triggerCallbackThread.setName("VT_xxl-job, executor TriggerCallbackThread");triggerCallbackThread.start();// retrytriggerRetryCallbackThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {while(!toStop){try {retryFailCallbackFile();} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");}});triggerRetryCallbackThread.setDaemon(true);triggerRetryCallbackThread.start();}public void toStop(){toStop = true;// stop callback, interrupt and waitif (triggerCallbackThread != null) {    // support empty admin addresstriggerCallbackThread.interrupt();try {triggerCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// stop retry, interrupt and waitif (triggerRetryCallbackThread != null) {triggerRetryCallbackThread.interrupt();try {triggerRetryCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}/*** do callback, will retry if error* @param callbackParamList*/private void doCallback(List<HandleCallbackParam> callbackParamList){boolean callbackRet = false;// callback, will retry if errorfor (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");callbackRet = true;break;} else {callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);}} catch (Exception e) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());}}if (!callbackRet) {appendFailCallbackFile(callbackParamList);}}/*** callback log*/private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent){for (HandleCallbackParam callbackParam: callbackParamList) {String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());XxlJobContext.setXxlJobContext(new XxlJobContext(-1,null,logFileName,-1,-1));XxlJobHelper.log(logContent);}}// ---------------------- fail-callback file ----------------------private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator);private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log");private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){// validif (callbackParamList==null || callbackParamList.size()==0) {return;}// append filebyte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));if (callbackLogFile.exists()) {for (int i = 0; i < 100; i++) {callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));if (!callbackLogFile.exists()) {break;}}}FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);}private void retryFailCallbackFile(){// validFile callbackLogPath = new File(failCallbackFilePath);if (!callbackLogPath.exists()) {return;}if (callbackLogPath.isFile()) {callbackLogPath.delete();}if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {return;}// load and clear file, retryfor (File callbaclLogFile: callbackLogPath.listFiles()) {byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);// avoid empty fileif(callbackParamList_bytes == null || callbackParamList_bytes.length < 1){callbaclLogFile.delete();continue;}List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);callbaclLogFile.delete();doCallback(callbackParamList);}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/223155.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Wordpress插件WP-Statistics无法识别来访IP国家和城市处理方法

Wordpress插件WP-Statistics&#xff0c;可以识别网站访问者的IP物理地址&#xff0c;统计出城市、国家&#xff0c;但最近发现都显示unknown/未知&#xff1a; 更新GeoIP数据库到最新还是不行&#xff1a; 偶然找到了之前能用的数据库&#xff0c;恢复回去&#xff0c;竟然大…

安全认证【八】

文章目录 8. 安全认证8.1 访问控制概述8.2 认证管理8.3 授权管理8.4 准入控制 8. 安全认证 8.1 访问控制概述 Kubernetes作为一个分布式集群的管理工具&#xff0c;保证集群的安全性是其一个重要的任务。所谓的安全性其实就是保证对Kubernetes的各种客户端进行认证和鉴权操作…

使用华为云 CodeArts 自动化部署 Discuz实验指导

本实验将介绍基于华为云 CodeArts&#xff0c;更方便地编写 playbook 代码&#xff0c;甚至可以拷贝他人分享的代码仓库&#xff0c;同时基于自身 Ansible 组件实现快速运维部署&#xff0c;减轻工作负担&#xff0c;减少不必要的问题排查环节。 实验后台&#xff1a;开发者云…

pytorch-模型预测概率值为负数

在进行ocr识别模型预测的时候&#xff0c;发现预测的结果是正确的&#xff0c;但是概率值是负数&#xff1a; net_out net(img) #torch.Size([70, 1, 41]) logit, preds net_out.max(2) #41是类别 需要对类别取最大值 preds preds.transpose(1, 0).contiguous().view(-1) …

浏览器原理篇—渲染优化

渲染优化 通常一个页面有三个阶段&#xff1a;加载阶段、交互阶段和关闭阶段 加载阶段&#xff0c;是指从发出请求到渲染出完整页面的过程&#xff0c;影响到这个阶段的主要因素有网络和 JavaScript 脚本。交互阶段&#xff0c;主要是从页面加载完成到用户交互的整合过程&…

省时攻略:快速获得Creo安装包,释放创意天才!

不要再在网上浪费时间寻找Creo的安装包了&#xff0c;一键下载安装&#xff0c; 你要的一切都可以在这里找到&#xff01;我们深知在海量的信息中寻找合适的软件包并非易事&#xff0c;而且往往还伴随着繁琐的安装过程。然而&#xff0c;现在有了我们&#xff0c;一切变得轻松简…

WPF中使用ListView封装组合控件TreeView+DataGrid-粉丝专栏

wpf的功能非常强大&#xff0c;很多控件都是原生的&#xff0c;但是要使用TreeViewDataGrid的组合&#xff0c;就需要我们自己去封装实现。 我们需要的效果如图所示&#xff1a; 这2个图都是第三方控件自带的&#xff0c;并且都是收费使用。 现在我们就用原生的控件进行封装一…

【数据结构入门精讲 | 第十三篇】考研408、公司面试树专项练习(二)

在上一篇中我们进行了树的判断题、选择题、填空题专项练习&#xff0c;在这一篇中我们将进行编程题的相关练习。 目录 编程题R7-1 目录树R7-1 是否同一棵二叉搜索树R7-2 二叉搜索树的结构R7-3 平衡二叉树的根R7-1 完全二叉搜索树R7-1 修理牧场R7-2 嘴强王者R7-3 房屋分拆R7-4 动…

Django 简单图书管理系统

一、图书需求 1. 书籍book_index.html中有超链接&#xff1a;查看所有的书籍列表book_list.html页面 2. 书籍book_list.html中显示所有的书名&#xff0c;有超链接&#xff1a;查看本书籍详情book_detail.html(通过书籍ID)页面 3. 书籍book_detail.html中书的作者和出版社&…

Stable Diffusion系列(三):网络分类与选择

文章目录 网络分类模型基座模型衍生模型二次元模型2.5D模型写实风格模型 名称解读 VAELora嵌入文件放置界面使用 网络分类 当使用SD webui绘图时&#xff0c;为了提升绘图质量&#xff0c;可以多种网络混合使用&#xff0c;可选的网络包括了模型、VAE、超网络、Lora和嵌入。 …

Vue3视图渲染技术(2)

我是南城余&#xff01;阿里云开发者平台专家博士证书获得者&#xff01; 欢迎关注我的博客&#xff01;一同成长&#xff01; 一名从事运维开发的worker&#xff0c;记录分享学习。 专注于AI&#xff0c;运维开发&#xff0c;windows Linux 系统领域的分享&#xff01; 本…

kubernetes集群 应用实践 kafka部署

kubernetes集群 应用实践 kafka部署 零.1、环境说明 零.2、kafka架构说明 zookeeper在kafka集群中的作用 一、Broker注册 二、Topic注册 三、Topic Partition选主 四、生产者负载均衡 五、消费者负载均衡 一、持久化存储资源准备 1.1 创建共享目录 [rootnfsserver ~]# mkdir -…

医学实验室检验科LIS信息系统源码

实验室信息管理是专为医院检验科设计的一套实验室信息管理系统&#xff0c;能将实验仪器与计算机组成网络&#xff0c;使病人样品登录、实验数据存取、报告审核、打印分发&#xff0c;实验数据统计分析等繁杂的操作过程实现了智能化、自动化和规范化管理。 实验室管理系统功能介…

阿里云ECS配置IPv6后,如果无法访问该服务器上的网站,可检查如下配置

1、域名解析到这个IPv6地址,同一个子域名可以同时解析到IPv4和IPv6两个地址&#xff0c;这样就可以给网站配置ip4和ipv6双栈&#xff1b; 2、在安全组规则开通端口可访问&#xff0c;设定端口后注意授权对象要特殊设置“源:::/0” 3、到服务器nginx配置处&#xff0c;增加端口…

二值选择模型-以stata为工具

二值选择模型-以stata为工具 文章目录 1. 命令语法2. 模型 代码示例2.1 读取数据2.2 建立模型2.3 数据预测1. 命令语法 二值选择模型是计量经济学中常用的一种模型,用于处理因变量为二值(0或1)的情况。 这种模型通常用来研究个体在面临两个或多个离散选择时的决策行为。其中…

Mybatis之增删改查

目录 一、引言 二、Mybatis——增 举例&#xff1a;添加用户 三、Mybatis——删 举例&#xff1a;删除用户 四、Mybatis——改 举例&#xff1a;修改用户 五、Mybatis——查 六、注意 END&#xff1a; 一、引言 书接上回&#xff0c;我们在了解完mybatis之后&#xff0c;肯…

会员管理怎么做?

会员管理是企业运营的重要组成部分&#xff0c;它涉及到会员的招募、维护、激励、保留、转化等多个环节。下面&#xff0c;我们将结合具体的案例&#xff0c;详细介绍会员管理的具体做法。 首先&#xff0c;会员的招募是会员管理的第一步 企业需要通过各种方式吸引消费者成为会…

【大数据】NiFi 中的 Controller Service

NiFi 中的 Controller Service 1.Service 简介1.1 Controller Service 的配置1.1.1 SETTING 基础属性1.1.2 PROPERTIES 使用属性1.1.3 COMMENT 页签 1.2 Service 的使用范围 2.全局参数配置3.DBCPConnectionPool 的使用样例4.在 ExcuseGroovyScript 组件中使用 Service 1.Servi…

【Prometheus|报错】Out of bounds

【背景】进入Prometheus地址的9090端口&#xff0c;pushgateway&#xff08;0/1&#xff09;error : out of bounds 【排查分析】 1、out of bounds报错&#xff0c;是由于Prometheus向tsdb存数据出错&#xff0c;与最新存数据的时间序列有问题&#xff0c;有可能当前时间与最…

步兵 cocos2dx 加密和混淆

文章目录 摘要引言正文代码加密具体步骤代码加密具体步骤测试和配置阶段IPA 重签名操作步骤 总结参考资料 摘要 本篇博客介绍了针对 iOS 应用中的 Lua 代码进行加密和混淆的相关技术。通过对 Lua 代码进行加密处理&#xff0c;可以确保应用代码的安全性&#xff0c;同时提高性…