SpringBoot:2.6.x

jdk:1.8

0.背景
假设我的项目中,有任务可能要定时执行,但我目前还没有认识到需要执行什么东西,如何实现?

调试一个开源的springboot项目,项目中使用了EnableScheduing自动执行了定时任务,现在想这些定时任务进行可视化并实现动态的定时任务。

所谓动态定时任务=任务逻辑(做什么) + 触发规则(什么时候做)

本系列一共解决下面的问题:

1.动态启动/停止定时任务
2.动态修改执行周期
3.动态添加/删除定时任务
4.通过注解标记可配置的定时任务
5.接口获取所有可配置任务
6.接口管理任务执行
7.定时任务可视化(借助vue)
8.开机自启动定时任务(借助mysql)

本文项目代码结构:

src/main/java/com/guo/schedule/
├── annotation/  
│   └── DynamicSchedule.java --自定义注解
├── config/
│   └── SchedulerConfig.java --线程池
├── controller/
│   └── ScheduleController.java  --控制器
├── dto/
│   └── ScheduleJob.java  --定时任务对象

├── jobs/
│   └── SystemJobs.java  --需要执行的定时任务
├── service/
│   └── ScheduleTaskRegistry.java  --任务注册服务
└── Application.java

本文代码下载地址(访问密码: 4315):

schedule1.zip: https://url47.ctfile.com/f/64055047-1503076675-a5eda5?p=4315

1.创建项目

首先参考这篇文章,创建一个Springboot项目:IDEA 2024版如何创建Spring Boot项目 – 每天进步一点点

项目生成后的pom文件参考如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.guo</groupId>
    <artifactId>schedule</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>schedule</name>
    <description>schedule</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.guo.schedule.ScheduleApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

在启动类上加入@EnableScheduling注解。

然后右击schedule文件夹,新建—>软件包

一共创建6个文件夹,annotaion、config、controller、dto、jobs、service文件夹,参考如下:

2.自定义注解

在annotation文件夹下,新定义一个接口,代码参考如下:

import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DynamicSchedule {
    String jobId() default "";  //定时任务ID
    String jobName();           //定时任务名称
    String description() default "";  //定时任务描述
    String defaultCron() default "";   //定时任务默认表达式
}

这里定义主要说明了定时任务ID和定时任务名字。

3.配置线程池

我们给定时任务单独配置一个线程池

在config文件夹下新建一个类,名字为SchedulerConfig


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class SchedulerConfig {

    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();  //创建一个线程池任务调度实例
        scheduler.setPoolSize(10); //线程池大小为10
        scheduler.setThreadNamePrefix("dynamic-scheduler-");  //线程池前缀
        scheduler.setAwaitTerminationSeconds(60);  //设置应用关闭时等待任务完成的超时时间为 60 秒
        scheduler.setWaitForTasksToCompleteOnShutdown(true); //表示会等待正在执行的任务完成
        return scheduler;
    }
}

创建完后项目结构如下:

4.创建DTO

在DTO文件夹下创建一个类,名字为ScheduleJob,用作实体类,后期可以考虑跟数据库映射,参考代码如下:

package com.guo.schedule.dto;

public class ScheduleJob {
    private String jobId;
    private String jobName;
    private String description;
    private String className;
    private String methodName;
    private String currentCron;
    private boolean active;

    // 完整构造函数
    public ScheduleJob(String jobId, String jobName, String description,
                       String className, String methodName, String currentCron, boolean active) {
        this.jobId = jobId;
        this.jobName = jobName;
        this.description = description;
        this.className = className;
        this.methodName = methodName;
        this.currentCron = currentCron;
        this.active = active;
    }

    public String getJobId() {
        return jobId;
    }

    public void setJobId(String jobId) {
        this.jobId = jobId;
    }

    public String getJobName() {
        return jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public String getCurrentCron() {
        return currentCron;
    }

    public void setCurrentCron(String currentCron) {
        this.currentCron = currentCron;
    }

    public boolean isActive() {
        return active;
    }

    public void setActive(boolean active) {
        this.active = active;
    }

    @Override
    public String toString() {
        return "ScheduleJob{" +
                "jobId='" + jobId + '\'' +
                ", jobName='" + jobName + '\'' +
                ", description='" + description + '\'' +
                ", className='" + className + '\'' +
                ", methodName='" + methodName + '\'' +
                ", currentCron='" + currentCron + '\'' +
                ", active=" + active +
                '}';
    }
}
5.任务注册服务

在service文件夹下创建一个类,名字为ScheduleTaskRegistry,代码比较多,参考如下:


import com.guo.schedule.annotation.DynamicSchedule;
import com.guo.schedule.dto.ScheduleJob;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;

import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

@Service
public class ScheduleTaskRegistry implements ApplicationListener<ContextRefreshedEvent> {

