企业网站制作公司电话/深圳做网站seo
【优化】XXLJOB修改为使用虚拟线程
新建这几个目录 类, 去找项目对应的xxljob的源码
主要是将 new Thread 改为 虚拟线程
Thread.ofVirtual().name("VT").unstarted
以下代码是 xxljob 2.3.0版本 举一反三 去修改对应版本的代码
<!-- 定时任务--><dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.3.0</version></dependency>
XxlJobExecutor
package com.xxl.job.core.executor;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.server.EmbedServer;
import com.xxl.job.core.thread.JobLogFileCleanThread;
import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** Created by xuxueli on 2016/3/2 21:14.*/
public class XxlJobExecutor {private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);// ---------------------- param ----------------------private String adminAddresses;private String accessToken;private String appname;private String address;private String ip;private int port;private String logPath;private int logRetentionDays;public void setAdminAddresses(String adminAddresses) {this.adminAddresses = adminAddresses;}public void setAccessToken(String accessToken) {this.accessToken = accessToken;}public void setAppname(String appname) {this.appname = appname;}public void setAddress(String address) {this.address = address;}public void setIp(String ip) {this.ip = ip;}public void setPort(int port) {this.port = port;}public void setLogPath(String logPath) {this.logPath = logPath;}public void setLogRetentionDays(int logRetentionDays) {this.logRetentionDays = logRetentionDays;}// ---------------------- start + stop ----------------------public void start() throws Exception {// init logpathXxlJobFileAppender.initLogPath(logPath);// init invoker, admin-clientinitAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThreadJobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThreadTriggerCallbackThread.getInstance().start();// init executor-serverinitEmbedServer(address, ip, port, appname, accessToken);}public void destroy(){// destory executor-serverstopEmbedServer();// destory jobThreadRepositoryif (jobThreadRepository.size() > 0) {for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");// wait for job thread push result to callback queueif (oldJobThread != null) {try {oldJobThread.join();} catch (InterruptedException e) {logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);}}}jobThreadRepository.clear();}jobHandlerRepository.clear();// destory JobLogFileCleanThreadJobLogFileCleanThread.getInstance().toStop();// destory TriggerCallbackThreadTriggerCallbackThread.getInstance().toStop();}// ---------------------- admin-client (rpc invoker) ----------------------private static List<AdminBiz> adminBizList;private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {if (adminAddresses!=null && adminAddresses.trim().length()>0) {for (String address: adminAddresses.trim().split(",")) {if (address!=null && address.trim().length()>0) {AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);if (adminBizList == null) {adminBizList = new ArrayList<AdminBiz>();}adminBizList.add(adminBiz);}}}}public static List<AdminBiz> getAdminBizList(){return adminBizList;}// ---------------------- executor-server (rpc provider) ----------------------private EmbedServer embedServer = null;private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip portport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate addressif (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is nulladdress = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// accessTokenif (accessToken==null || accessToken.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}// startembedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);}private void stopEmbedServer() {// stop provider factoryif (embedServer != null) {try {embedServer.stop();} catch (Exception e) {logger.error(e.getMessage(), e);}}}// ---------------------- job handler repository ----------------------private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}// ---------------------- job thread repository ----------------------private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);Thread.ofVirtual().name("VT_XXLJOB").start(newJobThread);
// newJobThread.start();
// logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}public static JobThread removeJobThread(int jobId, String removeOldReason){JobThread oldJobThread = jobThreadRepository.remove(jobId);if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();return oldJobThread;}return null;}public static JobThread loadJobThread(int jobId){JobThread jobThread = jobThreadRepository.get(jobId);return jobThread;}}
EmbedServer
package com.xxl.job.core.server;import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.biz.model.*;
import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.util.GsonTool;
import com.xxl.job.core.util.ThrowableUtil;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** Copy from : https://github.com/xuxueli/xxl-rpc** @author xuxueli 2020-04-11 21:25*/
public class EmbedServer {private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);private ExecutorBiz executorBiz;private Thread thread;public void start(final String address, final int port, final String appname, final String accessToken) {executorBiz = new ExecutorBizImpl();thread = Thread.ofVirtual().name("VT_EmbedServer").unstarted(new Runnable() {@Overridepublic void run() {// paramEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ExecutorService bizThreadPool = Executors.newVirtualThreadPerTaskExecutor();// ThreadPoolExecutor bizThreadPool2 = new ThreadPoolExecutor(
// 0,
// 200,
// 60L,
// TimeUnit.SECONDS,
// new LinkedBlockingQueue<Runnable>(2000),
// new ThreadFactory() {
// @Override
// public Thread newThread(Runnable r) {
// return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
// }
// },
// new RejectedExecutionHandler() {
// @Override
// public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
// }
// });try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registrystartRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();} catch (InterruptedException e) {if (e instanceof InterruptedException) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} else {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);}} finally {// stoptry {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}}}});thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leavethread.start();}public void stop() throws Exception {// destroy server threadif (thread!=null && thread.isAlive()) {thread.interrupt();}// stop registrystopRegistry();logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");}// ---------------------- registry ----------------------/*** netty_http** Copy from : https://github.com/xuxueli/xxl-rpc** @author xuxueli 2015-11-24 22:25:15*/public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);private ExecutorBiz executorBiz;private String accessToken;
// private ThreadPoolExecutor bizThreadPool;private ExecutorService bizThreadPool;public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ExecutorService bizThreadPool) {this.executorBiz = executorBiz;this.accessToken = accessToken;this.bizThreadPool = bizThreadPool;}@Overrideprotected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {// request parse//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);String requestData = msg.content().toString(CharsetUtil.UTF_8);String uri = msg.uri();HttpMethod httpMethod = msg.method();boolean keepAlive = HttpUtil.isKeepAlive(msg);String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);// invokebizThreadPool.execute(new Runnable() {@Overridepublic void run() {// do invokeObject responseObj = process(httpMethod, uri, requestData, accessTokenReq);// to jsonString responseJson = GsonTool.toJson(responseObj);// write responsewriteResponse(ctx, keepAlive, responseJson);}});}private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// validif (HttpMethod.POST != httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri==null || uri.trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (accessToken!=null&& accessToken.trim().length()>0&& !accessToken.equals(accessTokenReq)) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingtry {if ("/beat".equals(uri)) {return executorBiz.beat();} else if ("/idleBeat".equals(uri)) {IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);} else if ("/run".equals(uri)) {TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);} else if ("/kill".equals(uri)) {KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);} else if ("/log".equals(uri)) {LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}}/*** write response*/private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {// write responseFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8)); // Unpooled.wrappedBuffer(responseJson)response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8"); // HttpHeaderValues.TEXT_PLAIN.toString()response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());if (keepAlive) {response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);}ctx.writeAndFlush(response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);ctx.close();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {ctx.channel().close(); // beat 3N, close if idlelogger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");} else {super.userEventTriggered(ctx, evt);}}}// ---------------------- registry ----------------------public void startRegistry(final String appname, final String address) {// start registryExecutorRegistryThread.getInstance().start(appname, address);}public void stopRegistry() {// stop registryExecutorRegistryThread.getInstance().toStop();}}
ExecutorRegistryThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.TimeUnit;/*** Created by xuxueli on 17/3/2.*/
public class ExecutorRegistryThread {private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);private static ExecutorRegistryThread instance = new ExecutorRegistryThread();public static ExecutorRegistryThread getInstance(){return instance;}private Thread registryThread;private volatile boolean toStop = false;public void start(final String appname, final String address){// validif (appname==null || appname.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");return;}if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");return;}registryThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {// registrywhile (!toStop) {try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registry(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}}// registry removetry {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");}});registryThread.setDaemon(true);registryThread.setName("VT_xxl-job, executor ExecutorRegistryThread");registryThread.start();}public void toStop() {toStop = true;// interrupt and waitif (registryThread != null) {registryThread.interrupt();try {registryThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}}
JobLogFileCleanThread
package com.xxl.job.core.thread;import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;/*** job file clean thread** @author xuxueli 2017-12-29 16:23:43*/
public class JobLogFileCleanThread {private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);private static JobLogFileCleanThread instance = new JobLogFileCleanThread();public static JobLogFileCleanThread getInstance(){return instance;}private Thread localThread;private volatile boolean toStop = false;public void start(final long logRetentionDays){// limit min valueif (logRetentionDays < 3 ) {return;}localThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// clean log dir, over logRetentionDaysFile[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();if (childDirs!=null && childDirs.length>0) {// todayCalendar todayCal = Calendar.getInstance();todayCal.set(Calendar.HOUR_OF_DAY,0);todayCal.set(Calendar.MINUTE,0);todayCal.set(Calendar.SECOND,0);todayCal.set(Calendar.MILLISECOND,0);Date todayDate = todayCal.getTime();for (File childFile: childDirs) {// validif (!childFile.isDirectory()) {continue;}if (childFile.getName().indexOf("-") == -1) {continue;}// file create dateDate logFileCreateDate = null;try {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");logFileCreateDate = simpleDateFormat.parse(childFile.getName());} catch (ParseException e) {logger.error(e.getMessage(), e);}if (logFileCreateDate == null) {continue;}if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {FileUtil.deleteRecursively(childFile);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {TimeUnit.DAYS.sleep(1);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");}});localThread.setDaemon(true);localThread.setName("VT_xxl-job, executor JobLogFileCleanThread");localThread.start();}public void toStop() {toStop = true;if (localThread == null) {return;}// interrupt and waitlocalThread.interrupt();try {localThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}
JobThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;/*** handler thread* @author xuxueli 2016-1-16 19:52:47*/
public class JobThread extends Thread{private static Logger logger = LoggerFactory.getLogger(JobThread.class);private int jobId;private IJobHandler handler;private LinkedBlockingQueue<TriggerParam> triggerQueue;private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_IDprivate volatile boolean toStop = false;private String stopReason;private boolean running = false; // if running jobprivate int idleTimes = 0; // idel timespublic JobThread(int jobId, IJobHandler handler) {this.jobId = jobId;this.handler = handler;this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());}public IJobHandler getHandler() {return handler;}/*** new trigger to queue** @param triggerParam* @return*/public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.debug(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}/*** kill job thread** @param stopReason*/public void toStop(String stopReason) {/*** Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;*/this.toStop = true;this.stopReason = stopReason;}/*** is running job* @return*/public boolean isRunningOrHasQueue() {return running || triggerQueue.size()>0;}@Overridepublic void run() {// inittry {handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// executewhile(!toStop){running = false;idleTimes++;TriggerParam triggerParam = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 0;triggerLogIdSet.remove(triggerParam.getLogId());// log filename, like "logPath/yyyy-MM-dd/9999.log"String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());if (triggerParam.getExecutorTimeout() > 0) {// limit timeoutThread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);handler.execute();return true;}});futureThread = Thread.ofVirtual().name("VT_XXL").start(futureTask);
// futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {// just executehandler.execute();}// valid execute handle dataif (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {XxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {if (idleTimes > 30) {if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}// handle resultStringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();XxlJobHelper.handleFail(errorMsg);XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {if(triggerParam != null) {// callback handler infoif (!toStop) {// commonmTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_COCE_FAIL,stopReason + " [job running, killed]" ));}}}}// callback trigger request in queuewhile(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_COCE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}// destroytry {handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.debug(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());}
}
TriggerCallbackThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import com.xxl.job.core.util.JdkSerializeTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;/*** Created by xuxueli on 16/7/22.*/
public class TriggerCallbackThread {private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);private static TriggerCallbackThread instance = new TriggerCallbackThread();public static TriggerCallbackThread getInstance(){return instance;}/*** job results callback queue*/private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();public static void pushCallBack(HandleCallbackParam callback){getInstance().callBackQueue.add(callback);logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());}/*** callback thread*/private Thread triggerCallbackThread;private Thread triggerRetryCallbackThread;private volatile boolean toStop = false;public void start() {// validif (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");return;}// callbacktriggerCallbackThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {// normal callbackwhile(!toStop){try {HandleCallbackParam callback = getInstance().callBackQueue.take();if (callback != null) {// callback list paramList<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);callbackParamList.add(callback);// callback, will retry if errorif (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}// last callbacktry {List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");}});triggerCallbackThread.setDaemon(true);triggerCallbackThread.setName("VT_xxl-job, executor TriggerCallbackThread");triggerCallbackThread.start();// retrytriggerRetryCallbackThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {while(!toStop){try {retryFailCallbackFile();} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");}});triggerRetryCallbackThread.setDaemon(true);triggerRetryCallbackThread.start();}public void toStop(){toStop = true;// stop callback, interrupt and waitif (triggerCallbackThread != null) { // support empty admin addresstriggerCallbackThread.interrupt();try {triggerCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// stop retry, interrupt and waitif (triggerRetryCallbackThread != null) {triggerRetryCallbackThread.interrupt();try {triggerRetryCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}/*** do callback, will retry if error* @param callbackParamList*/private void doCallback(List<HandleCallbackParam> callbackParamList){boolean callbackRet = false;// callback, will retry if errorfor (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");callbackRet = true;break;} else {callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);}} catch (Exception e) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());}}if (!callbackRet) {appendFailCallbackFile(callbackParamList);}}/*** callback log*/private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent){for (HandleCallbackParam callbackParam: callbackParamList) {String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());XxlJobContext.setXxlJobContext(new XxlJobContext(-1,null,logFileName,-1,-1));XxlJobHelper.log(logContent);}}// ---------------------- fail-callback file ----------------------private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator);private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log");private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){// validif (callbackParamList==null || callbackParamList.size()==0) {return;}// append filebyte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));if (callbackLogFile.exists()) {for (int i = 0; i < 100; i++) {callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));if (!callbackLogFile.exists()) {break;}}}FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);}private void retryFailCallbackFile(){// validFile callbackLogPath = new File(failCallbackFilePath);if (!callbackLogPath.exists()) {return;}if (callbackLogPath.isFile()) {callbackLogPath.delete();}if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {return;}// load and clear file, retryfor (File callbaclLogFile: callbackLogPath.listFiles()) {byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);// avoid empty fileif(callbackParamList_bytes == null || callbackParamList_bytes.length < 1){callbaclLogFile.delete();continue;}List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);callbaclLogFile.delete();doCallback(callbackParamList);}}}
相关文章:

【优化】XXLJOB修改为使用虚拟线程
【优化】XXLJOB修改为使用虚拟线程 新建这几个目录 类, 去找项目对应的xxljob的源码 主要是将 new Thread 改为 虚拟线程 Thread.ofVirtual().name("VT").unstarted 以下代码是 xxljob 2.3.0版本 举一反三 去修改对应版本的代码 <!-- 定…...

金蝶Apusic应用服务器 loadTree JNDI注入漏洞复现(QVD-2023-48297)
0x01 产品简介 金蝶Apusic应用服务器是一款企业级应用服务器,支持Java EE技术,适用于各种商业环境。 0x02 漏洞概述 由于金蝶Apusic应用服务器权限验证不当,导致攻击者可以向loadTree接口执行JNDI注入,造成远程代码执行漏洞。利用该漏洞需低版本JDK。(漏洞比较旧,8月份…...

PromptNER: Prompt Locating and Typing for Named Entity Recognition
原文链接: https://aclanthology.org/2023.acl-long.698.pdf ACL 2023 介绍 问题 目前将prompt方法应用在ner中主要有两种方法:对枚举的span类型进行预测,或者通过构建特殊的prompt来对实体进行定位。但作者认为这些方法存在以下问题…...

QT编写应用的界面自适应分辨率的解决方案
博主在工作机上完成QT软件开发(控件大小与字体大小比例正常),部署到客户机后,发现控件大小与字体大小比例失调,具体表现为控件装不下字体,即字体显示不全,推测是软件不能自适应分辨率导致的。 文…...

Kubernetes pod ip 暴露
1. k8s pod 和 service 网络暴露 借助 iptables 的路由转发功能,打通k8s集群内的pod和service网络,与外部网络联通 # 查看集群的 pod 网段和 service 网段 kubectl -n kube-system describe cm kubeadm-config networking:dnsDomain: cluster.localpod…...

442. 数组中重复的数据
数组中重复的数据 描述 : 给你一个长度为 n 的整数数组 nums ,其中 nums 的所有整数都在范围 [1, n] 内,且每个整数出现 一次 或 两次 。请你找出所有出现 两次 的整数,并以数组形式返回。 你必须设计并实现一个时间复杂度为 O(n) 且仅使用…...

Qt/C++视频监控Onvif工具/组播搜索/显示监控画面/图片参数调节/OSD管理/祖传原创
一、前言 能够写出简单易用而又不失功能强大的组件,一直是我的追求,简单主要体现在易用性,不能搞一些繁琐的流程和一些极难使用的API接口,或者一些看不懂的很难以理解的函数名称,一定是要越简单越好。功能强大主要体现…...

word2003 open word2007+
Win 7 C:\Documents and Settings\Administrator\Application Data\Microsoft\Templates 还是不行,重装office2003吧,再安装转换插件,但是再高版本好像没转换工具...

windows安装、基本使用vim
标题:windows安装、基本使用vim 1.下载并安装GVIM 百度网盘链接 提取码:2apr 进入安装界面,如下,勾选 其它都是默认即可 参考; 2.在powershell中使用vim 参考blog:window10安装vim编辑器 安装好后&…...

【SpringBoot快速入门】(1)SpringBoot的开发步骤、工程构建方法以及工程的快速启动详细讲解
目录 SpringBoot简介1 SpringBoot快速入门1.1 开发步骤1.1.1 创建新模块1.1.2 创建 Controller1.1.3 启动服务器1.1.4 进行测试 2 对比3 官网构建工程3.1 进入SpringBoot官网3.2 选择依赖3.3 生成工程 4 SpringBoot工程快速启动4.1 问题导入4.2 打包4.3 启动 之前我们已经学习的…...

Day69力扣打卡
打卡记录...

机器学习:手撕 AlphaGo(一)
图 1-1: AphaGo 结构概览 1. 前言 AlphaGo 是一个非常经典的模型,不论从影响力还是模型设计上。它的技术迭代演进路径:AlphaGo,AlphaGoZero,AlphaZero,MuZero 更是十分精彩。相信有很多同学因为听了 AlphaGo 的故事对…...

ElasticSearch学习篇9_文本相似度计算方法现状以及基于改进的 Jaccard 算法代码实现
背景 XOP亿级别题库的试题召回以及搜题的举一反三业务场景都涉及使用文本相似搜索技术,学习此方面技术以便更好的服务于业务场景。 目前基于集合的Jaccard算法以及基于编辑距离的Levenshtein在计算文本相似度场景中有着各自的特点,为了优化具体的计算时…...

大创项目推荐 深度学习+python+opencv实现动物识别 - 图像识别
文章目录 0 前言1 课题背景2 实现效果3 卷积神经网络3.1卷积层3.2 池化层3.3 激活函数:3.4 全连接层3.5 使用tensorflow中keras模块实现卷积神经网络 4 inception_v3网络5 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是 🚩 *…...

Debezium系列之:Flink SQL消费Debezium数据格式,同步数据到下游存储系统
Debezium系列之:Flink SQL消费Debezium数据格式,同步数据到下游存储系统 一、Debezium二、依赖三、使用Debezium Format四、可用元数据五、Format参数六、重复的变更事件七、消费 Debezium Postgres Connector 产生的数据八、数据类型映射一、Debezium Debezium 是一个 CDC(…...

webrtc支持的最小宽度和高度
代码在:h264/sps_parser.cc // // IMPORTANT ONES! Now were getting to resolution. First we read the pic // width/height in macroblocks (16x16), which gives us the base resolution, // and then we continue on until we hit the frame crop offsets, wh…...

虚拟机对象的创建
虚拟机对象 虚拟机在Java堆中对象分配、布局和访问的访问过程 对象的创建 Java对象的创建步骤: 1)类加载检查 虚拟机遇到一条 new 指令时,首先将去检查这个指令的参数是否能在常量池中定位到这个类的符号引用,并且检查这个符号…...

阿里云吴结生:云计算是企业实现数智化的阶梯
云布道师 近年来,越来越多人意识到,我们正处在一个数据爆炸式增长的时代。IDC 预测 2027 年全球产生的数据量将达到 291 ZB,与 2022 年相比,增长了近 2 倍。其中 75% 的数据来自企业,每一个现代化的企业都是一家数据公…...

MySQL——复合查询
目录 一.基本查询回顾 二. 多表查询 三.自连接 四.子查询 1.单行子查询 2.多行子查询 3.多列子查询 4.在from子句中使用子查询 5.合并查询 一.基本查询回顾 准备数据库: 查询工资高于500或岗位为MANAGER的雇员,同时还要满足他们的姓名首字母为…...

mysql 23-3day 数据库授权(DCL)
目录 创建一个用户 并授权(grant)设置最大连接数客户端链接服务器创建用户删除用户修改用户修改密码root修改自己密码授予 mysql 权限收回权限收回权限刷新一下授权表mydql 知识点确保 mysql 用户为普通用户删除空口令账号安全建议 创建一个用户 并授权&…...

OpenHarmony之内核层解析~
OpenHarmony简介 技术架构 OpenHarmony整体遵从分层设计,从下向上依次为:内核层、系统服务层、框架层和应用层。系统功能按照“系统 > 子系统 > 组件”逐级展开,在多设备部署场景下,支持根据实际需求裁剪某些非必要的组件…...

Chatgpt如何共享可以防止封号!
ChatGPT 是一个基于 GPT-3.5/GPT-4 模型的对话系统,它主要用于处理自然语言对话。通过训练模型来模拟人类的语言行为,ChatGPT 可以通过文本交流与用户互动。每个新版本的 GPT 通常都会在模型规模、性能和其他方面有一些改进。在目前免费版GPT-3.5 中&…...

智能优化算法应用:基于社交网络算法3D无线传感器网络(WSN)覆盖优化 - 附代码
智能优化算法应用:基于社交网络算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于社交网络算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.社交网络算法4.实验参数设定5.算法结果6.…...

thinkphp+vue+mysql酒店客房管理系统 b1g8z
本系统包括前台界面、用户界面和管理员界面、员工界面。在前台界面里游客和用户可以浏览客房信息、公告信息等,用户可以预定客房,在用户中心界面里,用户可以管理预定信息,管理员负责用户预定的审核以及客房的发布、用户的入住等。…...

nodejs+vue+ElementUi摄影作品图片分享工作室管理系统
第1周 2.21~2.27 查阅资料,学习vscode开发平台和vue框架技术 第2周 2.28~3.6 对软件功能需求进行分析, 软件功能模块划分及软件界面设计 第3周 3.7~3.13 撰写并提交毕业设计开题报告、英文资料翻译 第4周 3.14࿵…...

详解FreeRTOS:专栏总述
目录 1、理论篇 2、基础篇 3、进阶篇 4、高级篇 5、拓展篇 本专栏基于FreeRTOS底层源码介绍了嵌入式实时操作系统的概念,FreeRTOS任务创建、任务调度、任务同步与消息传递,软件定时器、事件通知等知识。 主要分为5方面内容:理论篇、基础…...

在 linux 服务器上安装Redis数据库
先打开我们的Linux服务器 终端执行 安装redis sudo yum install redis然后 他会提示你要占多少磁盘空间 例如 我这里是 1.7 M 没问题就 y 然后回车就可以了 然后 我们这里执行 redis-cli --version这样 就能看到版本了 然后 我们可以根据版本选择启动命令 使用systemctl命…...

阿里云经济型、通用算力型、计算型、通用型、内存型云服务器最新活动报价
阿里云作为国内领先的云计算服务提供商,提供了多种规格的云服务器供用户选择。为了满足不同用户的需求,阿里云推出了经济型、通用算力型、计算型、通用型和内存型等不同类型的云服务器。下面将详细介绍这些云服务器的最新活动报价。 一、阿里云特惠云服…...

回溯算法 典型习题
vector<vector<int>> res; vector<int> path;void dfs() {if (递归终止条件){res.push_back(path);return;}// 递归方向for (xxx) {path.push_back(val);dfs();path.pop_back();} } 1.涉及枚举 2.不确定 for 循环的次数 总结 枚举各种可能的情况。 0.直接…...

14. 从零用Rust编写正反向代理, HTTP文件服务器的实现过程及参数
wmproxy wmproxy是由Rust编写,已实现http/https代理,socks5代理, 反向代理,静态文件服务器,内网穿透,配置热更新等, 后续将实现websocket代理等,同时会将实现过程分享出来ÿ…...