Spring Batch实战前言在企业级应用中批量数据处理是一个非常常见的需求。比如月底的工资代发、银行对账、数据报表生成等。当数据量达到几十万甚至上百万时如何高效、可靠地处理这些数据就成了一个技术挑战。本文将以50万笔工资代发为实际场景详细介绍如何使用Spring Batch框架来处理大规模批量数据并重点讲解当处理失败时如何实现部分回滚机制确保已成功处理的数据不会因为少量失败记录而全部回滚。一、什么是Spring Batch1.1 Spring Batch简介Spring Batch是一个轻量级的、全面的批处理框架由Spring团队开发旨在帮助企业开发健壮的批处理应用程序。它于2008年首次发布经过十多年的发展已经成为Java批处理领域的事实标准。Spring Batch的核心设计理念包括Chunk-oriented Processing块级处理将大量数据分批处理避免内存溢出事务管理每个Chunk作为一个独立的事务支持部分回滚容错机制支持跳过Skip、重试Retry等容错策略作业调度支持定时任务、手动触发等多种调度方式监控与统计提供完整的执行记录和统计信息1.2 核心概念详解Job作业Job是批处理的核心概念代表一个完整的批处理任务。一个Job可以包含多个Step按顺序或并行执行。Bean public Job salaryPaymentJob() { return jobBuilderFactory.get(salaryPaymentJob) .start(step1()) .next(step2()) .build(); }Step步骤Step是Job的基本执行单元每个Step包含ItemReader读取数据ItemProcessor处理数据可选ItemWriter写入数据Bean public Step salaryPaymentStep() { return stepBuilderFactory.get(salaryPaymentStep) .SalaryPayment, SalaryPaymentchunk(1000) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }Chunk数据块Chunk是Spring Batch处理数据的基本单位。每次从Reader读取指定数量的记录处理后一起提交到数据库读取1000条 → 处理1000条 → 写入1000条 → 提交事务1.3 应用场景Spring Batch适用于以下典型场景场景描述示例数据迁移跨系统数据同步从旧系统迁移数据到新系统数据转换ETL过程从数据库读取、转换、写入数据仓库批量处理定期批量操作月底工资代发、银行对账报表生成定期生成报表每日交易汇总报表1.4 与其他框架对比特性Spring BatchQuartzScheduled Executor批量处理✅ 专用需要需要事务管理✅ 内置无无容错机制✅ 完善的Skip/Retry无无监控统计✅ 数据库持久化基础无并行处理✅ 多种模式无基础为什么需要部分回滚想象一下你需要处理50万笔工资代发如果第49万笔记录因为银行卡号错误而失败在没有部分回滚机制的情况下前面489,999笔已成功处理的数据会全部回滚这对于业务来说是不可接受的。二、系统架构设计为了实现50万笔工资代发的高效处理我们设计了如下的系统架构系统架构上图展示了Spring Batch工资代发系统的分层架构Web层提供监控面板支持Job启动/停止、实时状态监控和统计信息查询Batch控制层REST API接口包含JobLauncher和JobRepositorySpring Batch核心层Job→Step→Chunk的处理流程包含Reader、Processor、Writer三大组件数据存储层MySQL数据库、CSV文件和Job执行日志2.1 核心组件说明组件职责实现类Job整个批处理任务SalaryPaymentJobStep任务中的一个步骤SalaryPaymentStepItemReader数据读取器FlatFileItemReader读取CSVItemProcessor数据处理器SalaryPaymentProcessor数据验证ItemWriter数据写入器JdbcBatchItemWriter批量写入数据库三、部分回滚机制原理3.1 Chunk-Oriented ProcessingSpring Batch采用Chunk-Oriented Processing块级处理模式这是实现部分回滚的核心机制批处理流程上图展示了Batch处理的核心流程Reader读取数据 → Processor处理验证 → Writer批量写入形成完整的处理管道。对于50万笔数据的处理Chunk机制的工作方式如下50万笔数据 │ ├─► Chunk 1 (1-1000笔) ──► 独立事务 ──► 成功提交 ├─► Chunk 2 (1001-2000笔) ──► 独立事务 ──► 成功提交 ├─► Chunk 3 (2001-3000笔) ──► 独立事务 ──► 第2500笔失败 → 重试3次 → 跳过 → 其余999笔提交 ├─► Chunk 4 (3001-4000笔) ──► 独立事务 ──► 成功提交 ... └─► Chunk 500 (499001-500000笔) ──► 独立事务 ──► 成功提交 最终结果499,999笔成功1笔被跳过关键配置chunkSize: 1000每1000笔提交一次skipLimit: 100最多跳过100笔失败记录retryLimit: 3每笔失败重试3次3.2 事务边界与部分回滚每个Chunk是独立的事务单元这是实现部分回滚的关键事务边界与部分回滚上图清晰地展示了事务边界和部分回滚的工作机制事务规则Chunk内任意记录失败 → 整个Chunk回滚重试成功 → 继续处理重试失败且可跳过 → 跳过该记录继续处理Chunk内剩余记录跳过次数超限 → 整个Job失败实际案例假设Chunk 3中有1000笔数据第500笔验证失败Spring Batch回滚整个Chunk 3重新读取Chunk 3的1000笔数据处理到第500笔时捕获异常重试3次后仍然失败检查是否可跳过IllegalArgumentException在跳过列表中跳过第500笔继续处理501-1000笔最终Chunk 3成功提交999笔1笔被跳过3.3 容错策略配置.faultTolerant() // 启用容错 .skipLimit(100) // 最多跳过100条 .skip(IllegalArgumentException.class) // 跳过数据验证异常 .skip(NullPointerException.class) // 跳过空指针异常 .retryLimit(3) // 失败重试3次 .retry(Exception.class) // 重试所有异常四、50万笔工资代发数据处理流程在理解了部分回滚机制后我们来看完整的工资代发数据处理流程50万笔工资代发数据处理流程上图展示了从CSV文件读取到数据库写入的完整数据流包含以下关键步骤数据读取FlatFileItemReader读取CSV文件每行映射为SalaryPayment对象数据验证SalaryPaymentProcessor进行数据校验员工ID非空验证金额范围验证0.01-100万银行卡号格式验证16-19位数字状态更新设置状态为PROCESSING生成唯一交易ID批量写入JdbcBatchItemWriter批量写入数据库异常处理验证失败的记录被跳过记录到失败列表五、核心代码实现5.1 Job配置Configuration public class SalaryPaymentJobConfig { Value(${batch.chunk.size:1000}) private int chunkSize; // 每次处理的记录数 Value(${batch.skip.limit:100}) private int skipLimit; // 跳过限制 Value(${batch.retry.limit:3}) private int retryLimit; // 重试次数 Bean public Step salaryPaymentStep() { return stepBuilderFactory .get(salaryPaymentStep) .SalaryPayment, SalaryPaymentchunk(chunkSize) .reader(salaryPaymentReader()) .processor(salaryPaymentProcessor()) .writer(salaryPaymentWriter()) .faultTolerant() // 启用容错 .skipLimit(skipLimit) .skip(IllegalArgumentException.class) .skip(NullPointerException.class) .retryLimit(retryLimit) .retry(Exception.class) .listener(new SalaryItemReadListener()) .listener(new SalaryItemWriteListener()) .build(); } Bean public Job salaryPaymentJob(Step step, SalaryJobExecutionListener listener) { return jobBuilderFactory.get(salaryPaymentJob) .incrementer(new RunIdIncrementer()) .listener(listener) .start(step) .build(); } }5.2 数据读取器Bean public FlatFileItemReaderSalaryPayment salaryPaymentReader() { FlatFileItemReaderSalaryPayment reader new FlatFileItemReader(); reader.setName(salaryPaymentReader); reader.setResource(new ClassPathResource(input/salary-payments.csv)); reader.setLinesToSkip(1); // 跳过CSV标题行 // 设置列映射 DelimitedLineTokenizer tokenizer new DelimitedLineTokenizer(); tokenizer.setNames(new String[]{ employeeId, employeeName, accountNumber, accountName, bankName, amount, currency, paymentDate, remark }); // 设置字段映射 BeanWrapperFieldSetMapperSalaryPayment mapper new BeanWrapperFieldSetMapper(); mapper.setTargetType(SalaryPayment.class); DefaultLineMapperSalaryPayment lineMapper new DefaultLineMapper(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper(mapper); reader.setLineMapper(lineMapper); return reader; }5.3 数据处理器验证逻辑public class SalaryPaymentProcessor implements ItemProcessorSalaryPayment, SalaryPayment { private static final BigDecimal MIN_AMOUNT new BigDecimal(0.01); private static final BigDecimal MAX_AMOUNT new BigDecimal(1000000); Override public SalaryPayment process(SalaryPayment item) throws Exception { // 1. 数据验证 if (item.getEmployeeId() null || item.getEmployeeId().trim().isEmpty()) { throw new IllegalArgumentException(员工ID不能为空); } // 2. 金额验证 if (item.getAmount() null) { throw new IllegalArgumentException(发放金额不能为空); } if (item.getAmount().compareTo(MIN_AMOUNT) 0) { throw new IllegalArgumentException(发放金额不能小于0.01元); } if (item.getAmount().compareTo(MAX_AMOUNT) 0) { throw new IllegalArgumentException(发放金额不能大于100万元); } // 3. 银行卡号验证 if (item.getAccountNumber() null || item.getAccountNumber().length() 16 || item.getAccountNumber().length() 19) { throw new IllegalArgumentException(银行账号长度必须在16-19位之间); } // 4. 设置处理状态 item.setStatus(PROCESSING); item.setTransactionId(SAL System.currentTimeMillis() item.getEmployeeId()); return item; } }5.4 数据写入器public class SalaryPaymentWriter implements ItemWriterSalaryPayment { private final JdbcBatchItemWriterSalaryPayment delegate; public SalaryPaymentWriter(DataSource dataSource) { this.delegate new JdbcBatchItemWriter(); this.delegate.setDataSource(dataSource); this.delegate.setSql( INSERT INTO salary_payment (employee_id, employee_name, account_number, account_name, bank_name, amount, currency, payment_date, remark, status, transaction_id, create_time, update_time) VALUES (:employeeId, :employeeName, :accountNumber, :accountName, :bankName, :amount, :currency, :paymentDate, :remark, :status, :transactionId, :createTime, :updateTime)); this.delegate.setItemSqlParameterSourceProvider( new BeanPropertyItemSqlParameterSourceProvider() ); } Override public void write(List? extends SalaryPayment items) throws Exception { delegate.write(items); } }5.5 自定义SkipPolicyComponent public class PartialRollbackHandler implements SkipPolicy { private static final int SKIP_LIMIT 100; Override public boolean shouldSkip(Throwable throwable, int skipCount) { // 超过跳过限制 if (skipCount SKIP_LIMIT) { return false; } // 文件不存在不能跳过 if (throwable instanceof FileNotFoundException) { return false; } // 数据格式错误可以跳过 if (throwable instanceof FlatFileParseException) { return true; } // 数据验证失败可以跳过 if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) { return true; } return false; } }六、监控与调度架构除了数据处理Spring Batch还提供了完善的监控和调度能力监控与调度架构上图展示了完整的监控与调度架构调度层支持三种调度方式Quartz调度器支持分布式调度适合集群环境Spring Task调度简单的定时任务轻量级选择Cron表达式灵活的时间配置执行层核心执行组件JobLauncher启动作业创建执行上下文JobOperator操作作业支持停止/重启/重试StepExecution步骤执行采用Chunk处理模式ThreadPoolExecutor线程池实现并发处理监控层监控与统计JobRepository存储元数据BATCH_JOB_INSTANCE、BATCH_JOB_EXECUTION、BATCH_STEP_EXECUTIONJobExplorer查询作业状态、获取执行历史Metrics处理记录数、执行时间、失败率统计数据层数据存储MySQL 8.0存储元数据表、业务数据表、日志记录Redis缓存执行状态缓存、计数器、分布式锁七、数据库设计7.1 工资代发表CREATE TABLE salary_payment ( id BIGINT AUTO_INCREMENT PRIMARY KEY, employee_id VARCHAR(50) NOT NULL COMMENT 员工ID, employee_name VARCHAR(100) NOT NULL COMMENT 员工姓名, account_number VARCHAR(50) NOT NULL COMMENT 银行账号, account_name VARCHAR(100) NOT NULL COMMENT 账户名称, bank_name VARCHAR(100) NOT NULL COMMENT 开户行, amount DECIMAL(18,2) NOT NULL COMMENT 发放金额, currency VARCHAR(10) NOT NULL DEFAULT CNY COMMENT 币种, payment_date DATETIME NOT NULL COMMENT 发放日期, remark VARCHAR(500) COMMENT 备注, status VARCHAR(20) NOT NULL DEFAULT PENDING COMMENT 状态, transaction_id VARCHAR(100) COMMENT 交易ID, error_message VARCHAR(1000) COMMENT 错误信息, create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_employee_id (employee_id), INDEX idx_status (status) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;7.2 Spring Batch元表Spring Batch框架会自动创建以下元表来存储Job执行信息batch_job_instance- Job实例表batch_job_execution- Job执行表batch_job_execution_params- Job参数表batch_step_execution- Step执行表batch_step_execution_context- Step上下文表八、REST API设计RestController RequestMapping(/api/batch) public class BatchJobController { // 启动Job PostMapping(/start) public ResponseEntityMapString, Object startJob( RequestParam String inputFile ) { JobParameters params new JobParametersBuilder() .addLong(startTime, System.currentTimeMillis()) .addString(inputFile, inputFile) .toJobParameters(); JobExecution execution jobLauncher.run(salaryPaymentJob, params); return ResponseEntity.ok(result); } // 获取Job状态 GetMapping(/status/{jobExecutionId}) public ResponseEntityMapString, Object getJobStatus( PathVariable Long jobExecutionId ) { JobExecution execution jobRepository.getJobExecution(jobExecutionId); // 返回执行详情 } // 停止Job PostMapping(/stop/{jobExecutionId}) public ResponseEntityMapString, Object stopJob( PathVariable Long jobExecutionId ) { JobExecution execution jobRepository.getJobExecution(jobExecutionId); execution.stop(); return ResponseEntity.ok(result); } // 获取统计信息 GetMapping(/statistics) public ResponseEntityMapString, Object getStatistics() { // 返回总数、成功数、失败数等统计 } // 健康检查 GetMapping(/health) public ResponseEntityMapString, Object health() { // 返回系统健康状态 } }九、性能优化与并行处理当数据量达到50万甚至更多时单线程处理可能成为瓶颈。Spring Batch提供了多种并行处理方式。9.1 多线程并发处理Spring Batch支持多线程并发处理大幅提升处理效率Spring Batch支持多线程并发处理上图展示了多线程并发处理的工作原理核心机制主线程创建线程池分配任务工作线程并发执行多个Step或Chunk线程安全JobRepository保证线程安全的状态管理负载均衡任务均匀分配到各个线程配置示例Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(100); executor.setThreadNamePrefix(salary-batch-); executor.initialize(); return executor; } // 在Step中使用 .step(stepName) .chunk(chunkSize) .taskExecutor(taskExecutor()) .throttleLimit(10) // 限制并发数 .build();9.2 分区处理Partitioning对于超大数据集可以使用分区处理实现更高程度的并行分区处理上图展示了分区处理的架构核心组件Master Step负责创建和管理分区Slave Step每个分区独立执行Partitioner将数据分成多个分区TaskExecutor线程池执行分区任务配置示例Bean public Step masterStep() { return stepBuilderFactory.get(masterStep) .partitioner(slaveStep().getName(), rangePartitioner(1, 10)) .step(slaveStep()) .gridSize(10) // 分成10个分区 .taskExecutor(taskExecutor()) .build(); } Bean public Partitioner rangePartitioner(int min, int max) { return new Partitioner() { Override public MapString, ExecutionContext partition(int gridSize) { MapString, ExecutionContext result new HashMap(); int range (max - min) / gridSize; for (int i 0; i gridSize; i) { ExecutionContext context new ExecutionContext(); context.putInt(minValue, min i * range); context.putInt(maxValue, min (i 1) * range - 1); result.put(partition i, context); } return result; } }; }9.3 调优参数参数推荐值说明chunkSize1000-5000根据记录大小调整越大吞吐量越高但内存占用也越大skipLimit100-500根据数据质量设置retryLimit3-5过多会浪费时间过少可能误判暂时性故障线程池大小CPU核心数*2用于多线程处理9.4 批量写入优化使用JDBC批量操作代替单条插入// 单条插入慢 for (SalaryPayment p : payments) { jdbcTemplate.update(sql, p.getId(), p.getName(), ...); } // 批量插入快 jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() { Override public void setValues(PreparedStatement ps, int i) throws SQLException { // 设置参数 } Override public int getBatchSize() { return payments.size(); } });9.5 索引优化-- 为常用查询字段添加索引 CREATE INDEX idx_employee_id ON salary_payment(employee_id); CREATE INDEX idx_status ON salary_payment(status); CREATE INDEX idx_create_time ON salary_payment(create_time); -- 复合索引 CREATE INDEX idx_status_employee ON salary_payment(status, employee_id);十、实际应用场景场景1月底工资代发某公司月底需要为50,000名员工发放工资使用Spring Batch设置chunkSize1000分成50个Chunk处理假设第23个Chunk中第23,456号员工银行卡号错误系统重试3次后跳过该记录最终结果49,999笔成功1笔记录到失败列表供后续处理场景2银行对账文件处理银行提供100万笔交易对账文件设置chunkSize5000提高处理效率使用多线程并发处理Partitioning完成后生成对账差异报告场景3数据报表生成每天凌晨生成T1交易报表使用Spring Task定时调度读取当日交易数据生成Excel报表并发送邮件十一、常见问题与解决方案Q1: Job执行一半挂了怎么办Spring Batch支持Job重启。通过JobRepository记录的执行状态可以从上次失败的位置继续执行.job(salaryPaymentJob) .allowStartIfComplete(false) // 已完成的Job不重新执行 .restartable(true) // 允许重启Q2: 如何实现并行处理使用Partitioning方式实现多线程并行处理Bean public Step masterStep() { return stepBuilderFactory.get(masterStep) .partitioner(slaveStep().getName(), partitioner()) .step(slaveStep()) .gridSize(10) // 分成10个分区并行处理 .taskExecutor(taskExecutor()) .build(); }Q3: 处理失败的数据如何重试可以通过以下方式重试查询statusFAILED的记录修正错误数据将status改回PENDING重新执行Job十二、总结Spring Batch作为成熟的批处理框架提供了完整的解决方案来处理大规模批量数据。适用场景银行对账、清算工资代发、批量转账报表生成、数据导出注意事项合理设置chunkSize平衡内存和性能配置合适的Skip和Retry策略做好失败记录的重处理机制定期清理Job执行历史数据