Skip to content

dd-xiaozhi/spring-batch-sync

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Spring Batch 模板框架

这是一个基于 Spring Boot 和 Spring Batch 的模板框架,旨在简化批处理任务的开发。通过实现接口,你只需要编写具体的业务逻辑,框架会自动处理任务注册和配置。

✨ 特性

  • 🚀 自动任务注册: 实现 AbstractBatchJob 接口的类会自动注册为 Spring Batch 任务
  • 📝 模板化开发: 只需实现抽象方法,无需手动配置 ItemReader、ItemProcessor、ItemWriter
  • 🎛️ 灵活配置: 支持自定义 chunk 大小、重启策略等
  • 🌐 REST 接口: 提供完整的 HTTP 接口来管理批处理任务
  • 📊 任务控制: 支持启动、停止、重启、状态查询等操作
  • 🗄️ 多数据源支持: 同时支持关系型数据库(H2/MySQL/PostgreSQL)和 MongoDB
  • 📦 开箱即用: 提供完整的示例任务,包括数据库和 MongoDB 操作
  • 定时任务支持: 支持基于 Cron 表达式的定时任务调度
  • 🔄 自动调度: 启用定时任务的批处理任务会自动按计划执行
  • 🔒 并发控制: 支持任务并发控制,可配置单实例或多实例执行模式
  • 🌍 分布式锁扩展: 默认单机锁实现,支持扩展为 Redis 等分布式锁

🚀 快速开始

环境要求

  • Java 17+
  • Maven 3.6+
  • MongoDB 4.0+ (可选,用于 MongoDB 相关任务)

1. 克隆项目

git clone <repository-url>
cd spring-batch-template

2. 启动 MongoDB (可选)

如果要使用 MongoDB 相关功能,请先启动 MongoDB 服务:

# 使用 Docker 启动 MongoDB
docker run -d --name mongodb -p 27017:27017 mongo:latest

# 或者启动本地 MongoDB 服务
mongod

3. 运行应用

mvn spring-boot:run

4. 验证安装

# 查看已注册的任务
curl http://localhost:8080/job/jobs

# 执行示例任务
curl -X POST http://localhost:8080/job/start/sampleDataSync

📖 文档

核心文档

专题指南

🔧 核心概念

AbstractBatchJob 接口

所有批处理任务都需要实现 AbstractBatchJob 接口:

@Component
public class MyDataSyncJob implements AbstractBatchJob<InputType, OutputType> {

    @Override
    public String getJobName() {
        return "myDataSync";  // 任务名称,必须唯一
    }

    @Override
    public ItemReader<InputType> createItemReader() {
        // 实现数据读取逻辑
        return new MyItemReader();
    }

    @Override
    public ItemProcessor<InputType, OutputType> createItemProcessor() {
        // 实现数据处理逻辑
        return item -> processItem(item);
    }

    @Override
    public ItemWriter<OutputType> createItemWriter() {
        // 实现数据写入逻辑
        return items -> writeItems(items);
    }
}

可选配置方法

@Override
public int getChunkSize() {
    return 100;  // 每次处理的数据量,默认10
}

@Override
public boolean isScheduleEnabled() {
    return true;  // 启用定时任务,默认false
}

@Override
public String getScheduleCron() {
    return "0 0 2 * * ?";  // 每天凌晨2点执行
}

@Override
public ConcurrencyMode getConcurrencyMode() {
    return ConcurrencyMode.SINGLE_INSTANCE_REJECT;  // 只允许单实例执行
}

🎯 示例任务

项目中包含多个示例任务:

任务名称 描述 类型 定时执行
sampleDataSync 简单的字符串处理任务 基础示例
userDataSync 用户数据同步任务 数据库操作
productSync MongoDB 产品数据同步 MongoDB
productCleanup MongoDB 产品清理任务 MongoDB ✅ 每天凌晨1点
scheduledDataSync 定时数据同步任务 定时任务 ✅ 每天凌晨3点
hourlyReport 每小时报表生成 定时任务 ✅ 每小时30分
slowTestJob 慢速测试任务 测试用途

🔌 REST API

任务管理

# 查看所有任务
GET /job/jobs

# 启动任务
POST /job/start/{jobName}

# 停止任务
POST /job/stop/{jobName}

# 查看任务状态
GET /job/status/{jobName}

# 重启任务
POST /job/restart/{jobName}

# 查看正在运行的任务
GET /job/running

定时任务管理

# 查看定时任务配置
GET /schedule/jobs

# 查看启用的定时任务
GET /schedule/enabled

# 查看定时任务统计
GET /schedule/stats

并发控制

# 查看任务锁状态
GET /job/lock/{jobName}

# 强制释放任务锁
POST /job/lock/release/{jobName}

详细的 API 文档请参考:API 接口文档

⚙️ 配置

应用配置

application.yml 中配置:

spring:
  # 关系型数据库配置 (用于 Spring Batch 元数据)
  datasource:
    url: jdbc:h2:mem:testdb
    driver-class-name: org.h2.Driver
    username: sa
    password: 
  
  # MongoDB 配置 (可选)
  data:
    mongodb:
      uri: mongodb://localhost:27017/batch_db
  
  batch:
    job:
      enabled: false  # 禁用自动启动
    jdbc:
      initialize-schema: never  # 手动管理数据库表

定时任务配置

spring:
  task:
    scheduling:
      pool:
        size: 10  # 定时任务线程池大小
      thread-name-prefix: "batch-schedule-"

🏗️ 项目结构