    private final ApplicationContext context;  //上下文
    private final TaskScheduler taskScheduler; //任务调度器
    private final Map<String, ScheduleJob> jobDefinitions = new ConcurrentHashMap<>(); //任务定义存储
    private final Map<String, ScheduledFuture<?>> activeTasks = new ConcurrentHashMap<>();  //激活任务存储

    //构造函数
    @Autowired
    public ScheduleTaskRegistry(ApplicationContext context, TaskScheduler taskScheduler) {
        this.context = context;
        this.taskScheduler = taskScheduler;
    }

    /**
     * 延迟任务扫描到所有Bean初始化完成后进行
     * @param event
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() == null) {
            scanAnnotatedMethods();
        }
    }

    /**
     * 获取所有Bean定义名称
     * 检查每个方法是否带有 @DynamicSchedule 注解
     */
    private void scanAnnotatedMethods() {
        String[] beanNames = context.getBeanDefinitionNames();
        for (String beanName : beanNames) {
            Object bean = context.getBean(beanName);
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            for (Method method : targetClass.getDeclaredMethods()) {
                if (method.isAnnotationPresent(DynamicSchedule.class)) {
                    processMethodAnnotation(bean, targetClass, method);
                }
            }
        }
    }

    /**
     * 优先使用注解配置的jobId,否则自动生成,使用并发Map存储任务定义
     * @param bean
     * @param targetClass
     * @param method
     */
    private void processMethodAnnotation(Object bean, Class<?> targetClass, Method method) {
        DynamicSchedule annotation = method.getAnnotation(DynamicSchedule.class);
        String jobId = annotation.jobId().isEmpty() ?
                generateJobId(targetClass, method) : annotation.jobId();

        ScheduleJob job = new ScheduleJob(
                jobId,
                annotation.jobName(),
                annotation.description(),
                targetClass.getName(),
                method.getName(),
                annotation.defaultCron(),
                false
        );
        jobDefinitions.put(jobId, job);
    }

    private String generateJobId(Class<?> clazz, Method method) {
        return clazz.getSimpleName() + "#" + method.getName();
    }

    public List<ScheduleJob> getAllJobDefinitions() {
        return new ArrayList<>(jobDefinitions.values());
    }

    public List<ScheduleJob> getActiveJobs() {
        List<ScheduleJob> activeJobs = new ArrayList<>();
        for (ScheduleJob job : jobDefinitions.values()) {
            if (job.isActive()) {
                activeJobs.add(job);
            }
        }
        return activeJobs;
    }

    /**
     * 任务激活逻辑
     * @param jobId  任务ID
     * @param cron 表达式
     * @return
     */
    public boolean activateJob(String jobId, String cron) {
        if (!jobDefinitions.containsKey(jobId)) return false;
        if (activeTasks.containsKey(jobId)) return false;

        ScheduleJob job = jobDefinitions.get(jobId);
        try {
            Object bean = context.getBean(Class.forName(job.getClassName()));
            Method method = bean.getClass().getMethod(job.getMethodName());

            Runnable task = wrapTask(bean, method);
            ScheduledFuture<?> future = scheduleTask(cron, task);

            updateJobStatus(jobId, cron, future);
            return true;
        } catch (Exception e) {
            throw new RuntimeException("Job activation failed", e);
        }
    }

    /**
     * 任务包装方法
     * @param bean
     * @param method
     * @return
     */
    private Runnable wrapTask(Object bean, Method method) {
        return () -> {
            try {
                method.invoke(bean);
            } catch (Exception e) {
                throw new RuntimeException("Task execution failed", e);
            }
        };
    }

    private ScheduledFuture<?> scheduleTask(String cron, Runnable task) {
        return taskScheduler.schedule(task, new CronTrigger(cron));
    }

    private void updateJobStatus(String jobId, String cron, ScheduledFuture<?> future) {
        ScheduleJob job = jobDefinitions.get(jobId);
        job.setCurrentCron(cron);
        job.setActive(true);
        activeTasks.put(jobId, future);
    }

