From ff14bad3ddce15136852ba1732bc8bc27c196ab2 Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Wed, 19 Jul 2023 17:20:41 +0800 Subject: [PATCH 1/4] :zap:update version --- openjob-common/pom.xml | 2 +- openjob-server/openjob-server-admin/pom.xml | 2 +- openjob-server/openjob-server-cluster/pom.xml | 2 +- openjob-server/openjob-server-common/pom.xml | 2 +- openjob-server/openjob-server-dispatcher/pom.xml | 2 +- openjob-server/openjob-server-log/pom.xml | 2 +- openjob-server/openjob-server-openapi/pom.xml | 2 +- openjob-server/openjob-server-repository/pom.xml | 2 +- openjob-server/openjob-server-scheduler/pom.xml | 2 +- openjob-server/openjob-server-starter/pom.xml | 2 +- openjob-server/pom.xml | 2 +- openjob-worker/openjob-worker-core/pom.xml | 2 +- openjob-worker/openjob-worker-spring-boot-starter/pom.xml | 2 +- openjob-worker/pom.xml | 4 ++-- pom.xml | 2 +- 15 files changed, 16 insertions(+), 16 deletions(-) diff --git a/openjob-common/pom.xml b/openjob-common/pom.xml index 0a490fb8..fbbbdd13 100644 --- a/openjob-common/pom.xml +++ b/openjob-common/pom.xml @@ -5,7 +5,7 @@ openjob io.openjob - 1.0.5 + 1.0.6 4.0.0 openjob-common diff --git a/openjob-server/openjob-server-admin/pom.xml b/openjob-server/openjob-server-admin/pom.xml index 6550b4ff..f765b7c2 100644 --- a/openjob-server/openjob-server-admin/pom.xml +++ b/openjob-server/openjob-server-admin/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.5 + 1.0.6 4.0.0 openjob-server-admin diff --git a/openjob-server/openjob-server-cluster/pom.xml b/openjob-server/openjob-server-cluster/pom.xml index a0649ba9..b56a9e05 100644 --- a/openjob-server/openjob-server-cluster/pom.xml +++ b/openjob-server/openjob-server-cluster/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.5 + 1.0.6 4.0.0 diff --git a/openjob-server/openjob-server-common/pom.xml b/openjob-server/openjob-server-common/pom.xml index 2219cbee..17e6a5b0 100644 --- a/openjob-server/openjob-server-common/pom.xml +++ b/openjob-server/openjob-server-common/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.5 + 1.0.6 4.0.0 openjob-server-common diff --git a/openjob-server/openjob-server-dispatcher/pom.xml b/openjob-server/openjob-server-dispatcher/pom.xml index a4743b0a..0d5fbd31 100644 --- a/openjob-server/openjob-server-dispatcher/pom.xml +++ b/openjob-server/openjob-server-dispatcher/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.5 + 1.0.6 4.0.0 diff --git a/openjob-server/openjob-server-log/pom.xml b/openjob-server/openjob-server-log/pom.xml index 4468e3e3..9c670734 100644 --- a/openjob-server/openjob-server-log/pom.xml +++ b/openjob-server/openjob-server-log/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.5 + 1.0.6 4.0.0 openjob-server-log diff --git a/openjob-server/openjob-server-openapi/pom.xml b/openjob-server/openjob-server-openapi/pom.xml index 55c2ec85..0e809582 100644 --- a/openjob-server/openjob-server-openapi/pom.xml +++ b/openjob-server/openjob-server-openapi/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.5 + 1.0.6 4.0.0 openjob-server-openapi diff --git a/openjob-server/openjob-server-repository/pom.xml b/openjob-server/openjob-server-repository/pom.xml index fae204f3..75c072e6 100644 --- a/openjob-server/openjob-server-repository/pom.xml +++ b/openjob-server/openjob-server-repository/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.5 + 1.0.6 4.0.0 openjob-server-repository diff --git a/openjob-server/openjob-server-scheduler/pom.xml b/openjob-server/openjob-server-scheduler/pom.xml index 7cdef53c..399eae09 100644 --- a/openjob-server/openjob-server-scheduler/pom.xml +++ b/openjob-server/openjob-server-scheduler/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.5 + 1.0.6 4.0.0 diff --git a/openjob-server/openjob-server-starter/pom.xml b/openjob-server/openjob-server-starter/pom.xml index f009ddf2..fd93e16d 100644 --- a/openjob-server/openjob-server-starter/pom.xml +++ b/openjob-server/openjob-server-starter/pom.xml @@ -5,7 +5,7 @@ openjob-server io.openjob - 1.0.5 + 1.0.6 4.0.0 diff --git a/openjob-server/pom.xml b/openjob-server/pom.xml index 7bf949ba..c60b6eed 100644 --- a/openjob-server/pom.xml +++ b/openjob-server/pom.xml @@ -5,7 +5,7 @@ openjob io.openjob - 1.0.5 + 1.0.6 4.0.0 openjob-server diff --git a/openjob-worker/openjob-worker-core/pom.xml b/openjob-worker/openjob-worker-core/pom.xml index 4e3cd9b3..8cb9edc5 100644 --- a/openjob-worker/openjob-worker-core/pom.xml +++ b/openjob-worker/openjob-worker-core/pom.xml @@ -5,7 +5,7 @@ openjob-worker io.openjob.worker - 1.0.5 + 1.0.6 4.0.0 diff --git a/openjob-worker/openjob-worker-spring-boot-starter/pom.xml b/openjob-worker/openjob-worker-spring-boot-starter/pom.xml index d72fd846..4f170ce7 100644 --- a/openjob-worker/openjob-worker-spring-boot-starter/pom.xml +++ b/openjob-worker/openjob-worker-spring-boot-starter/pom.xml @@ -5,7 +5,7 @@ openjob-worker io.openjob.worker - 1.0.5 + 1.0.6 4.0.0 diff --git a/openjob-worker/pom.xml b/openjob-worker/pom.xml index 5a82c958..f4c7acc0 100644 --- a/openjob-worker/pom.xml +++ b/openjob-worker/pom.xml @@ -5,7 +5,7 @@ openjob io.openjob - 1.0.5 + 1.0.6 4.0.0 @@ -25,7 +25,7 @@ io.openjob.worker openjob-worker-core - 1.0.5 + 1.0.6 diff --git a/pom.xml b/pom.xml index a683c709..ab336113 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ io.openjob openjob pom - 1.0.5 + 1.0.6 Openjob Build ${version} Openjob build with Maven From 64df970c314e200b0cdc40adfed295d10ed1e35d Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Wed, 19 Jul 2023 19:54:02 +0800 Subject: [PATCH 2/4] :bug:fixed job instance status --- .../io/openjob/common/task/BaseConsumer.java | 1 + .../executor/WorkerHeartbeatExecutor.java | 2 +- .../executor/WorkerJobInstanceExecutor.java | 49 +++++++++++++++++++ .../executor/WorkerTaskLogExecutor.java | 2 +- .../cluster/service/JobInstanceService.java | 20 ++++++-- .../cluster/task/WorkerHeartConsumer.java | 8 ++- .../task/WorkerJobInstanceConsumer.java | 49 +++++++++++++++++++ .../cluster/task/WorkerTaskLogConsumer.java | 8 ++- 8 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceExecutor.java create mode 100644 openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceConsumer.java diff --git a/openjob-common/src/main/java/io/openjob/common/task/BaseConsumer.java b/openjob-common/src/main/java/io/openjob/common/task/BaseConsumer.java index a0e71282..17ed0bf0 100644 --- a/openjob-common/src/main/java/io/openjob/common/task/BaseConsumer.java +++ b/openjob-common/src/main/java/io/openjob/common/task/BaseConsumer.java @@ -114,6 +114,7 @@ public Thread newThread(@Nonnull Runnable r) { }, new ThreadPoolExecutor.CallerRunsPolicy() ); + consumerExecutor.allowCoreThreadTimeOut(true); this.pullExecutor = new ThreadPoolExecutor( 1, diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java index 565f499d..5e82b259 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java @@ -27,7 +27,7 @@ public WorkerHeartbeatExecutor() { WorkerHeartConsumer consumer = new WorkerHeartConsumer( 0L, 1, - 8, + 16, "Openjob-heartbeat-executor", 50, "Openjob-heartbeat-consumer", diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceExecutor.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceExecutor.java new file mode 100644 index 00000000..8bd3ea6d --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerJobInstanceExecutor.java @@ -0,0 +1,49 @@ +package io.openjob.server.cluster.executor; + +import io.openjob.common.request.WorkerJobInstanceStatusRequest; +import io.openjob.common.task.TaskQueue; +import io.openjob.server.cluster.task.WorkerJobInstanceConsumer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author stelin swoft@qq.com + * @since 1.0.6 + */ +@Slf4j +@Component +public class WorkerJobInstanceExecutor { + private final TaskQueue queue; + + /** + * New + */ + public WorkerJobInstanceExecutor() { + this.queue = new TaskQueue<>(0L, 1024); + + //Consumer + WorkerJobInstanceConsumer consumer = new WorkerJobInstanceConsumer( + 0L, + 1, + 32, + "Openjob-heartbeat-executor", + 50, + "Openjob-heartbeat-consumer", + this.queue + ); + consumer.start(); + } + + /** + * Submit request + * + * @param request request + */ + public void submit(WorkerJobInstanceStatusRequest request) { + try { + this.queue.submit(request); + } catch (InterruptedException e) { + log.error("Worker heartbeat submit failed!", e); + } + } +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerTaskLogExecutor.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerTaskLogExecutor.java index 1ccc203c..c25a7c99 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerTaskLogExecutor.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerTaskLogExecutor.java @@ -25,7 +25,7 @@ public WorkerTaskLogExecutor() { WorkerTaskLogConsumer consumer = new WorkerTaskLogConsumer( 0L, 1, - 16, + 32, "Openjob-log-executor", 50, "Openjob-log-consumer", diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java index 685d54e3..19a3fdc8 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java @@ -4,6 +4,7 @@ import io.openjob.common.request.WorkerJobInstanceLogRequest; import io.openjob.common.request.WorkerJobInstanceStatusRequest; import io.openjob.common.util.DateUtil; +import io.openjob.server.cluster.executor.WorkerJobInstanceExecutor; import io.openjob.server.repository.dao.JobInstanceDAO; import io.openjob.server.repository.dao.JobInstanceLogDAO; import io.openjob.server.repository.dao.JobInstanceTaskDAO; @@ -28,16 +29,24 @@ @Log4j2 public class JobInstanceService { private final JobInstanceTaskDAO jobInstanceTaskDAO; - private final JobInstanceLogDAO jobInstanceLogDAO; - private final JobInstanceDAO jobInstanceDAO; + private final WorkerJobInstanceExecutor workerJobInstanceExecutor; @Autowired - public JobInstanceService(JobInstanceTaskDAO jobInstanceTaskDAO, JobInstanceLogDAO jobInstanceLogDAO, JobInstanceDAO jobInstanceDAO) { + public JobInstanceService(JobInstanceTaskDAO jobInstanceTaskDAO, + JobInstanceLogDAO jobInstanceLogDAO, + JobInstanceDAO jobInstanceDAO, + WorkerJobInstanceExecutor workerJobInstanceExecutor) { this.jobInstanceTaskDAO = jobInstanceTaskDAO; this.jobInstanceLogDAO = jobInstanceLogDAO; this.jobInstanceDAO = jobInstanceDAO; + this.workerJobInstanceExecutor = workerJobInstanceExecutor; + } + + @Transactional(rollbackFor = Exception.class, timeout = 1) + public void handleInstanceStatus(WorkerJobInstanceStatusRequest statusRequest) { + this.workerJobInstanceExecutor.submit(statusRequest); } /** @@ -46,7 +55,8 @@ public JobInstanceService(JobInstanceTaskDAO jobInstanceTaskDAO, JobInstanceLogD * @param statusRequest status request. */ @Transactional(rollbackFor = Exception.class) - public void handleInstanceStatus(WorkerJobInstanceStatusRequest statusRequest) { + public void handleConsumerInstanceStatus(WorkerJobInstanceStatusRequest statusRequest) { + Long start = DateUtil.timestamp(); // First page to update job instance status. if (CommonConstant.FIRST_PAGE.equals(statusRequest.getPage())) { this.jobInstanceDAO.updateStatusById(statusRequest.getJobInstanceId(), statusRequest.getStatus()); @@ -77,6 +87,8 @@ public void handleInstanceStatus(WorkerJobInstanceStatusRequest statusRequest) { } catch (DataIntegrityViolationException | UnexpectedRollbackException exception) { log.warn("Data has been saved! {}", taskList.stream().map(JobInstanceTask::getTaskId).collect(Collectors.toList())); } + + System.out.println("timetime="+(DateUtil.timestamp()-start)); } /** diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerHeartConsumer.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerHeartConsumer.java index ad6f4c5e..e2cee9cd 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerHeartConsumer.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerHeartConsumer.java @@ -5,6 +5,7 @@ import io.openjob.common.task.BaseConsumer; import io.openjob.common.task.TaskQueue; import io.openjob.server.cluster.service.WorkerHeartbeatService; +import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -12,6 +13,7 @@ * @author stelin swoft@qq.com * @since 1.0.3 */ +@Slf4j public class WorkerHeartConsumer extends BaseConsumer { public WorkerHeartConsumer(Long id, @@ -41,7 +43,11 @@ private WorkerHeartbeatConsumerRunnable(List tasks) { @Override public void run() { - OpenjobSpringContext.getBean(WorkerHeartbeatService.class).batchHeartbeat(this.tasks); + try { + OpenjobSpringContext.getBean(WorkerHeartbeatService.class).batchHeartbeat(this.tasks); + } catch (Throwable throwable) { + log.error("Worker heartbeat failed!", throwable); + } } } } diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceConsumer.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceConsumer.java new file mode 100644 index 00000000..8005ad11 --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerJobInstanceConsumer.java @@ -0,0 +1,49 @@ +package io.openjob.server.cluster.task; + +import io.openjob.common.OpenjobSpringContext; +import io.openjob.common.request.WorkerJobInstanceStatusRequest; +import io.openjob.common.task.BaseConsumer; +import io.openjob.common.task.TaskQueue; +import io.openjob.server.cluster.service.JobInstanceService; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.6 + */ +@Slf4j +public class WorkerJobInstanceConsumer extends BaseConsumer { + public WorkerJobInstanceConsumer(Long id, + Integer consumerCoreThreadNum, + Integer consumerMaxThreadNum, + String consumerThreadName, + Integer pollSize, + String pollThreadName, + TaskQueue queues) { + super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 1000L, 1000L); + } + + @Override + public void consume(Long id, List tasks) { + this.consumerExecutor.submit(new WorkerJobInstanceConsumerRunnable(tasks)); + } + + private static class WorkerJobInstanceConsumerRunnable implements Runnable { + private final List tasks; + + private WorkerJobInstanceConsumerRunnable(List tasks) { + this.tasks = tasks; + } + + @Override + public void run() { + try { + this.tasks.forEach(r -> OpenjobSpringContext.getBean(JobInstanceService.class).handleConsumerInstanceStatus(r)); + } catch (Throwable throwable) { + log.error("Handler consumer instance status failed!", throwable); + } + } + } +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerTaskLogConsumer.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerTaskLogConsumer.java index 1ae1b386..fa611969 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerTaskLogConsumer.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerTaskLogConsumer.java @@ -5,6 +5,7 @@ import io.openjob.common.task.BaseConsumer; import io.openjob.common.task.TaskQueue; import io.openjob.server.cluster.service.JobInstanceTaskLogService; +import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -12,6 +13,7 @@ * @author stelin swoft@qq.com * @since 1.0.3 */ +@Slf4j public class WorkerTaskLogConsumer extends BaseConsumer { public WorkerTaskLogConsumer(Long id, @@ -38,7 +40,11 @@ private WorkerTaskLogRunnable(List tasks) { @Override public void run() { - OpenjobSpringContext.getBean(JobInstanceTaskLogService.class).batchInstanceTaskLog(this.tasks); + try { + OpenjobSpringContext.getBean(JobInstanceTaskLogService.class).batchInstanceTaskLog(this.tasks); + } catch (Throwable throwable) { + log.error("Job instance log failed!", throwable); + } } } } From b12c0a74792af6dba2d06fca256db8585516d080 Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Wed, 19 Jul 2023 20:23:57 +0800 Subject: [PATCH 3/4] :zap:update delay status --- .../actor/WorkerDelayInstanceStatusActor.java | 9 ++-- .../executor/WorkerDelayStatusExecutor.java | 49 +++++++++++++++++++ .../executor/WorkerHeartbeatExecutor.java | 2 - .../cluster/service/WorkerDelayService.java | 30 ++++++++++++ .../task/WorkerDelayStatusConsumer.java | 49 +++++++++++++++++++ .../service/DelayInstanceService.java | 2 +- 6 files changed, 134 insertions(+), 7 deletions(-) create mode 100644 openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerDelayStatusExecutor.java create mode 100644 openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerDelayService.java create mode 100644 openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerDelayStatusConsumer.java diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/actor/WorkerDelayInstanceStatusActor.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/actor/WorkerDelayInstanceStatusActor.java index 5c9053d7..4cb4d7fb 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/actor/WorkerDelayInstanceStatusActor.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/actor/WorkerDelayInstanceStatusActor.java @@ -4,6 +4,7 @@ import io.openjob.common.request.WorkerDelayStatusRequest; import io.openjob.common.response.Result; import io.openjob.common.response.ServerResponse; +import io.openjob.server.cluster.service.WorkerDelayService; import io.openjob.server.scheduler.service.DelayInstanceService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -20,11 +21,11 @@ @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class WorkerDelayInstanceStatusActor extends BaseActor { - private final DelayInstanceService delayInstanceService; + private final WorkerDelayService workerDelayService; @Autowired - public WorkerDelayInstanceStatusActor(DelayInstanceService delayInstanceService) { - this.delayInstanceService = delayInstanceService; + public WorkerDelayInstanceStatusActor(WorkerDelayService workerDelayService) { + this.workerDelayService = workerDelayService; } @Override @@ -40,7 +41,7 @@ public Receive createReceive() { * @param statusRequest statusRequest */ public void handleDelayStatus(WorkerDelayStatusRequest statusRequest) { - this.delayInstanceService.handleDelayStatus(statusRequest); + this.workerDelayService.handleDelayStatus(statusRequest); ServerResponse serverResponse = new ServerResponse(statusRequest.getDeliveryId()); getSender().tell(Result.success(serverResponse), getSelf()); diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerDelayStatusExecutor.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerDelayStatusExecutor.java new file mode 100644 index 00000000..dae00137 --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerDelayStatusExecutor.java @@ -0,0 +1,49 @@ +package io.openjob.server.cluster.executor; + +import io.openjob.common.request.WorkerDelayStatusRequest; +import io.openjob.common.task.TaskQueue; +import io.openjob.server.cluster.task.WorkerDelayStatusConsumer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author stelin swoft@qq.com + * @since 1.0.6 + */ +@Slf4j +@Component +public class WorkerDelayStatusExecutor { + private final TaskQueue queue; + + /** + * New + */ + public WorkerDelayStatusExecutor() { + this.queue = new TaskQueue<>(0L, 1024); + + //Consumer + WorkerDelayStatusConsumer consumer = new WorkerDelayStatusConsumer( + 0L, + 1, + 16, + "Openjob-heartbeat-executor", + 50, + "Openjob-heartbeat-consumer", + this.queue + ); + consumer.start(); + } + + /** + * Submit request + * + * @param request request + */ + public void submit(WorkerDelayStatusRequest request) { + try { + this.queue.submit(request); + } catch (InterruptedException e) { + log.error("Worker heartbeat submit failed!", e); + } + } +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java index 5e82b259..bc610495 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/executor/WorkerHeartbeatExecutor.java @@ -2,10 +2,8 @@ import io.openjob.common.request.WorkerHeartbeatRequest; import io.openjob.common.task.TaskQueue; -import io.openjob.server.cluster.service.WorkerHeartbeatService; import io.openjob.server.cluster.task.WorkerHeartConsumer; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerDelayService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerDelayService.java new file mode 100644 index 00000000..0523f5cd --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/WorkerDelayService.java @@ -0,0 +1,30 @@ +package io.openjob.server.cluster.service; + +import io.openjob.common.request.WorkerDelayStatusRequest; +import io.openjob.server.cluster.executor.WorkerDelayStatusExecutor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * @author stelin swoft@qq.com + * @since 1.0.6 + */ +@Service +public class WorkerDelayService { + + private final WorkerDelayStatusExecutor workerDelayStatusExecutor; + + @Autowired + public WorkerDelayService(WorkerDelayStatusExecutor workerDelayStatusExecutor) { + this.workerDelayStatusExecutor = workerDelayStatusExecutor; + } + + /** + * Handle delay status. + * + * @param workerDelayStatusRequest workerDelayStatusRequest + */ + public void handleDelayStatus(WorkerDelayStatusRequest workerDelayStatusRequest) { + this.workerDelayStatusExecutor.submit(workerDelayStatusRequest); + } +} diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerDelayStatusConsumer.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerDelayStatusConsumer.java new file mode 100644 index 00000000..e276d0cd --- /dev/null +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/task/WorkerDelayStatusConsumer.java @@ -0,0 +1,49 @@ +package io.openjob.server.cluster.task; + +import io.openjob.common.OpenjobSpringContext; +import io.openjob.common.request.WorkerDelayStatusRequest; +import io.openjob.common.task.BaseConsumer; +import io.openjob.common.task.TaskQueue; +import io.openjob.server.scheduler.service.DelayInstanceService; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +/** + * @author stelin swoft@qq.com + * @since 1.0.6 + */ +@Slf4j +public class WorkerDelayStatusConsumer extends BaseConsumer { + public WorkerDelayStatusConsumer(Long id, + Integer consumerCoreThreadNum, + Integer consumerMaxThreadNum, + String consumerThreadName, + Integer pollSize, + String pollThreadName, + TaskQueue queues) { + super(id, consumerCoreThreadNum, consumerMaxThreadNum, consumerThreadName, pollSize, pollThreadName, queues, 2000L, 1000L); + } + + @Override + public void consume(Long id, List tasks) { + this.consumerExecutor.submit(new WorkerDelayStatusRunnable(tasks)); + } + + private static class WorkerDelayStatusRunnable implements Runnable { + private final List tasks; + + private WorkerDelayStatusRunnable(List tasks) { + this.tasks = tasks; + } + + @Override + public void run() { + try { + this.tasks.forEach(r -> OpenjobSpringContext.getBean(DelayInstanceService.class).handleConsumerDelayStatus(r)); + } catch (Throwable throwable) { + log.error("Worker delay status consume failed!", throwable); + } + } + } +} diff --git a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/DelayInstanceService.java b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/DelayInstanceService.java index 352e04d3..9b161297 100644 --- a/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/DelayInstanceService.java +++ b/openjob-server/openjob-server-scheduler/src/main/java/io/openjob/server/scheduler/service/DelayInstanceService.java @@ -96,7 +96,7 @@ public ServerDelayTopicPullResponse pullTopicList(WorkerDelayTopicPullRequest pu * * @param workerDelayStatusRequest workerDelayStatusRequest */ - public void handleDelayStatus(WorkerDelayStatusRequest workerDelayStatusRequest) { + public void handleConsumerDelayStatus(WorkerDelayStatusRequest workerDelayStatusRequest) { List statusList = BeanMapperUtil.mapList(workerDelayStatusRequest.getTaskList(), WorkerDelayTaskRequest.class, DelayInstanceStatusRequestDTO.class); this.delayInstanceScheduler.report(statusList); From 659c2603b80da2dd920637d0f722299d84e0b3c7 Mon Sep 17 00:00:00 2001 From: stelin <794774870@qq.com> Date: Wed, 19 Jul 2023 20:29:02 +0800 Subject: [PATCH 4/4] :art:fixed style --- .../io/openjob/server/cluster/service/JobInstanceService.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java index 19a3fdc8..489b6dc0 100644 --- a/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java +++ b/openjob-server/openjob-server-cluster/src/main/java/io/openjob/server/cluster/service/JobInstanceService.java @@ -56,7 +56,6 @@ public void handleInstanceStatus(WorkerJobInstanceStatusRequest statusRequest) { */ @Transactional(rollbackFor = Exception.class) public void handleConsumerInstanceStatus(WorkerJobInstanceStatusRequest statusRequest) { - Long start = DateUtil.timestamp(); // First page to update job instance status. if (CommonConstant.FIRST_PAGE.equals(statusRequest.getPage())) { this.jobInstanceDAO.updateStatusById(statusRequest.getJobInstanceId(), statusRequest.getStatus()); @@ -87,8 +86,6 @@ public void handleConsumerInstanceStatus(WorkerJobInstanceStatusRequest statusRe } catch (DataIntegrityViolationException | UnexpectedRollbackException exception) { log.warn("Data has been saved! {}", taskList.stream().map(JobInstanceTask::getTaskId).collect(Collectors.toList())); } - - System.out.println("timetime="+(DateUtil.timestamp()-start)); } /**