src/main/java/com/example/
├── BatchApplication.java              # 主启动类
├── batch/
│   ├── factory/
│   │   └── BatchJobFactory.java       # 任务工厂类
│   ├── jobs/                          # 示例任务
│   │   ├── SampleDataSyncJob.java
│   │   ├── UserDataSyncJob.java
│   │   ├── ProductSyncJob.java
│   │   ├── ProductCleanupJob.java
│   │   ├── ScheduledDataSyncJob.java
│   │   ├── HourlyReportJob.java
│   │   └── SlowTestJob.java
│   ├── lock/                          # 并发控制
│   │   ├── JobLockStrategy.java       # 锁策略接口
│   │   ├── LocalJobLockStrategy.java  # 本地锁实现
│   │   └── ConcurrencyMode.java       # 并发模式枚举
│   └── template/
│       └── AbstractBatchJob.java      # 抽象任务接口
├── config/                            # 配置类
│   ├── BatchConfig.java
│   ├── MongoConfig.java
│   ├── ScheduleConfig.java
│   └── WebConfig.java
├── controller/                        # REST 控制器
│   ├── JobManagementController.java
│   └── ScheduleController.java
├── model/                             # 数据模型
│   ├── Product.java
│   └── ProductSummary.java
├── repository/                        # 数据访问层
│   ├── ProductRepository.java
│   └── ProductSummaryRepository.java
└── service/                           # 服务层
    ├── BatchScheduleService.java
    ├── DataInitializationService.java
    ├── JobExecutionService.java       # 任务执行服务(含并发控制)
    └── JobManagementService.java

🛠️ 开发指南

创建新任务

  1. 创建一个新的类实现 AbstractBatchJob 接口
  2. 添加 @Component 注解
  3. 实现必需的抽象方法
  4. 重启应用,任务会自动注册

启用定时任务

@Override
public boolean isScheduleEnabled() {
    return true;
}

@Override
public String getScheduleCron() {
    return "0 0 2 * * ?";  // 每天凌晨2点
}

添加 MongoDB 支持

  1. 确保 MongoDB 服务运行
  2. 配置 MongoDB 连接
  3. 使用 MongoItemReaderMongoItemWriter

详细指南请参考:MongoDB 支持指南

配置并发控制

任务支持三种并发模式:

模式 说明
ALLOW_MULTIPLE 允许多个实例同时执行(默认)
SINGLE_INSTANCE_REJECT 只允许一个实例执行,后续请求被拒绝
SINGLE_INSTANCE_WAIT 只允许一个实例执行,后续请求排队等待
@Component
public class MyBatchJob implements AbstractBatchJob<String, String> {
    
    @Override
    public ConcurrencyMode getConcurrencyMode() {
        return ConcurrencyMode.SINGLE_INSTANCE_REJECT;
    }
    
    @Override
    public long getLockTimeoutMs() {
        return 3600000L;  // 锁超时时间,默认1小时
    }
    
    // ... 其他方法
}

分布式锁扩展

默认使用本地锁(LocalJobLockStrategy),仅适用于单机环境。

分布式环境下,实现 JobLockStrategy 接口并使用 @Primary 注解覆盖默认实现:

@Component
@Primary
public class RedisJobLockStrategy implements JobLockStrategy {
    
    @Resource
    private StringRedisTemplate redisTemplate;
    
    @Override
    public boolean tryLock(String jobName, String executionId, long timeoutMs) {
        String lockKey = "batch:job:lock:" + jobName;
        return Boolean.TRUE.equals(
            redisTemplate.opsForValue().setIfAbsent(lockKey, executionId, timeoutMs, TimeUnit.MILLISECONDS)
        );
    }
    
    // ... 其他方法实现
}

参考示例:src/main/java/com/aoaojiao/batch/lock/RedisJobLockStrategy.java.example

📊 监控和管理

查看任务状态

# 查看特定任务状态
curl http://localhost:8080/job/status/sampleDataSync

# 查看所有正在运行的任务
curl http://localhost:8080/job/running

任务控制

# 启动任务
curl -X POST http://localhost:8080/job/start/sampleDataSync

# 停止任务
curl -X POST http://localhost:8080/job/stop/sampleDataSync

# 重启任务
curl -X POST http://localhost:8080/job/restart/sampleDataSync

🔍 故障排除

常见问题

  1. 任务未注册

    • 检查类是否添加了 @Component 注解
    • 确认类实现了 AbstractBatchJob 接口
    • 查看启动日志中的任务注册信息
  2. MongoDB 连接失败

    • 确认 MongoDB 服务正在运行
    • 检查连接配置是否正确
    • 查看应用日志中的错误信息
  3. 定时任务未执行

    • 检查 isScheduleEnabled() 是否返回 true
    • 验证 Cron 表达式是否正确
    • 确认时区设置
  4. 任务被拒绝执行

    • 检查任务是否配置了 SINGLE_INSTANCE_REJECT 模式
    • 使用 GET /job/lock/{jobName} 查看锁状态
    • 如需强制执行,使用 POST /job/lock/release/{jobName} 释放锁

调试技巧

  1. 启用调试日志:

    logging:
      level:
        com.aoaojiao: DEBUG
        org.springframework.batch: DEBUG
  2. 使用测试任务验证功能

  3. 查看 H2 控制台:http://localhost:8080/h2-console

🤝 贡献

欢迎提交 Issue 和 Pull Request 来改进这个项目。

📄 许可证

本项目采用 MIT 许可证。

🔗 相关链接


快速链接

About

基于 spring-batch 封装的通用数据同步器

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages