Apache DolphinScheduler-1.3.9源码分析(一)
引言
随着大数据的发展,任务调度系统成为了数据处理和管理中至关重要的部分。Apache DolphinScheduler 是一款优秀的开源分布式工作流调度平台,在大数据场景中得到广泛应用。
在本文中,我们将对 Apache DolphinScheduler 1.3.9 版本的源码进行深入分析,介绍 Master 启动以及调度流程。
通过这些分析,开发者可以更好地理解 DolphinScheduler 的工作机制,并在实际使用中更高效地进行二次开发或优化。
Master Server启动
启动流程图
Master调度工作流流程图
MasterServer启动方法
public void run() {// init remoting serverNettyServerConfig serverConfig = new NettyServerConfig();serverConfig.setListenPort(masterConfig.getListenPort());this.nettyRemotingServer = new NettyRemotingServer(serverConfig);this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());this.nettyRemotingServer.start();// self tolerantthis.zkMasterClient.start();this.zkMasterClient.setStoppable(this);// scheduler startthis.masterSchedulerService.start();// start QuartzExecutors// what system should do if exceptiontry {logger.info("start Quartz server...");QuartzExecutors.getInstance().start();} catch (Exception e) {try {QuartzExecutors.getInstance().shutdown();} catch (SchedulerException e1) {logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);}logger.error("start Quartz failed", e);}/*** register hooks, which are called before the process exits*/Runtime.getRuntime().addShutdownHook(new Thread(() -> {if (Stopper.isRunning()) {close("shutdownHook");}}));}
- nettyServer会注册三种Command
- TASK_EXECUTE_ACK:Worker在接收到Master执行任务的请求后,会给Master发送一条Ack Command,告诉Master已经开始执行Task了。
- TASK_EXECUTE_RESPONSE:Worker在执行完Task之后,会给Master发送一条Response Command,告诉Master任务调度/执行结果。
- TASK_KILL_RESPONSE:Master接收到Task停止的请求会,会给Worker发送TASK_KILL_REQUEST Command,之后Worker会把Task_KILL_RESPONSE Command返回给Master。
- 启动调度和定时器。
- 添加ShutdownHook,关闭资源。
Master 配置文件
master.listen.port=5678# 限制Process Instance并发调度的线程数
master.exec.threads=100# 限制每个ProcessInstance可以执行的任务数
master.exec.task.num=20# 每一批次可以分发的任务数
master.dispatch.task.num=3# master需要选择一个稳定的worker去执行任务
# 算法有:Random,RoundRobin,LowerWeight。默认是LowerWeight
master.host.selector=LowerWeight# master需要向Zookeeper发送心跳,单位:秒
master.heartbeat.interval=10# master提交任务失败,重试次数
master.task.commit.retryTimes=5# master提交任务失败,重试时间间隔
master.task.commit.interval=1000# master最大cpu平均负载,只有当系统cpu平均负载还没有达到这个值,master才能调度任务
# 默认值为-1,系统cpu核数 * 2
master.max.cpuload.avg=-1# master为其他进程保留内存,只有当系统可用内存大于这个值,master才能调度
# 默认值0.3G
master.reserved.memory=0.3
Master Scheduler启动
MasterSchedulerService初始化方法
public void init(){// masterConfig.getMasterExecThreads(),master.properties里master.exec.threads=100// 该线程池的核心线程数和最大线程数为100this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());NettyClientConfig clientConfig = new NettyClientConfig();this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
MasterSchedulerService启动方法
public void run() {logger.info("master scheduler started");while (Stopper.isRunning()){try {// 这个方法是用来检查master cpu load和memory,判断master是否还有资源进行调度// 如果不能调度,Sleep 1 秒种boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());if(!runCheckFlag) {Thread.sleep(Constants.SLEEP_TIME_MILLIS);continue;}if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {// 这里才是真正去执行调度的方法scheduleProcess();}} catch (Exception e) {logger.error("master scheduler thread error", e);}}
}
MasterSchedulerService调度方法
private void scheduleProcess() throws Exception {InterProcessMutex mutex = null;try {// 阻塞式获取分布式锁mutex = zkMasterClient.blockAcquireMutex();// 获取线程池的活跃线程数int activeCount = masterExecService.getActiveCount();// make sure to scan and delete command table in one transaction// 获取其中一个command,必须保证操作都在一个事务里Command command = processService.findOneCommand();if (command != null) {logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());try{// 获取ProcessInstance,// 这个方法会根据master.exec.threads配置和活跃线程数来判断是否可以调度processInstanceProcessInstance processInstance = processService.handleCommand(logger,getLocalAddress(),this.masterConfig.getMasterExecThreads() - activeCount, command);if (processInstance != null) {logger.info("start master exec thread , split DAG ...");masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));}}catch (Exception e){logger.error("scan command error ", e);processService.moveToErrorCommand(command, e.toString());}} else{//indicate that no command ,sleep for 1sThread.sleep(Constants.SLEEP_TIME_MILLIS);}} finally{// 释放分布式锁zkMasterClient.releaseMutex(mutex);}
}
ProcessService处理Command的方法
public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {// 这里是去构造ProcessInstanceProcessInstance processInstance = constructProcessInstance(command, host);//cannot construct process instance, return null;if(processInstance == null){logger.error("scan command, command parameter is error: {}", command);moveToErrorCommand(command, "process instance is null");return null;}// 这里是检测当前剩余线程数是否大于等于该ProcessDefinition及其所有子Process的数量// 如果检测不通过,process instance的状态变为wait thread.并且返回空的process instanceif(!checkThreadNum(command, validThreadNum)){logger.info("there is not enough thread for this command: {}", command);return setWaitingThreadProcess(command, processInstance);}processInstance.setCommandType(command.getCommandType());processInstance.addHistoryCmd(command.getCommandType());saveProcessInstance(processInstance);this.setSubProcessParam(processInstance);delCommandByid(command.getId());return processInstance;
}
MasterExecThread初始化方法
public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){this.processService = processService;this.processInstance = processInstance;this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);// master.properties文件里的master.task.exec.numint masterTaskExecNum = masterConfig.getMasterExecTaskNum();this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",masterTaskExecNum);this.nettyRemotingClient = nettyRemotingClient;
}
MasterExecThread启动方法
public void run() {// 省略...try {if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()){// 补数逻辑... 暂不看executeComplementProcess();}else{// 执行task方法executeProcess();}}catch (Exception e){logger.error("master exec thread exception", e);logger.error("process execute failed, process id:{}", processInstance.getId());processInstance.setState(ExecutionStatus.FAILURE);processInstance.setEndTime(new Date());processService.updateProcessInstance(processInstance);}finally {taskExecService.shutdown();}
}private void executeProcess() throws Exception {// 前置prepareProcess();// 执行runProcess();// 后置endProcess();
}private void runProcess(){// 从根task开始提交submitPostNode(null);boolean sendTimeWarning = false;while(!processInstance.isProcessInstanceStop() && Stopper.isRunning()){// 省略部分代码...// 根据cpu load avg和Memorry判断是否可以调度if(canSubmitTaskToQueue()){submitStandByTask();}try {Thread.sleep(Constants.SLEEP_TIME_MILLIS);} catch (InterruptedException e) {logger.error(e.getMessage(),e);}updateProcessInstanceState();}logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
}// 获取可以并行的task
/**
* task 1 -> task 2 -> task3
* task 4 -> task 5
* task 6
* task 1,task4,task6可以并行跑
*/
private void submitPostNode(String parentNodeName){Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);List<TaskInstance> taskInstances = new ArrayList<>();for(String taskNode : submitTaskNodeList){taskInstances.add(createTaskInstance(processInstance, taskNode,dag.getNode(taskNode)));}// if previous node success , post node submitfor(TaskInstance task : taskInstances){if(readyToSubmitTaskQueue.contains(task)){continue;}if(completeTaskList.containsKey(task.getName())){logger.info("task {} has already run success", task.getName());continue;}if(task.getState().typeIsPause() || task.getState().typeIsCancel()){logger.info("task {} stopped, the state is {}", task.getName(), task.getState());}else{// task添加到priorityQueueaddTaskToStandByList(task);}}
}/*** handling the list of tasks to be submitted*/
private void submitStandByTask(){try {int length = readyToSubmitTaskQueue.size();for (int i=0;i<length;i++) {// 从队列里面取task, 提交给worker执行TaskInstance task = readyToSubmitTaskQueue.peek();// 先判断task的前置依赖有没有都运行成功,如果运行成功,在提交该task运行// 如果运行失败,或者没有执行,则不提交DependResult dependResult = getDependResultForTask(task);if(DependResult.SUCCESS == dependResult){if(retryTaskIntervalOverTime(task)){submitTaskExec(task);removeTaskFromStandbyList(task);}}else if(DependResult.FAILED == dependResult){// if the dependency fails, the current node is not submitted and the state changes to failure.dependFailedTask.put(task.getName(), task);removeTaskFromStandbyList(task);logger.info("task {},id:{} depend result : {}",task.getName(), task.getId(), dependResult);} else if (DependResult.NON_EXEC == dependResult) {// for some reasons(depend task pause/stop) this task would not be submitremoveTaskFromStandbyList(task);logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);}}} catch (Exception e) {logger.error("submit standby task error",e);}
}/**
* 创建TaskExecThread
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {MasterBaseTaskExecThread abstractExecThread = null;if(taskInstance.isSubProcess()){abstractExecThread = new SubProcessTaskExecThread(taskInstance);}else if(taskInstance.isDependTask()){abstractExecThread = new DependentTaskExecThread(taskInstance);}else if(taskInstance.isConditionsTask()){abstractExecThread = new ConditionsTaskExecThread(taskInstance);}else {abstractExecThread = new MasterTaskExecThread(taskInstance);}Future<Boolean> future = taskExecService.submit(abstractExecThread);activeTaskNode.putIfAbsent(abstractExecThread, future);return abstractExecThread.getTaskInstance();
}
MasterBaseTaskExecThread
MasterBaseTaskExecThread
是SubProcessTaskExecThread
,DependentTaskExecThread
,ConditionsTaskExecThread
,MasterTaskExecThread
的父类,实现Callable接口。
SubProcessTaskExecThread
任务实例不会下发到worker节点执行,在submitTask(TaskInstance taskInstance)方法中,针对子流程,会增加一条子流程实例命令,然后在waitTaskQuit方法中循环等待子流程执行完成。在当前工作流运行结束后会继续运行子工作流并做相关状态更新,子工作流完全完成才同步状态为子工作流的状态。
DependentTaskExecThread
Dependent 节点,就是依赖检查节点。比如 A 流程依赖昨天的 B 流程执行成功,依赖节点会去检查 B 流程在昨天是否有执行成功的实例。
ConditionsTaskExecThrea
Conditions 是一个条件节点,根据上游任务运行状态,判断应该运行哪个下游任务。截止目前 Conditions 支持多个上游任务,但只支持两个下游任务。当上游任务数超过一个时,可以通过且以及或操作符实现复杂上游依赖。
MasterTaskExecThread
将任务实例下发到worker节点执行,并在waitTaskQuit方法中循环等待任务实例执行完成,任务完成后则即出。例如SQKL,Shell等任务类型。
MasterBaseTaskExecThread初始化方法
public MasterBaseTaskExecThread(TaskInstance taskInstance){this.processService = SpringApplicationContext.getBean(ProcessService.class);this.alertDao = SpringApplicationContext.getBean(AlertDao.class);this.cancel = false;this.taskInstance = taskInstance;this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);initTaskParams();
}
MasterBaseTaskExecThread执行方法
@Override
public Boolean call() throws Exception {this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());return submitWaitComplete(); // 由各子类实现
}
MasterBaseTaskExecThread公共方法
submit()
protected TaskInstance submit(){// 提交任务重试次数. master.task.commit.retryTimes=5Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();// 提交任务失败,重试间隔时间 master.task.commit.interval=1000Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();int retryTimes = 1;boolean submitDB = false;boolean submitTask = false;TaskInstance task = null;while (retryTimes <= commitRetryTimes){try {if(!submitDB){// 持久化TaskInstance到数据库task = processService.submitTask(taskInstance);if(task != null && task.getId() != 0){submitDB = true;}}if(submitDB && !submitTask){// 分发任务到Woroker执行submitTask = dispatchTask(task);}if(submitDB && submitTask){return task;}if(!submitDB){logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);}else if(!submitTask){logger.error("task commit failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes);}Thread.sleep(commitRetryInterval);} catch (Exception e) {logger.error("task commit to mysql and dispatcht task failed",e);}retryTimes += 1;}return task;
}
dispatchTask(TaskInstance task)
public Boolean dispatchTask(TaskInstance taskInstance) {try{// 如果是子流程,条件任务,依赖任务,直接返回true,不提交给worker执行if(taskInstance.isConditionsTask()|| taskInstance.isDependTask()|| taskInstance.isSubProcess()){return true;}if(taskInstance.getState().typeIsFinished()){logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));return true;}// task cannot submit when runningif(taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION){logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName()));return true;}logger.info("task ready to submit: {}", taskInstance);/*** taskPriority*/TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),processInstance.getId(),taskInstance.getProcessInstancePriority().getCode(),taskInstance.getId(),org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);// 放入TaskPriorityQueue中,// org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl用于消费,从队列里取出TaskInstance,提交给Worker执行taskUpdateQueue.put(taskPriority);logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );return true;}catch (Exception e){logger.error("submit task Exception: ", e);logger.error("task error : %s", JSONUtils.toJson(taskInstance));return false;}
}
MasterTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {Boolean result = false;// 提交任务this.taskInstance = submit();if(this.taskInstance == null){logger.error("submit task instance to mysql and queue failed , please check and fix it");return result;}if(!this.taskInstance.getState().typeIsFinished()) {// 等待任务执行结果result = waitTaskQuit();}taskInstance.setEndTime(new Date());processService.updateTaskInstance(taskInstance);logger.info("task :{} id:{}, process id:{}, exec thread completed ",this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );return result;
}
waitTaskQuit()
public Boolean waitTaskQuit(){// query new statetaskInstance = processService.findTaskInstanceById(taskInstance.getId());logger.info("wait task: process id: {}, task id:{}, task name:{} complete",this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());while (Stopper.isRunning()){try {if(this.processInstance == null){logger.error("process instance not exists , master task exec thread exit");return true;}// task instance add queue , waiting worker to killif(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){cancelTaskInstance();}if(processInstance.getState() == ExecutionStatus.READY_PAUSE){pauseTask();}// task instance finishedif (taskInstance.getState().typeIsFinished()){// if task is final result , then remove taskInstance from cache// taskInstanceCacheManager其实现类为:org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl// taskInstance在触发ack和response Command会被添加到taskInstanceCache里taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());break;}if (checkTaskTimeout()) {this.checkTimeoutFlag = !alertTimeout();}// updateProcessInstance task instancetaskInstance = processService.findTaskInstanceById(taskInstance.getId());processInstance = processService.findProcessInstanceById(processInstance.getId());Thread.sleep(Constants.SLEEP_TIME_MILLIS);} catch (Exception e) {logger.error("exception",e);if (processInstance != null) {logger.error("wait task quit failed, instance id:{}, task id:{}",processInstance.getId(), taskInstance.getId());}}}return true;
}
SubProcessTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {Boolean result = false;try{// submit task instancethis.taskInstance = submit();if(taskInstance == null){logger.error("sub work flow submit task instance to mysql and queue failed , please check and fix it");return result;}setTaskInstanceState();waitTaskQuit();subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());// at the end of the subflow , the task state is changed to the subflow stateif(subProcessInstance != null){if(subProcessInstance.getState() == ExecutionStatus.STOP){this.taskInstance.setState(ExecutionStatus.KILL);}else{this.taskInstance.setState(subProcessInstance.getState());}}taskInstance.setEndTime(new Date());processService.updateTaskInstance(taskInstance);logger.info("subflow task :{} id:{}, process id:{}, exec thread completed ",this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );result = true;}catch (Exception e){logger.error("exception: ",e);if (null != taskInstance) {logger.error("wait task quit failed, instance id:{}, task id:{}",processInstance.getId(), taskInstance.getId());}}return result;
}
waitTaskQuit()
private void waitTaskQuit() throws InterruptedException {logger.info("wait sub work flow: {} complete", this.taskInstance.getName());if (taskInstance.getState().typeIsFinished()) {logger.info("sub work flow task {} already complete. task state:{}, parent work flow instance state:{}",this.taskInstance.getName(),this.taskInstance.getState(),this.processInstance.getState());return;}while (Stopper.isRunning()) {// waiting for subflow process instance establishmentif (subProcessInstance == null) {Thread.sleep(Constants.SLEEP_TIME_MILLIS);if(!setTaskInstanceState()){continue;}}subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId());if (checkTaskTimeout()) {this.checkTimeoutFlag = !alertTimeout();handleTimeoutFailed();}updateParentProcessState();if (subProcessInstance.getState().typeIsFinished()){break;}if(this.processInstance.getState() == ExecutionStatus.READY_PAUSE){// parent process "ready to pause" , child process "pause"pauseSubProcess();}else if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){// parent Process "Ready to Cancel" , subflow "Cancel"stopSubProcess();}Thread.sleep(Constants.SLEEP_TIME_MILLIS);}
}
ConditionsTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {try{this.taskInstance = submit();logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,taskInstance.getProcessDefinitionId(),taskInstance.getProcessInstanceId(),taskInstance.getId()));String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));Thread.currentThread().setName(threadLoggerInfoName);initTaskParameters();logger.info("dependent task start");// 等待判断waitTaskQuit();// 更新最终依赖结果updateTaskState();}catch (Exception e){logger.error("conditions task run exception" , e);}return true;
}
waitTaskQuit
private void waitTaskQuit() {List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());for(TaskInstance task : taskInstances){completeTaskList.putIfAbsent(task.getName(), task.getState());}// 获取所有依赖结果List<DependResult> modelResultList = new ArrayList<>();for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){List<DependResult> itemDependResult = new ArrayList<>();for(DependentItem item : dependentTaskModel.getDependItemList()){itemDependResult.add(getDependResultForItem(item));}DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);modelResultList.add(modelResult);}// 根据逻辑运算符,合并依赖结果conditionResult = DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), modelResultList);logger.info("the conditions task depend result : {}", conditionResult);
}
DependentTaskExecThread
submitWaitComplete()
public Boolean submitWaitComplete() {try{logger.info("dependent task start");this.taskInstance = submit();logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,taskInstance.getProcessDefinitionId(),taskInstance.getProcessInstanceId(),taskInstance.getId()));String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));Thread.currentThread().setName(threadLoggerInfoName);initTaskParameters();initDependParameters();waitTaskQuit();updateTaskState();}catch (Exception e){logger.error("dependent task run exception" , e);}return true;
}
waitTaskQuit()
private Boolean waitTaskQuit() {logger.info("wait depend task : {} complete", this.taskInstance.getName());if (taskInstance.getState().typeIsFinished()) {logger.info("task {} already complete. task state:{}",this.taskInstance.getName(),this.taskInstance.getState());return true;}while (Stopper.isRunning()) {try{if(this.processInstance == null){logger.error("process instance not exists , master task exec thread exit");return true;}// 省略部分代码// allDependentTaskFinish()等待所有依赖任务执行结束if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){break;}// update process tasktaskInstance = processService.findTaskInstanceById(taskInstance.getId());processInstance = processService.findProcessInstanceById(processInstance.getId());Thread.sleep(Constants.SLEEP_TIME_MILLIS);} catch (Exception e) {logger.error("exception",e);if (processInstance != null) {logger.error("wait task quit failed, instance id:{}, task id:{}",processInstance.getId(), taskInstance.getId());}}}return true;
}
TaskPriorityQueueConsumer
@Override
public void run() {List<TaskPriority> failedDispatchTasks = new ArrayList<>();while (Stopper.isRunning()){try {// 每一批次分发任务数量,master.dispatch.task.num = 3int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();failedDispatchTasks.clear();for(int i = 0; i < fetchTaskNum; i++){if(taskPriorityQueue.size() <= 0){Thread.sleep(Constants.SLEEP_TIME_MILLIS);continue;}// if not task , blocking here// 从队列里面获取taskTaskPriority taskPriority = taskPriorityQueue.take();// 分发给worker执行boolean dispatchResult = dispatch(taskPriority);if(!dispatchResult){failedDispatchTasks.add(taskPriority);}}if (!failedDispatchTasks.isEmpty()) {// 分发失败的任务,需要重新加入队列中,等待重新分发for (TaskPriority dispatchFailedTask : failedDispatchTasks) {taskPriorityQueue.put(dispatchFailedTask);}// If there are tasks in a cycle that cannot find the worker group,// sleep for 1 secondif (taskPriorityQueue.size() <= failedDispatchTasks.size()) {TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);}}}catch (Exception e){logger.error("dispatcher task error",e);}}
}/*** dispatch task** @param taskPriority taskPriority* @return result*/
protected boolean dispatch(TaskPriority taskPriority) {boolean result = false;try {int taskInstanceId = taskPriority.getTaskId();TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());if (taskInstanceIsFinalState(taskInstanceId)){// when task finish, ignore this task, there is no need to dispatch anymorereturn true;}else{// 分发任务// 分发算法支持:低负载优先算法,随机算法, 轮询算法。result = dispatcher.dispatch(executionContext);}} catch (ExecuteException e) {logger.error("dispatch error: {}",e.getMessage());}return result;
}
通过对 Apache DolphinScheduler 1.3.9 的源码分析,我们深入了解了其核心模块的设计和实现。DolphinScheduler 的 Master 架构充分保证了任务调度的高可用性和扩展性,而通过 Zookeeper 实现的集群协调则为系统提供了强大的容错机制。
如果你对 Apache DolphinScheduler 的源码有兴趣,可以深入研究其任务调度策略的细节部分,或者根据自身业务场景进行二次开发,充分发挥 DolphinScheduler 的调度能力。
本文完!
本文由 白鲸开源科技 提供发布支持!
相关文章:
Apache DolphinScheduler-1.3.9源码分析(一)
引言 随着大数据的发展,任务调度系统成为了数据处理和管理中至关重要的部分。Apache DolphinScheduler 是一款优秀的开源分布式工作流调度平台,在大数据场景中得到广泛应用。 在本文中,我们将对 Apache DolphinScheduler 1.3.9 版本的源码进…...
高级java每日一道面试题-2024年9月29日-数据库篇-索引怎么定义,分哪几种?
如果有遗漏,评论区告诉我进行补充 面试官: 索引怎么定义,分哪几种? 我回答: 在Java高级面试中,尤其是涉及数据库和数据结构的部分,索引(Index)是一个核心概念。索引的目的是提高数据库表中数据的检索速度,从而加快…...
现代LLM基本技术整理
0 开始之前 作者:hadiii,北京大学 电子信息硕士在读 本文从Llama 3报告出发,基本整理一些现代LLM的技术。基本,是说对一些具体细节不会过于详尽,而是希望得到一篇相对全面,包括预训练,后训练&…...
EasyX与少儿编程:轻松上手的编程启蒙工具
EasyX:开启少儿编程的图形化启蒙之路 随着科技发展,编程逐渐成为孩子们教育中重要的一部分。如何让孩子在编程启蒙阶段更容易接受并激发他们的兴趣,成为许多家长和老师关心的问题。相比起传统的编程语言,图形化编程工具显得更直观…...
【C语言指南】数据类型详解(上)——内置类型
💓 博客主页:倔强的石头的CSDN主页 📝Gitee主页:倔强的石头的gitee主页 ⏩ 文章专栏:《C语言指南》 期待您的关注 目录 引言 1. 整型(Integer Types) 2. 浮点型(Floating-Point …...
视频汇聚/视频存储/安防视频监控EasyCVR平台RTMP推流显示离线是什么原因?
视频汇聚/视频存储/安防视频监控EasyCVR视频汇聚平台兼容性强、支持灵活拓展,平台可提供视频远程监控、录像、存储与回放、视频转码、视频快照、告警、云台控制、语音对讲、平台级联等视频能力。 EasyCVR安防监控视频综合管理平台采用先进的网络传输技术࿰…...
联想电脑怎么开启vt_联想电脑开启vt虚拟化教程(附intel和amd主板开启方法)
最近使用联想电脑的小伙伴们问我,联想电脑怎么开启vt虚拟。大多数可以在Bios中开启vt虚拟化技术,当CPU支持VT-x虚拟化技术,有些电脑会自动开启VT-x虚拟化技术功能。而大部分的电脑则需要在Bios Setup界面中,手动进行设置ÿ…...
手把手教你使用YOLOv11训练自己数据集(含环境搭建 、数据集查找、模型训练)
一、前言 本文内含YOLOv11网络结构图 训练教程 推理教程 数据集获取等有关YOLOv11的内容! 官方代码地址:https://github.com/ultralytics/ultralytics/tree/main/ultralytics/cfg/models/11 二、整体网络结构图 三、环境搭建 项目环境如下…...
LabVIEW界面输入值设为默认值
在LabVIEW中,将前面板上所有控件的当前输入值设为默认值,可以通过以下步骤实现: 使用控件属性节点:你可以创建一个属性节点来获取所有控件的引用。 右键点击控件,选择“创建” > “属性节点”。 设置属性节点为“D…...
【Android 14源码分析】Activity启动流程-1
忽然有一天,我想要做一件事:去代码中去验证那些曾经被“灌输”的理论。 – 服装…...
Java 中 synchronized 和 Thread 的使用场合介绍
在 Java 编程中,synchronized 和 Thread 是处理并发与多线程编程的关键工具。多线程编程是为了在单一程序中并行执行多个任务,Java 提供了丰富的 API 和关键字以实现这一目标,而其中 synchronized 和 Thread 是非常基础和重要的部分。 synch…...
爬虫库是什么?是ip吗
爬虫库通常指的是用于网页爬虫(Web Scraping)开发的代码库或框架,它不是IP地址。以下是关于爬虫库的详细解释: 爬虫库的定义 爬虫库是一些用于简化网络数据抓取过程的工具和框架,通常提供了一系列函数和类࿰…...
【MySQL】查询原理 —— B+树查询数据全过程
使用B树作为索引结构的原因: 一种自平衡树: B树在插入和删除的时候节点会进行分裂和合并操作,以保持树的平衡,存在冗余节点,使得删除的时候树结构变化小,更高效。 高度不会增长过快,查询磁盘I…...
系统设置 WIFI输入框被挡住解决方案
文章目录 问题点复现的场景机器横屏可复现,竖屏不存在跟density 相关的。 解决问题方案设置输入模式路径 部分源码跟踪方法 延伸思考设置输入模式设置主题 问题点 进入系统设置-网络和互联网-WLAN-点击WIFI item ,密码输入框被遮挡,输入的密码不可见.如…...
SpringCloud无法注册Nacos和配置中心
今天升级SpringCloud版本,导致服务无法注册到nacos,使用nacos作为配置中心也无法刷新配置信息,后来发现是因为只更新了SpringCloud版本,SpringCloud-Alibaba没有更新导致的问题。 升级出现问题的版本是: <dependen…...
word2vector训练数据集整理(代码实现)
import math import os import random import torch import dltools from matplotlib import pyplot as plt #读取数据集 def read_ptb():"""将PTB数据集加载到文本行的列表中"""with open(./ptb/ptb.train.txt) as f:raw_text f.read()return…...
无心上班,只想为祖国庆生?让ChatGPT帮你搞定工作!
国庆假期临近,大家的心早已飞向诗和远方了吧。 然而,现实总是无情地将我们拉回到堆积如山的工作任务上:紧急报告的截止日期就在眼前,复杂的项目策划还未动笔,客户的定制需求迫在眉睫。每年的这个时候,如何…...
【Python】YOLO牛刀小试:快速实现视频物体检测
YOLO牛刀小试:快速实现视频物体检测 在深度学习的众多应用中,物体检测是一个热门且重要的领域。YOLO(You Only Look Once)系列模型以其快速和高效的特点,成为了物体检测的首选之一。本文将介绍如何使用YOLOv8模型进行…...
Vscode超好看的渐变主题插件
样式效果: 插件使用方法: 然后重启,之后会显示vccode损坏,不用理会,因为这个插件是更改了应用内部代码,直接不再显示即可。...
OceanBase技术解析:自适应分布式下压技术
在《OceanBase 数据库源码解析》这本书中,关于SQL执行器的深入剖析相对较少,因此,希望增添一些实用且详尽的补充内容。 上一篇博客《 OceanBase技术解析: 执行器中的自适应技术》中,已初步介绍了执行器中几项典型的自适…...
Firebase和JavaScript创建Postback Link逻辑
Firebase是一个提供后端即服务(BaaS)的平台,它允许开发者快速构建应用程序而无需管理服务器。Firebase不直接提供生成Postback Link的功能,但您可以使用Firebase的功能来构建和管理URL,然后在客户端使用这些URL来实现Postback。 以下是如何使用Firebase和JavaScript来创建…...
docker配置daemon.json文件
报错 :Get "https://registry-1.docker.io/v2/": net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers) 解决方法 配置加速地址 vim /etc/docker/daemon.json添加以下内容 {"registry-mirro…...
【08】纯血鸿蒙HarmonyOS NEXT星河版开发0基础学习笔记-Scroll容器与Tabs组件
序言: 本文详细讲解了关于我们在页面上经常看到的可滚动页面和导航栏在鸿蒙开发中如何用Scroll和Tabs组件实现,介绍了Scroll和Tabs的基本用法与属性。 笔者也是跟着B站黑马的课程一步步学习,学习的过程中添加部分自己的想法整理为笔记分享出…...
苏州 数字化科技展厅展馆-「世岩科技」一站式服务商
数字化科技展厅展馆设计施工是一个综合性强、技术要求高的项目,涉及到众多方面的要点。以下是对数字化科技展厅展馆设计施工要点的详细分析: 一、明确目标与定位 在设计之初,必须明确展厅的目标和定位。这包括确定展厅的主题、目标受众、展…...
音频搜索公司 DeepGram,定位语音搜索AI大脑,DeepGram想做“音频版”
1. 亦仁分享 DeepGram 成立于 2015 年,位于美国山景城,是一家基于 AI 技术的音频搜索引擎公司。运用机器学习进行语音识别、搜寻重要时刻并对音频和视频进行分类,帮助用户快速索引和浏览音频和视频文件,包括电话语音、会议语音、…...
基于php的在线租房管理系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码 精品专栏:Java精选实战项目…...
如何评价 Python 语言的运行速度
Python 作为一门编程语言,其运行速度一直是业界讨论的焦点。它的简洁语法和广泛的应用使得它在开发过程中非常高效,然而,运行速度与一些更底层的编程语言相比存在一定的劣势。这是否是由于 Python 语法的简洁性所带来的代价?我们可…...
Tomcat系列漏洞复现
CVE-2017-12615——Tomcat put⽅法任意⽂件写⼊漏洞 漏洞描述 当 Tomcat运⾏在Windows操作系统时,且启⽤了HTTP PUT请求⽅法(例如,将 readonly初始化参数由默认值设置为false),攻击者将有可能可通过精⼼构造的攻击请求…...
K8S拉取本地docker中registry的镜像报错:http: server gave HTTP response to HTTPS client
本地部署了一个K8S集群,但是worker1和worker2的docker无法拉取外面的镜像,docker的daemon.json也配置了,无法下载,于是在master部署了一个docker registry。 但是pod还是无法拉取registry的镜像并报错。 我这里使用的是container…...
Leetcode 1235. 规划兼职工作
1.题目基本信息 1.1.题目描述 你打算利用空闲时间来做兼职工作赚些零花钱。 这里有 n 份兼职工作,每份工作预计从 startTime[i] 开始到 endTime[i] 结束,报酬为 profit[i]。 给你一份兼职工作表,包含开始时间 startTime,结束时…...
给菠菜网站做支付/香港头条新闻
1) 质量是价值的体现、是公司的生存之道。 2) 软件质量是企业生命线、产品的生命线。持续优化、持续改进 3) 软件质量管理是一个长期进化的历程,没有止境唯有更好,企业能走多远质量管理就有多严 4) 质量是一个系统工程,短时间很难看到成效,持续改进是最好的方法。 5) 团队时刻关…...
成品网站建设价格/绍兴seo排名收费
凭证纸尺寸自定义纸张尺寸对照表纸张类型自定义纸张大小()备 注凭 证账 簿6.02420127038202790US Std Fanfold(37782794)平PJ7.0Legal纸型或 标准法律用纸Legal纸型或 标准法律用纸系统自带,无须自定义其大小为21593556PJ7.02420115038302930PJ7.12420115038302930账…...
网站开发一般做几个适配/百度投放广告联系谁
一。参数 1.实参 2.形参 从形参的角度分类: 1)位置参数 2)默认参数 参数陷阱def func(x,l []):l.append(x)print(l) func(alex) func(wusir) # 输出结果: # [alex] # [alex, wusir] 3)动态参数(*args &…...
wordpress媒体库一直转圈/黑帽seo优化推广
1,首先要有台牛逼的主机,主机上要有英伟达的显卡 NVIDIACUDA目前在机器学习领域的地位还无人能够撼动 如何配主机请参照以下链接 https://www.zhihu.com/question/33996159/answer/102691414 2,Ubuntu系统安装好,考虑到大多数人都…...
公司网站模块制作/优化大师免费版
晚上刷到一条消息,Layui官网将在10月13日正式下线 如果你的项目中还在用Layui也不必紧张,它的下线只是限于官网,其代码依然在Github托管。 说到Layui,可能搞后端的同学不太熟悉,它是一个WebUI组件库,和Boot…...
家电网站建设/免费b站推广网站入口202
x4bx2c0有四个根a1,a2,a3,a4x^4bx^2c0有四个根a_1,a_2,a_3,a_4x4bx2c0有四个根a1,a2,a3,a4 Q条件or关系FQ(a,b){a1a20a3a40\begin{cases}a_1a_20& \text{}\\a_3a_40& \text{}\end{cases}{a1a20a3a40保持上述关系不变的置换有8个F1F(b2−4c)F1F(\s…...