|
|
@@ -1,355 +0,0 @@
|
|
|
-package com.ruoyi.interfaces.service.impl;
|
|
|
-
|
|
|
-import cn.hutool.core.util.ObjUtil;
|
|
|
-import com.aizuda.snailjob.client.job.core.enums.AllocationAlgorithmEnum;
|
|
|
-import com.aizuda.snailjob.client.job.core.enums.TriggerTypeEnum;
|
|
|
-import com.aizuda.snailjob.client.job.core.handler.add.BroadcastAddHandler;
|
|
|
-import com.aizuda.snailjob.client.job.core.handler.add.ClusterAddHandler;
|
|
|
-import com.aizuda.snailjob.client.job.core.handler.update.BroadcastUpdateHandler;
|
|
|
-import com.aizuda.snailjob.client.job.core.handler.update.ClusterUpdateHandler;
|
|
|
-import com.aizuda.snailjob.client.job.core.openapi.SnailJobOpenApi;
|
|
|
-import com.aizuda.snailjob.common.core.enums.JobBlockStrategyEnum;
|
|
|
-import com.aizuda.snailjob.common.core.enums.StatusEnum;
|
|
|
-import com.ruoyi.common.utils.DateUtils;
|
|
|
-import com.ruoyi.common.utils.StringUtils;
|
|
|
-import com.ruoyi.interfaces.domain.Job;
|
|
|
-import com.ruoyi.interfaces.domain.JobLogVo;
|
|
|
-import com.ruoyi.interfaces.domain.job.DashboardLineResponseVO;
|
|
|
-import com.ruoyi.interfaces.domain.job.JobLineQueryVo;
|
|
|
-import com.ruoyi.interfaces.domain.job.JobSummary;
|
|
|
-import com.ruoyi.interfaces.domain.job.JobTask;
|
|
|
-import com.ruoyi.interfaces.domain.vo.JobRequestVo;
|
|
|
-import com.ruoyi.interfaces.enums.DashboardLineEnum;
|
|
|
-import com.ruoyi.interfaces.enums.DateTypeEnum;
|
|
|
-import com.ruoyi.interfaces.mapper.JobSummaryMapper;
|
|
|
-import com.ruoyi.interfaces.mapper.SnailJobMapper;
|
|
|
-import com.ruoyi.interfaces.service.SnailJobService;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.beans.factory.annotation.Value;
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
-
|
|
|
-import java.time.LocalDateTime;
|
|
|
-import java.util.*;
|
|
|
-
|
|
|
-@Service
|
|
|
-public class SnailJobServiceImpl implements SnailJobService {
|
|
|
-
|
|
|
- @Value("${snail-job.group}")
|
|
|
- private String groupName;
|
|
|
-
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private SnailJobMapper snailJobMapper;
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private JobSummaryMapper jobSummaryMapper;
|
|
|
-
|
|
|
- @Override
|
|
|
- public List<Job> selectJobList(Job jobRequestVo) {
|
|
|
- jobRequestVo.setGroupName(groupName);
|
|
|
- return snailJobMapper.selectJobList(jobRequestVo);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 新增集群模式的任务
|
|
|
- *
|
|
|
- * @param jobRequest
|
|
|
- * @return 任务id
|
|
|
- */
|
|
|
- public Long addClusterJob(JobRequestVo jobRequest) {
|
|
|
- ClusterAddHandler clusterAddHandler = SnailJobOpenApi.addClusterJob()
|
|
|
- //.setRouteKey(AllocationAlgorithmEnum.RANDOM)
|
|
|
- .setRouteKey(getAllocationAlgorithmEnum(jobRequest.getRouteKey()))
|
|
|
- .setJobName(jobRequest.getJobName())//
|
|
|
- .setExecutorInfo(jobRequest.getExecutorInfo())
|
|
|
- .setExecutorTimeout(jobRequest.getExecutorTimeout())
|
|
|
- .setDescription(jobRequest.getDescription())
|
|
|
- .setBlockStrategy(JobBlockStrategyEnum.valueOf(jobRequest.getBlockStrategy()))
|
|
|
- .setMaxRetryTimes(jobRequest.getMaxRetryTimes())
|
|
|
- .setTriggerType(getTriggerTypeEnum(jobRequest.getTriggerType()))
|
|
|
- .setTriggerInterval(jobRequest.getTriggerInterval())
|
|
|
- .setJobStatus(StatusEnum.of(jobRequest.getJobStatus())) //状态不能为空
|
|
|
- .setRetryInterval(jobRequest.getRetryInterval());
|
|
|
-
|
|
|
- HashMap<String, Object> argsMap = jobRequest.getArgsStrMap();
|
|
|
- if (StringUtils.isNotEmpty(argsMap)) {
|
|
|
- argsMap.forEach(clusterAddHandler::addArgsStr);
|
|
|
- }
|
|
|
- return clusterAddHandler.execute();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 修改集群模式的任务
|
|
|
- */
|
|
|
- public Boolean updateClusterJob(JobRequestVo jobRequest) {
|
|
|
- ClusterUpdateHandler clusterUpdateHandler = SnailJobOpenApi.updateClusterJob(jobRequest.getId())
|
|
|
- .setRouteKey(getAllocationAlgorithmEnum(jobRequest.getRouteKey()))
|
|
|
- .setJobName(jobRequest.getJobName())//
|
|
|
- .setExecutorInfo(jobRequest.getExecutorInfo())
|
|
|
- .setExecutorTimeout(jobRequest.getExecutorTimeout())
|
|
|
- .setDescription(jobRequest.getDescription())
|
|
|
- .setBlockStrategy(JobBlockStrategyEnum.valueOf(jobRequest.getBlockStrategy()))
|
|
|
- .setMaxRetryTimes(jobRequest.getMaxRetryTimes())
|
|
|
- .setTriggerType(getTriggerTypeEnum(jobRequest.getTriggerType()))
|
|
|
- .setTriggerInterval(jobRequest.getTriggerInterval())
|
|
|
- .setRetryInterval(jobRequest.getRetryInterval());
|
|
|
-
|
|
|
- HashMap<String, Object> argsMap = jobRequest.getArgsStrMap();
|
|
|
- if (StringUtils.isNotEmpty(argsMap)) {
|
|
|
- argsMap.forEach(clusterUpdateHandler::addArgsStr);
|
|
|
- }
|
|
|
-
|
|
|
- return clusterUpdateHandler.execute();
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 新增广播模式的任务
|
|
|
- *
|
|
|
- * @param jobRequest
|
|
|
- * @return
|
|
|
- */
|
|
|
- public Long addBroadcastJob(JobRequestVo jobRequest) {
|
|
|
- BroadcastAddHandler broadcastAddHandler = SnailJobOpenApi.addBroadcastJob()
|
|
|
- .setJobName(jobRequest.getJobName())
|
|
|
- .setExecutorInfo(jobRequest.getExecutorInfo())
|
|
|
- .setExecutorTimeout(jobRequest.getExecutorTimeout())
|
|
|
- .setDescription(jobRequest.getDescription())
|
|
|
- .setBlockStrategy(JobBlockStrategyEnum.valueOf(jobRequest.getBlockStrategy()))
|
|
|
- .setMaxRetryTimes(jobRequest.getMaxRetryTimes())
|
|
|
- .setTriggerType(getTriggerTypeEnum(jobRequest.getTriggerType()))
|
|
|
- .setTriggerInterval(jobRequest.getTriggerInterval())
|
|
|
- .setJobStatus(StatusEnum.of(jobRequest.getJobStatus())) //状态不能为空
|
|
|
- .setRetryInterval(jobRequest.getRetryInterval());
|
|
|
- HashMap<String, Object> argsMap = jobRequest.getArgsStrMap();
|
|
|
- if (StringUtils.isNotEmpty(argsMap)) {
|
|
|
- argsMap.forEach(broadcastAddHandler::addArgsStr);
|
|
|
- }
|
|
|
- return broadcastAddHandler.execute();
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
- * 修改广播任务
|
|
|
- */
|
|
|
- public Boolean updateBroadcastJob(JobRequestVo jobRequest) {
|
|
|
- BroadcastUpdateHandler broadcastUpdateHandler = SnailJobOpenApi.updateBroadcastJob(jobRequest.getId())
|
|
|
- .setJobName(jobRequest.getJobName())
|
|
|
- .setExecutorInfo(jobRequest.getExecutorInfo())
|
|
|
- .setExecutorTimeout(jobRequest.getExecutorTimeout())
|
|
|
- .setDescription(jobRequest.getDescription())
|
|
|
- .setBlockStrategy(JobBlockStrategyEnum.valueOf(jobRequest.getBlockStrategy()))
|
|
|
- .setMaxRetryTimes(jobRequest.getMaxRetryTimes())
|
|
|
- .setTriggerType(getTriggerTypeEnum(jobRequest.getTriggerType()))
|
|
|
- .setTriggerInterval(jobRequest.getTriggerInterval())
|
|
|
- .setRetryInterval(jobRequest.getRetryInterval());
|
|
|
- HashMap<String, Object> argsMap = jobRequest.getArgsStrMap();
|
|
|
- if (StringUtils.isNotEmpty(argsMap)) {
|
|
|
- argsMap.forEach(broadcastUpdateHandler::addArgsStr);
|
|
|
- }
|
|
|
- return broadcastUpdateHandler.execute();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 新增Sharding模式的任务
|
|
|
- *
|
|
|
- * @param jobRequest 任务名称
|
|
|
- * @return 任务id
|
|
|
- */
|
|
|
- public Long addShardingJob(JobRequestVo jobRequest) {
|
|
|
- return SnailJobOpenApi.addShardingJob()
|
|
|
- .setJobName(jobRequest.getJobName())
|
|
|
- .setExecutorInfo(jobRequest.getExecutorInfo())
|
|
|
- .setExecutorTimeout(jobRequest.getExecutorTimeout())
|
|
|
- .setDescription(jobRequest.getDescription())
|
|
|
- .setBlockStrategy(JobBlockStrategyEnum.valueOf(jobRequest.getBlockStrategy()))
|
|
|
- .setMaxRetryTimes(jobRequest.getMaxRetryTimes())
|
|
|
- .setTriggerType(getTriggerTypeEnum(jobRequest.getTriggerType()))
|
|
|
- .setTriggerInterval(jobRequest.getTriggerInterval())
|
|
|
- .addShardingArgs(jobRequest.getShardingArgs())
|
|
|
- .setParallelNum(jobRequest.getParallelNum())
|
|
|
- .setJobStatus(StatusEnum.of(jobRequest.getJobStatus())) //状态不能为空
|
|
|
- .setRetryInterval(jobRequest.getRetryInterval())
|
|
|
- .execute();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 修改Sharding模式的任务
|
|
|
- *
|
|
|
- * @param jobRequest 任务名称
|
|
|
- * @return 任务id
|
|
|
- */
|
|
|
- public Boolean updateShardingJob(JobRequestVo jobRequest) {
|
|
|
- return SnailJobOpenApi.updateShardingJob(jobRequest.getId())
|
|
|
- .setJobName(jobRequest.getJobName())
|
|
|
- .setExecutorInfo(jobRequest.getExecutorInfo())
|
|
|
- .setExecutorTimeout(jobRequest.getExecutorTimeout())
|
|
|
- .setDescription(jobRequest.getDescription())
|
|
|
- .setBlockStrategy(JobBlockStrategyEnum.valueOf(jobRequest.getBlockStrategy()))
|
|
|
- .setMaxRetryTimes(jobRequest.getMaxRetryTimes())
|
|
|
- .setTriggerType(getTriggerTypeEnum(jobRequest.getTriggerType()))
|
|
|
- .setTriggerInterval(jobRequest.getTriggerInterval())
|
|
|
- .addShardingArgs(jobRequest.getShardingArgs())
|
|
|
- .setParallelNum(jobRequest.getParallelNum())
|
|
|
- .setRetryInterval(jobRequest.getRetryInterval())
|
|
|
- .execute();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 新增Map模式的任务
|
|
|
- *
|
|
|
- * @param jobRequest 任务名称
|
|
|
- * @return 任务id
|
|
|
- */
|
|
|
- public Long addMapJob(JobRequestVo jobRequest) {
|
|
|
- return SnailJobOpenApi.addMapJob()
|
|
|
- .setJobName(jobRequest.getJobName())
|
|
|
- .setExecutorInfo(jobRequest.getExecutorInfo())
|
|
|
- .setExecutorTimeout(jobRequest.getExecutorTimeout())
|
|
|
- .setDescription(jobRequest.getDescription())
|
|
|
- .setBlockStrategy(JobBlockStrategyEnum.valueOf(jobRequest.getBlockStrategy()))
|
|
|
- .setMaxRetryTimes(jobRequest.getMaxRetryTimes())
|
|
|
- .setTriggerType(getTriggerTypeEnum(jobRequest.getTriggerType()))
|
|
|
- .setTriggerInterval(jobRequest.getTriggerInterval())
|
|
|
- .setParallelNum(jobRequest.getParallelNum())
|
|
|
- .setRetryInterval(jobRequest.getRetryInterval())
|
|
|
- .setJobStatus(StatusEnum.of(jobRequest.getJobStatus())) //状态不能为空
|
|
|
- .execute();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 修改Map模式的任务
|
|
|
- *
|
|
|
- * @param jobRequest
|
|
|
- * @return
|
|
|
- */
|
|
|
- public Boolean updateMapJob(JobRequestVo jobRequest) {
|
|
|
- return SnailJobOpenApi.updateMapJob(jobRequest.getId())
|
|
|
- .setJobName(jobRequest.getJobName())
|
|
|
- .setExecutorInfo(jobRequest.getExecutorInfo())
|
|
|
- .setExecutorTimeout(jobRequest.getExecutorTimeout())
|
|
|
- .setDescription(jobRequest.getDescription())
|
|
|
- .setBlockStrategy(JobBlockStrategyEnum.valueOf(jobRequest.getBlockStrategy()))
|
|
|
- .setMaxRetryTimes(jobRequest.getMaxRetryTimes())
|
|
|
- .setTriggerType(getTriggerTypeEnum(jobRequest.getTriggerType()))
|
|
|
- .setTriggerInterval(jobRequest.getTriggerInterval())
|
|
|
- .setParallelNum(jobRequest.getParallelNum())
|
|
|
- .setRetryInterval(jobRequest.getRetryInterval())
|
|
|
- .execute();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 新增MapReduce模式的任务
|
|
|
- *
|
|
|
- * @param jobRequest
|
|
|
- * @return 任务id
|
|
|
- */
|
|
|
- public Long addMapReduceJob(JobRequestVo jobRequest) {
|
|
|
- return SnailJobOpenApi.addMapReduceJob()
|
|
|
- .setJobName(jobRequest.getJobName())
|
|
|
- .setExecutorInfo(jobRequest.getExecutorInfo())
|
|
|
- .setExecutorTimeout(jobRequest.getExecutorTimeout())
|
|
|
- .setDescription(jobRequest.getDescription())
|
|
|
- .setBlockStrategy(JobBlockStrategyEnum.valueOf(jobRequest.getBlockStrategy()))
|
|
|
- .setMaxRetryTimes(jobRequest.getMaxRetryTimes())
|
|
|
- .setTriggerType(getTriggerTypeEnum(jobRequest.getTriggerType()))
|
|
|
- .setTriggerInterval(jobRequest.getTriggerInterval())
|
|
|
- .setParallelNum(jobRequest.getParallelNum())
|
|
|
- .setShardNum(jobRequest.getShardNum())
|
|
|
- .setRetryInterval(jobRequest.getRetryInterval())
|
|
|
- .execute();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 修改MapReduce模式的任务
|
|
|
- *
|
|
|
- * @param jobRequest
|
|
|
- * @return 任务id
|
|
|
- */
|
|
|
- public Boolean updateMapReduceJob(JobRequestVo jobRequest) {
|
|
|
- return SnailJobOpenApi.updateMapReduceJob(jobRequest.getId())
|
|
|
- .setJobName(jobRequest.getJobName())
|
|
|
- .setExecutorInfo(jobRequest.getExecutorInfo())
|
|
|
- .setExecutorTimeout(jobRequest.getExecutorTimeout())
|
|
|
- .setDescription(jobRequest.getDescription())
|
|
|
- .setBlockStrategy(JobBlockStrategyEnum.valueOf(jobRequest.getBlockStrategy()))
|
|
|
- .setMaxRetryTimes(jobRequest.getMaxRetryTimes())
|
|
|
- .setTriggerType(getTriggerTypeEnum(jobRequest.getTriggerType()))
|
|
|
- .setTriggerInterval(jobRequest.getTriggerInterval())
|
|
|
- .setParallelNum(jobRequest.getParallelNum())
|
|
|
- .setShardNum(jobRequest.getShardNum())
|
|
|
- .setRetryInterval(jobRequest.getRetryInterval())
|
|
|
- .execute();
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public List<JobLogVo> listOfJobLog(JobRequestVo jobRequestVo) {
|
|
|
- return snailJobMapper.listOfJobLog(jobRequestVo);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Map<String, Long> statistics() {
|
|
|
- Map<String, Long> ret = new HashMap<>();
|
|
|
- List<Job> jobs = snailJobMapper.selectJobList(null);
|
|
|
- ret.put("total", jobs.stream().count());
|
|
|
- Long jobRuns = snailJobMapper.listOfJobLogCount(null);
|
|
|
- ret.put("runTotal", jobRuns);
|
|
|
- JobRequestVo jobRequestVo = new JobRequestVo();
|
|
|
- jobRequestVo.setStartTime(new Date());
|
|
|
- Long nowJobRuns = snailJobMapper.listOfJobLogCount(jobRequestVo);
|
|
|
- ret.put("nowRunTotal", nowJobRuns);
|
|
|
- JobRequestVo jobRequestVo2 = new JobRequestVo();
|
|
|
- jobRequestVo2.setStartTime(DateUtils.parseDate(DateUtils.dateTimeBeforeDays(DateUtils.YYYY_MM_DD, 30)));
|
|
|
- Long monthJobRuns = snailJobMapper.listOfJobLogCount(jobRequestVo2);
|
|
|
- ret.put("monthRunTotal", monthJobRuns);
|
|
|
- return ret;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- public AllocationAlgorithmEnum getAllocationAlgorithmEnum(Integer type) {
|
|
|
- for (AllocationAlgorithmEnum algorithm : AllocationAlgorithmEnum.values()) {
|
|
|
- if (type.equals(algorithm.getType())) {
|
|
|
- return algorithm;
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- public TriggerTypeEnum getTriggerTypeEnum(Integer type) {
|
|
|
- for (TriggerTypeEnum triggerTypeEnum : TriggerTypeEnum.values()) {
|
|
|
- if (type.equals(triggerTypeEnum.getType())) {
|
|
|
- return triggerTypeEnum;
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public JobTask taskRetryJob() {
|
|
|
- JobSummary jobSummary = new JobSummary();
|
|
|
- jobSummary.setSystemTaskType(3);
|
|
|
- return jobSummaryMapper.selectJobTask(jobSummary);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public List<DashboardLineResponseVO> jobLineList(JobLineQueryVo queryVO) {
|
|
|
- // 折线图
|
|
|
- DateTypeEnum dateTypeEnum = DateTypeEnum.valueOf(queryVO.getType());
|
|
|
- LocalDateTime startDateTime = dateTypeEnum.getStartTime().apply(
|
|
|
- ObjUtil.isNotNull(queryVO.getStartDt()) ? queryVO.getStartDt() : LocalDateTime.now());
|
|
|
- LocalDateTime endDateTime = dateTypeEnum.getEndTime().apply(
|
|
|
- ObjUtil.isNotNull(queryVO.getEndDt()) ? queryVO.getEndDt() : LocalDateTime.now());
|
|
|
- JobSummary jobSummary = new JobSummary();
|
|
|
- jobSummary.setSystemTaskType(3);
|
|
|
- jobSummary.setStartTime(startDateTime);
|
|
|
- jobSummary.setEndTime(endDateTime);
|
|
|
- jobSummary.setDateFormat(DashboardLineEnum.dateFormat(queryVO.getType()));
|
|
|
- List<DashboardLineResponseVO> dashboardLineResponseDOList = jobSummaryMapper.selectJobLineList(jobSummary);
|
|
|
- dashboardLineResponseDOList.sort(Comparator.comparing(DashboardLineResponseVO::getCreateDt));
|
|
|
- return dashboardLineResponseDOList;
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-}
|