Spring Batch
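Spring Batch is Spring's official batch-processing framework, built for processing large data sets.
Typical use cases: bulk data import and export, ETL processing, scheduled report generation, and large-scale data migration.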
Core components
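| Component | Description |
|---|---|
| Job | A batch job, made up of one or more Steps |
| Step | One stage of a job; runs either a Tasklet or chunk-oriented processing |
| ItemReader | Reads items from a data source |
| ItemProcessor | Transforms (or filters) items |
| ItemWriter | Writes items to a target |
| JobLauncher | Launches a job with a set of JobParameters |
| JobRepository | Stores job and step execution state |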
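Chunk-oriented processing
A chunk-oriented Step reads items one at a time until a chunk of the configured size (the commit interval) has been collected. It then passes each item through the ItemProcessor and hands the whole chunk to the ItemWriter. Each chunk runs in its own transaction, so a failure rolls back only the current chunk. Tasklet steps do not use this cycle.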
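The read, process, write cycle can be sketched in plain Java to show where the chunk boundaries fall. This is a simplified model, not Spring Batch's implementation. `ChunkLoopSketch` is a hypothetical name. An `Iterator` stands in for the ItemReader, and a `Function` stands in for the ItemProcessor (returning `null` filters an item). Each inner list of the result stands in for one writer call followed by a commit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of a chunk-oriented step (not Spring Batch's real code).
class ChunkLoopSketch {

    /** Returns the committed chunks, one inner list per simulated transaction. */
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        List<List<O>> committed = new ArrayList<>();
        while (true) {
            // 1. Read items one by one until the chunk is full or the input is exhausted.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            if (inputs.isEmpty()) {
                break; // nothing left to read: the step ends
            }
            // 2. Process each item; a null result filters the item out.
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Hand the whole chunk to the "writer", then "commit".
            committed.add(outputs);
        }
        return committed;
    }
}
```

Take items 1 to 5, a chunk size of 2, and a processor that drops even numbers and multiplies odd ones by 10. The sketch then commits three chunks: `[10]`, `[30]` and `[50]`.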
Quick start
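```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- MySQL JDBC driver; Spring Boot 3.x manages it as com.mysql:mysql-connector-j -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>
<!-- Already pulled in by spring-boot-starter-batch; listed here for clarity -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
```
```yaml
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/batch_db
    username: root
    password: password
  batch:
    jdbc:
      initialize-schema: always   # run the script that creates the BATCH_* metadata tables
```
Job configuration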
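The job configured below chains three steps with `.start(...).next(...)`. In this kind of sequential flow the steps run in declaration order. If a step does not complete, the job stops there and the remaining steps are never started. The plain-Java sketch below models only that ordering rule. `SequentialJobSketch` is hypothetical, and each `BooleanSupplier` stands in for a step that reports whether it completed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical model of a sequential flow such as start(step1).next(step2).next(step3).
class SequentialJobSketch {

    /**
     * Runs the steps in insertion order and returns the names of the steps that
     * were actually executed. A step returning false counts as "not completed".
     */
    static List<String> run(LinkedHashMap<String, BooleanSupplier> steps) {
        List<String> executed = new ArrayList<>();
        for (Map.Entry<String, BooleanSupplier> step : steps.entrySet()) {
            executed.add(step.getKey());
            if (!step.getValue().getAsBoolean()) {
                break; // the job fails here; later steps are never started
            }
        }
        return executed;
    }
}
```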
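```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Spring Batch 4.x / Spring Boot 2.x style; the builder factories are deprecated in Spring Batch 5
@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job myJob() {
        // step1(), step2(), step3(): assumed @Bean methods in this class that return Step
        return jobBuilderFactory.get("myJob")
                .start(step1()).next(step2()).next(step3())
                .build();
    }
}
```
Step configuration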
Chunk-oriented Step
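```java
@Bean
public Step myStep() {
    // Read, process and write in chunks of 100 items (one transaction per chunk).
    // myItemReader(), myItemProcessor() and myItemWriter() are assumed @Bean methods.
    // Spring Batch 5 equivalent, with JobRepository and PlatformTransactionManager injected:
    //   new StepBuilder("myStep", jobRepository).<InputDO, OutputDO>chunk(100, transactionManager)
    return stepBuilderFactory.get("myStep")
            .<InputDO, OutputDO>chunk(100)
            .reader(myItemReader())
            .processor(myItemProcessor())
            .writer(myItemWriter())
            .build();
}
```
Tasklet Step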
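A Tasklet step does not read, process and write items. Instead, Spring Batch calls the tasklet's `execute(StepContribution, ChunkContext)` method repeatedly until it returns `RepeatStatus.FINISHED` or throws. Each call is wrapped in its own transaction. The sketch below models that loop with hypothetical stand-in types (`TaskletSketch`, `RepeatStatusSketch`, `TaskletStepSketch`), so it runs without Spring on the classpath.

```java
// Hypothetical stand-ins for Spring Batch's RepeatStatus and Tasklet.
enum RepeatStatusSketch { CONTINUABLE, FINISHED }

interface TaskletSketch {
    RepeatStatusSketch execute();
}

class TaskletStepSketch {

    /** Calls the tasklet until it reports FINISHED and returns the number of calls. */
    static int run(TaskletSketch tasklet) {
        int calls = 0;
        RepeatStatusSketch status;
        do {
            status = tasklet.execute(); // in Spring Batch, each call runs in its own transaction
            calls++;
        } while (status == RepeatStatusSketch.CONTINUABLE); // a tasklet that never finishes loops forever
        return calls;
    }
}
```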
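```java
@Bean
public Step taskletStep() {
    // myTasklet() is assumed to return a Tasklet; the step calls it until it returns RepeatStatus.FINISHED
    return stepBuilderFactory.get("taskletStep")
            .tasklet(myTasklet())
            .build();
}
```
ItemReader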
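Whatever the data source, an ItemReader hands out one item per `read()` call and returns `null` once the input is exhausted. That `null` is how the step knows there is nothing left to read. The sketch below mirrors this contract with hypothetical types (`ReaderSketch`, `ListReaderSketch`, `ReaderDrainSketch`). Spring Batch's own in-memory reader for the same purpose is `ListItemReader`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical mirror of the ItemReader contract: one item per read(), null at end of input.
interface ReaderSketch<T> {
    T read();
}

// In-memory reader over a list, similar in spirit to Spring Batch's ListItemReader.
class ListReaderSketch<T> implements ReaderSketch<T> {
    private final Iterator<T> items;

    ListReaderSketch(List<T> items) {
        this.items = items.iterator();
    }

    @Override
    public T read() {
        return items.hasNext() ? items.next() : null; // null signals "no more data"
    }
}

class ReaderDrainSketch {
    // Keeps calling read() until it returns null, as a chunk-oriented step does.
    static <T> List<T> readAll(ReaderSketch<T> reader) {
        List<T> result = new ArrayList<>();
        for (T item = reader.read(); item != null; item = reader.read()) {
            result.add(item);
        }
        return result;
    }
}
```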
Reading from a database
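`JdbcPagingItemReader` issues one query per page. With a sort key such as `id`, the first page is simply ordered and limited. Each later page asks for rows whose sort key is greater than the last key of the previous page, which is why the sort key should be unique. The in-memory sketch below models this keyset paging over a sorted list of ids. `KeysetPagingSketch` is hypothetical, and the SQL in its comment only approximates what `MySqlPagingQueryProvider` generates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of keyset paging on a unique, ascending sort key.
class KeysetPagingSketch {

    // Roughly: SELECT ... FROM student WHERE id > :lastId ORDER BY id ASC LIMIT :pageSize
    static List<Integer> nextPage(List<Integer> sortedIds, Integer lastId, int pageSize) {
        List<Integer> page = new ArrayList<>();
        for (Integer id : sortedIds) {
            if (page.size() == pageSize) {
                break;
            }
            if (lastId == null || id > lastId) { // the first page has no lower bound
                page.add(id);
            }
        }
        return page;
    }

    // Fetches pages until an empty page comes back; returns the pages in order.
    static List<List<Integer>> readAllPages(List<Integer> sortedIds, int pageSize) {
        List<List<Integer>> pages = new ArrayList<>();
        Integer lastId = null;
        while (true) {
            List<Integer> page = nextPage(sortedIds, lastId, pageSize);
            if (page.isEmpty()) {
                return pages;
            }
            pages.add(page);
            lastId = page.get(page.size() - 1); // remember the last sort key seen
        }
    }
}
```

Suppose the sort key were not unique. Rows that share the last key of a page would then fail the `id > lastId` condition and be silently skipped. This is the data-loss risk that the uniqueness requirement guards against.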
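```java
import java.util.Collections;
import javax.sql.DataSource;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

@Bean
public JdbcPagingItemReader<Student> myReader(DataSource dataSource) {
    JdbcPagingItemReader<Student> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setPageSize(100); // rows fetched per query
    reader.setRowMapper(new BeanPropertyRowMapper<>(Student.class)); // maps columns to Student properties
    MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
    provider.setSelectClause("SELECT id, name, age");
    provider.setFromClause("FROM student");
    // The sort key drives paging and must be unique (for example, the primary key)
    provider.setSortKeys(Collections.singletonMap("id", Order.ASCENDING));
    reader.setQueryProvider(provider);
    return reader;
}
```
Reading CSV files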
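The CSV reader below skips the header line (`linesToSkip(1)`) and splits each remaining line on commas. It then maps the fields to `Student` properties in the order given by `names(...)`. A simplified plain-Java model of that pipeline follows. `CsvReadSketch` and its nested `Student` class (assumed fields: `id`, `name`, `age`) are hypothetical. Unlike Spring Batch's `DelimitedLineTokenizer`, the sketch does not handle quoted fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of FlatFileItemReader with linesToSkip(1) and names("id", "name", "age").
class CsvReadSketch {

    // Assumed shape of the Student item.
    static class Student {
        final long id;
        final String name;
        final int age;

        Student(long id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }
    }

    static List<Student> read(List<String> lines, int linesToSkip) {
        List<Student> students = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            String[] fields = lines.get(i).split(",", -1); // naive split: no quote handling
            // Field positions follow names("id", "name", "age").
            students.add(new Student(
                    Long.parseLong(fields[0].trim()),
                    fields[1].trim(),
                    Integer.parseInt(fields[2].trim())));
        }
        return students;
    }
}
```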
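```java
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;

@Bean
public FlatFileItemReader<Student> flatFileReader() {
    return new FlatFileItemReaderBuilder<Student>()
            .name("studentReader")                          // key prefix for saved restart state
            .resource(new ClassPathResource("student.csv"))
            .delimited()                                    // comma-delimited by default
            .names("id", "name", "age")                     // column order in the file
            .linesToSkip(1)                                 // skip the header line
            .targetType(Student.class)                      // maps fields via BeanWrapperFieldSetMapper
            .build();
}
```
Reading JSON files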
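```java
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;

@Bean
public JsonItemReader<Student> jsonReader() {
    // student.json is expected to hold a JSON array of objects, e.g. [{"id":1,"name":"Tom","age":20}]
    return new JsonItemReaderBuilder<Student>()
            .name("studentJsonReader")
            .resource(new ClassPathResource("student.json"))
            .jsonObjectReader(new JacksonJsonObjectReader<>(Student.class)) // Jackson maps each object to Student
            .build();
}
```
ItemWriter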
Writing to a database
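`JdbcBatchItemWriter` receives the whole chunk and uses `BeanPropertyItemSqlParameterSourceProvider` to bind each item's properties to the named parameters (`:id`, `:name`, `:age`). It then sends all rows as one JDBC batch through `NamedParameterJdbcTemplate`. The sketch below shows only the binding step. It rewrites the named SQL to positional `?` placeholders and collects one parameter array per item. `NamedParamSketch` is hypothetical, uses a `Map` in place of the `Student` bean, and its regex is naive (it would also match `:word` inside string literals).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical model of named-parameter binding for a batch insert.
class NamedParamSketch {

    private static final Pattern NAMED = Pattern.compile(":(\\w+)");

    // Parameter names in the order they appear in the SQL, e.g. [id, name, age].
    static List<String> parameterNames(String sql) {
        List<String> names = new ArrayList<>();
        Matcher m = NAMED.matcher(sql);
        while (m.find()) {
            names.add(m.group(1));
        }
        return names;
    }

    // The same SQL with every :name replaced by a positional '?'.
    static String toPositional(String sql) {
        return NAMED.matcher(sql).replaceAll("?");
    }

    // One Object[] per item: the values a JDBC batch would bind for that row.
    static List<Object[]> bindAll(String sql, List<Map<String, Object>> items) {
        List<String> names = parameterNames(sql);
        List<Object[]> batch = new ArrayList<>();
        for (Map<String, Object> item : items) {
            Object[] row = new Object[names.size()];
            for (int i = 0; i < names.size(); i++) {
                row[i] = item.get(names.get(i)); // the real writer calls the bean's getter here
            }
            batch.add(row);
        }
        return batch;
    }
}
```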
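```java
import javax.sql.DataSource;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;

@Bean
public JdbcBatchItemWriter<Student> myWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Student>()
            .dataSource(dataSource)
            // :id, :name and :age are filled from Student's getters
            .sql("INSERT INTO student(id, name, age) VALUES (:id, :name, :age)")
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .build();
}
```
Writing CSV files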
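The CSV writer below writes the header once through `headerCallback`. It then turns each `Student` into one line by reading the properties listed in `names(...)` and joining them with the default comma delimiter. The plain-Java model below reproduces that output. `CsvWriteSketch` is hypothetical, `Map` rows stand in for `Student` beans, and it always uses `\n`, whereas Spring Batch defaults to the platform line separator.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical model of FlatFileItemWriter with delimited().names(...) and a header callback.
class CsvWriteSketch {

    static String write(String header, List<String> names, List<Map<String, Object>> rows) {
        StringBuilder out = new StringBuilder();
        out.append(header).append('\n'); // header first, written once
        for (Map<String, Object> row : rows) {
            StringJoiner line = new StringJoiner(","); // comma, like the default delimiter
            for (String name : names) {
                line.add(String.valueOf(row.get(name))); // properties in names(...) order
            }
            out.append(line).append('\n');
        }
        return out.toString();
    }
}
```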
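```java
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.FileSystemResource;

@Bean
public FlatFileItemWriter<Student> flatFileWriter() {
    return new FlatFileItemWriterBuilder<Student>()
            .name("studentWriter")
            .resource(new FileSystemResource("output/student.csv"))
            .delimited()                       // comma-delimited by default
            .names("id", "name", "age")        // Student properties written per line, in this order
            .headerCallback(writer -> writer.write("id,name,age")) // written once at the top of the file
            .build();
}
```
ItemProcessor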
Returning `null` from a processor filters the item out, so it never reaches the writer:
```java
@Bean
public ItemProcessor<Student, Student> myProcessor() {
    return student -> {
        if (student.getAge() < 18) {
            return null; // filter out minors before doing any further work
        }
        student.setName(student.getName().toUpperCase());
        return student;
    };
}
```
Composite processor
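`CompositeItemProcessor` passes each item through its delegates in list order, feeding the output of one delegate into the next. If any delegate returns `null`, the item is filtered and the remaining delegates are not called. The plain-Java sketch below models that chaining rule. `CompositeSketch` is hypothetical, and each `UnaryOperator` stands in for a delegate processor.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of CompositeItemProcessor's chaining and short-circuit rule.
class CompositeSketch {

    static <T> T process(T item, List<UnaryOperator<T>> delegates) {
        T result = item;
        for (UnaryOperator<T> delegate : delegates) {
            if (result == null) {
                return null; // an earlier delegate filtered the item; stop here
            }
            result = delegate.apply(result);
        }
        return result;
    }
}
```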
```java
@Bean
public CompositeItemProcessor<Student, Student> compositeProcessor() {
    // Delegates run in list order; processor1() and processor2() are assumed @Bean methods
    List<ItemProcessor<Student, Student>> processors = new ArrayList<>();
    processors.add(processor1());
    processors.add(processor2());
    CompositeItemProcessor<Student, Student> processor = new CompositeItemProcessor<>();
    processor.setDelegates(processors);
    return processor;
}
```
Listener
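For a job with a single chunk-oriented step, the callbacks fire in a fixed order. `beforeJob` comes first. Then `beforeChunk` and `afterChunk` run around every chunk, with `afterChunkError` replacing `afterChunk` when a chunk fails and is rolled back. Finally, `afterJob` runs whether the job succeeded or failed. The plain-Java sketch below records that sequence. `ListenerOrderSketch` is hypothetical, it assumes no skip or retry is configured, and it leaves out step-level callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of listener callback order for one job with one chunk-oriented step.
class ListenerOrderSketch {

    /** chunkOutcomes[i] == true means chunk i committed; false means it failed and rolled back. */
    static List<String> run(boolean... chunkOutcomes) {
        List<String> events = new ArrayList<>();
        events.add("beforeJob");
        for (boolean committed : chunkOutcomes) {
            events.add("beforeChunk");
            if (committed) {
                events.add("afterChunk");
            } else {
                events.add("afterChunkError"); // after the chunk transaction rolls back
                break;                         // without skip/retry, the step fails here
            }
        }
        events.add("afterJob"); // called whether the job completed or failed
        return events;
    }
}
```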
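```java
// ChunkListener: register on the step builder, after .chunk(...)
.listener(new ChunkListener() {
    @Override public void beforeChunk(ChunkContext context) { }     // before the chunk runs, inside its transaction
    @Override public void afterChunk(ChunkContext context) { }      // after the chunk completes, outside its transaction
    @Override public void afterChunkError(ChunkContext context) { } // after the chunk's transaction is rolled back
})
// JobExecutionListener: register on the job builder, e.g. jobBuilderFactory.get("myJob").listener(...)
.listener(new JobExecutionListener() {
    @Override public void beforeJob(JobExecution jobExecution) { }
    @Override public void afterJob(JobExecution jobExecution) { }   // called whether the job succeeded or failed
})
```
Exception handling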
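The settings below combine roughly as follows. A failing item is retried while attempts remain (`retryLimit(3)` allows at most three attempts in total). After that, it is skipped if the exception is skippable and the skip limit has not been used up. A `noSkip` exception, or one skip too many, fails the step. The sketch below is a deliberately simplified per-item model of that decision. Spring Batch's real fault-tolerant processing also rolls back and re-processes chunks, which is not shown. `FaultToleranceSketch` and its parameters are hypothetical, and it treats every exception as retryable, as `.retry(Exception.class)` does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified per-item model of "retry, then skip" fault tolerance.
class FaultToleranceSketch {

    static <I, O> List<O> processAll(List<I> items, Function<I, O> processor,
                                     int retryLimit, int skipLimit,
                                     Predicate<RuntimeException> skippable) {
        List<O> written = new ArrayList<>();
        int skipCount = 0;
        for (I item : items) {
            RuntimeException lastError = null;
            O result = null;
            boolean succeeded = false;
            // retryLimit counts total attempts, including the first (0 or 1 means no retry)
            int attempts = Math.max(1, retryLimit);
            for (int attempt = 1; attempt <= attempts && !succeeded; attempt++) {
                try {
                    result = processor.apply(item);
                    succeeded = true;
                } catch (RuntimeException e) {
                    lastError = e;
                }
            }
            if (succeeded) {
                if (result != null) {
                    written.add(result); // null would mean "filtered", as with ItemProcessor
                }
            } else if (skippable.test(lastError) && skipCount < skipLimit) {
                skipCount++; // the item is dropped and counted against skipLimit
            } else {
                // Not skippable (cf. noSkip) or skip limit used up: the step fails.
                // Spring Batch reports the latter as SkipLimitExceededException.
                throw lastError;
            }
        }
        return written;
    }
}
```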
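```java
// Three alternative configurations; each chains after .chunk(...) on the step builder.
// Skip: tolerate up to 10 failed items, then fail the step
.faultTolerant().skipLimit(10).skip(Exception.class)
// Retry: try a failing item up to 3 times in total before giving up
.faultTolerant().retryLimit(3).retry(Exception.class)
// Combined: retry first, then skip; SeriousException (application-defined) is never skipped
.faultTolerant()
    .skipLimit(10).skip(Exception.class)
    .retryLimit(3).retry(Exception.class)
    .noSkip(SeriousException.class)
```
JobLauncher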
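```java
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job myJob; // the Job bean defined in BatchConfig

// JobLauncher.run declares checked exceptions (e.g. JobInstanceAlreadyCompleteException)
public void runJob() throws Exception {
    // A new timestamp on every call makes each launch a new JobInstance
    JobParameters params = new JobParametersBuilder()
            .addLong("timestamp", System.currentTimeMillis())
            .toJobParameters();
    JobExecution execution = jobLauncher.run(myJob, params);
    System.out.println("Job status: " + execution.getStatus());
}
```
Launching from the command line (Spring Boot 2.x): `java -jar app.jar --spring.batch.job.names=myJob`. Spring Boot 3.x uses the singular `spring.batch.job.name` property instead.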
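The timestamp parameter matters because a JobInstance is identified by the job name plus its identifying JobParameters. Relaunching a completed instance with the same parameters is rejected (Spring Batch throws `JobInstanceAlreadyCompleteException`). A fresh timestamp creates a new instance, and a failed instance can be restarted with its original parameters. The sketch below models these rules with a hypothetical `JobRepositorySketch` that keys instances by name and sorted parameters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of JobInstance identity and when a relaunch is refused.
class JobRepositorySketch {

    private final Map<String, Boolean> completedByInstance = new HashMap<>();

    // A JobInstance key: job name plus its identifying parameters (sorted for stability).
    private static String instanceKey(String jobName, Map<String, ?> params) {
        return jobName + new TreeMap<String, Object>(params);
    }

    /** Returns true if a new JobInstance was created, false if a failed one was restarted. */
    boolean launch(String jobName, Map<String, ?> params, boolean willSucceed) {
        String key = instanceKey(jobName, params);
        Boolean completed = completedByInstance.get(key);
        if (Boolean.TRUE.equals(completed)) {
            // Spring Batch throws JobInstanceAlreadyCompleteException here
            throw new IllegalStateException("JobInstance already complete: " + key);
        }
        completedByInstance.put(key, willSucceed);
        return completed == null;
    }
}
```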
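Metadata tables
With `initialize-schema: always`, Spring Boot creates these tables at startup.
| Table | Description |
|---|---|
| BATCH_JOB_INSTANCE | Job instances (one per job name plus identifying parameters) |
| BATCH_JOB_EXECUTION | Job execution records |
| BATCH_JOB_EXECUTION_PARAMS | Job parameters |
| BATCH_STEP_EXECUTION | Step execution records |
| BATCH_STEP_EXECUTION_CONTEXT | Step execution context |
| BATCH_JOB_EXECUTION_CONTEXT | Job execution context |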
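Common problems
| Problem | Cause | Solution |
|---|---|---|
| Job does not run | Spring Boot 2.x: `@EnableBatchProcessing` is missing, or the job is not listed in `spring.batch.job.names` | Add the annotation and check `spring.batch.job.names`. On Spring Boot 3.x, `@EnableBatchProcessing` instead switches off Boot's batch auto-configuration (including running jobs at startup) |
| Out of memory | Chunk size too large, or the whole result set is loaded at once because no cursor fetch size is set | Reduce the chunk size; read with a cursor (e.g. `JdbcCursorItemReader`) and call `setFetchSize`. MySQL Connector/J honors the fetch size only when `useCursorFetch=true` is set on the JDBC URL |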