    /**
     * 停用任务
     * @param jobId  任务id
     * @return
     */
    public boolean deactivateJob(String jobId) {
        if (!activeTasks.containsKey(jobId)) return false;

        ScheduledFuture<?> future = activeTasks.remove(jobId);
        future.cancel(true);
        jobDefinitions.get(jobId).setActive(false);
        return true;
    }

    /**
     * 更新任务调度
     * @param jobId 任务id
     * @param newCron 新表达式
     * @return
     */
    public boolean updateJobSchedule(String jobId, String newCron) {
        if (!deactivateJob(jobId)) return false;
        return activateJob(jobId, newCron);
    }
}
6.控制器

在controller文件夹下创建一个类,名字为ScheduleController,参考如下:

import com.guo.schedule.dto.ScheduleJob;
import com.guo.schedule.service.ScheduleTaskRegistry;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("/api/schedule")
public class ScheduleController {

    private final ScheduleTaskRegistry taskRegistry;

    public ScheduleController(ScheduleTaskRegistry taskRegistry) {
        this.taskRegistry = taskRegistry;
    }

    @GetMapping("/jobs")
    public ResponseEntity<List<ScheduleJob>> getAllJobs() {
        return ResponseEntity.ok(taskRegistry.getAllJobDefinitions());
    }

    @GetMapping("/active-jobs")
    public ResponseEntity<List<ScheduleJob>> getActiveJobs() {
        return ResponseEntity.ok(taskRegistry.getActiveJobs());
    }

    @PostMapping("/activate/{jobId}")
    public ResponseEntity<String> activateJob(
            @PathVariable String jobId,
            @RequestParam String cron) {
        try {
            if (taskRegistry.activateJob(jobId, cron)) {
                return ResponseEntity.ok("Job activated");
            }
            return ResponseEntity.badRequest().body("Invalid job ID or already active");
        } catch (Exception e) {
            return ResponseEntity.internalServerError().body(e.getMessage());
        }
    }

    @PostMapping("/deactivate/{jobId}")
    public ResponseEntity<String> deactivateJob(@PathVariable String jobId) {
        if (taskRegistry.deactivateJob(jobId)) {
            return ResponseEntity.ok("Job deactivated");
        }
        return ResponseEntity.badRequest().body("Invalid job ID or not active");
    }

    @PutMapping("/{jobId}/schedule")
    public ResponseEntity<String> updateSchedule(
            @PathVariable String jobId,
            @RequestParam String newCron) {
        if (taskRegistry.updateJobSchedule(jobId, newCron)) {
            return ResponseEntity.ok("Schedule updated");
        }
        return ResponseEntity.badRequest().body("Update failed");
    }
}
7.添加定时任务

框架构建完毕,现在添加需要执行的定时任务,在jobs文件夹底下创建一个SystemJobs类,代码参考如下:

import com.guo.schedule.annotation.DynamicSchedule;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class SystemJobs {

    @DynamicSchedule(
            jobId = "cleanup",
            jobName = "系统清理",
            description = "每日凌晨清理临时文件",
            defaultCron = "0 0 3 * * ?"
    )
    public void performCleanup() {
        System.out.println("[Cleanup] 执行清理任务...");
        System.out.println(new Date().toString());
    }

    @DynamicSchedule(
            jobName = "数据统计",
            description = "每小时生成统计报告",
            defaultCron = "*/3 * * * * *"
    )
    public void generateStatisticsReport() {
        System.out.println("[Statistics] 生成统计报表...");
        System.out.println(new Date().toString());
    }
}
8.启动测试

一些测试的接口如下:

//查询所有可用任务
http://localhost:8080/api/schedule/jobs
//激活清理任务(每5秒执行):
curl -X POST "http://localhost:8080/api/schedule/activate/cleanup?cron=0/5 * * * * ?"
//查看活动任务
http://localhost:8080/api/schedule/active-jobs
//修改调度频率
curl -X PUT "http://localhost:8080/api/schedule/cleanup/schedule?newCron=0/10 * * * * ?"
//停止任务
curl -X POST http://localhost:8080/api/schedule/deactivate/cleanup

我们启动项目,在浏览器中首先输入下面的地址,

http://localhost:8080/api/schedule/jobs

可以看到,扫描到了定时任务

我们用api工具,启动一个定时任务后,就可以看到定时任务在控制台按照时间隔进行打印。

同时,也可以获取到正在允许的定时任务。