本文共 19685 字,大约阅读时间需要 65 分钟。
名称 | 作用 |
---|---|
JobRepository | 用于注册 Job 的容器 |
JobLauncher | 启动 Job 接口 |
Job | 实际要执行的任务,由一个或多个 Step 构成 |
Step | 每个 Step 都包含 ItemReader、ItemProcessor 和 ItemWriter |
ItemReader | 用来读取数据的接口 |
ItemProcessor | 用来处理数据的接口 |
ItemWriter | 用来输出数据的接口 |
@Configuration@EnableBatchProcessingpublic class CsvBatchConfig { @Bean public ItemReaderreader() throws Exception { …… return reader; } @Bean public ItemProcessor processor() { CsvItemProcessor processor = new CsvItemProcessor(); //1 processor.setValidator(csvBeanValidator()); //2 return processor; } @Bean public ItemWriter writer(DataSource dataSource) { //1 …… return writer; } @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { …… } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { …… return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) //1 .end() .listener(csvJobListener()) //2 .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader reader, ItemWriter writer, ItemProcessor processor) { return stepBuilderFactory .get("step1") . chunk(65000) //1 .reader(reader) //2 .processor(processor) //3 .writer(writer) //4 .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator csvBeanValidator() { return new CsvBeanValidator (); }}
public class CsvJobListener implements JobExecutionListener { // 自定义监听器类}// 注册监听器@Beanpublic CsvJobListener csvJobListener() { return new CsvJobListener();}@Beanpublic Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) .end() .listener(csvJobListener()) //绑定监听器 .build();}
public class CsvItemProcessor extends ValidatingItemProcessor{ @Override public Person process(Person item) throws ValidationException { super.process(item); // 具体处理数据的代码 return item; }}
// 参数绑定String path = "people.csv"; jobParameters = new JobParametersBuilder() .addLong("time", System.currentTimeMillis()) .addString("input.file.name", path) .toJobParameters(); jobLauncher.run(importJob,jobParameters); // 定义 Bean @Bean@StepScopepublic FlatFileItemReaderreader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception { FlatFileItemReader reader = new FlatFileItemReader (); reader.setResource(new ClassPathResource(pathToFile)); return reader;}
spring.batch.job.name= #启动时要执行的任务,默认全部spring.batch.job.enabled=true # 是否自动执行自定义的 Job,默认是spring.batch.initializer.enabled=true # 是否初始化 Spring Batch 的数据库,默认是spring.batch.schems=spring.batch.table-prefix= #设置 Spring batch 的数据库表的前缀
4.0.0 org.springframework.boot spring-boot-starter-parent 1.3.0.M2 com.pyc batch 0.0.1-SNAPSHOT batch Demo project for Spring Boot UTF-8 1.8 org.hibernate hibernate-validator com.oracle ojdbc6 11.2.0.2.0 org.springframework.boot spring-boot-starter-batch org.hsqldb hsqldb org.springframework.boot spring-boot-starter-jdbc org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine org.springframework.batch spring-batch-test test org.springframework.boot spring-boot-maven-plugin
spring.datasource.driverClassName=oracle.jdbc.OracleDriverspring.datasource.url=jdbc\:oracle\:thin\:@localhost\:1521\:xespring.datasource.username=pycspring.datasource.password=pycspring.batch.job.enabled=truelogging.file=log.loglogging.level.org.springframework.web = DEBUG
pyc,21,汉族,汕尾
ycy,21,汉族,肇庆
yqy,67,汉族,余姚
lb,100,古汉族,江东
qf,100,古汉族,长安
create table PERSON( id NUMBER not null primary key, name varchar(20), age number, nation varchar(20), address varchar(20));
package com.pyc.batch.domain;import javax.validation.constraints.Size;public class Person { // 用 JSR-303 注解校验数据 @Size(max = 4,min = 2) private String name; private int age; private String nation; private String address; public String getName() { return name; } public void setName(String name) { this.name = name; } public void setAge(int age) { this.age = age; } public int getAge() { return age; } public void setAddress(String address) { this.address = address; } public String getAddress() { return address; } public void setNation(String nation) { this.nation = nation; } public String getNation() { return nation; }}
package com.pyc.batch.mybatch;import com.pyc.batch.domain.Person;import org.springframework.batch.item.validator.ValidatingItemProcessor;import org.springframework.batch.item.validator.ValidationException;public class CsvItemProcessor extends ValidatingItemProcessor{ @Override public Person process(Person item) throws ValidationException { super.process(item); if(item.getNation().equals("汉族")){ item.setNation("01"); }else { item.setNation("02"); } return item; }}
package com.pyc.batch.mybatch;import org.springframework.batch.item.validator.ValidationException;import org.springframework.batch.item.validator.Validator;import org.springframework.beans.factory.InitializingBean;import javax.validation.ConstraintViolation;import javax.validation.Validation;import javax.validation.ValidatorFactory;import java.util.Set;public class CsvBeanValidatorimplements Validator , InitializingBean { private javax.validation.Validator validator; @Override public void afterPropertiesSet() throws Exception { ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.usingContext().getValidator(); } @Override public void validate(T t) throws ValidationException { Set > constraintViolations = validator.validate(t); if(constraintViolations.size()>0){ StringBuilder message = new StringBuilder(); for(ConstraintViolation constraintViolation : constraintViolations){ message.append(constraintViolation.getMessage()).append("\n"); } throw new ValidationException(message.toString()); } }}
package com.pyc.batch.mybatch;import org.springframework.batch.core.JobExecution;import org.springframework.batch.core.JobExecutionListener;public class CsvJobListener implements JobExecutionListener { long startTime; long endTime; @Override public void beforeJob(JobExecution jobExecution) { startTime = System.currentTimeMillis(); System.out.println("任务处理开始"); } @Override public void afterJob(JobExecution jobExecution) { endTime = System.currentTimeMillis(); System.out.println("任务处理结束"); System.out.println("耗时:" + (endTime - startTime) + "ms"); }}
@Configuration@EnableBatchProcessingpublic class CsvBatchConfig { @Bean public ItemReaderreader() throws Exception { FlatFileItemReader reader = new FlatFileItemReader (); //1 reader.setResource(new ClassPathResource("people.csv")); //2 reader.setLineMapper(new DefaultLineMapper () { { //3 setLineTokenizer(new DelimitedLineTokenizer() { { setNames(new String[] { "name","age", "nation" ,"address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper () { { setTargetType(Person.class); }}); }}); return reader; } @Bean public ItemProcessor processor() { CsvItemProcessor processor = new CsvItemProcessor(); //1 processor.setValidator(csvBeanValidator()); //2 return processor; } @Bean public ItemWriter writer(DataSource dataSource) { //1 JdbcBatchItemWriter writer = new JdbcBatchItemWriter (); //2 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider ()); String sql = "insert into person " + "(id,name,age,nation,address) " + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)"; writer.setSql(sql); //3 writer.setDataSource(dataSource); return writer; } @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("oracle"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) //1 .end() 
.listener(csvJobListener()) //2 .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader reader, ItemWriter writer, ItemProcessor processor) { return stepBuilderFactory .get("step1") . chunk(65000) //1 .reader(reader) //2 .processor(processor) //3 .writer(writer) //4 .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator csvBeanValidator() { return new CsvBeanValidator (); }}
package com.pyc.batch.mybatch;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepScope;import org.springframework.batch.core.launch.support.RunIdIncrementer;import org.springframework.batch.core.launch.support.SimpleJobLauncher;import org.springframework.batch.core.repository.JobRepository;import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;import org.springframework.batch.item.ItemProcessor;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemWriter;import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;import org.springframework.batch.item.database.JdbcBatchItemWriter;import org.springframework.batch.item.file.FlatFileItemReader;import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;import org.springframework.batch.item.file.mapping.DefaultLineMapper;import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;import org.springframework.batch.item.validator.Validator;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.ClassPathResource;import org.springframework.transaction.PlatformTransactionManager;import com.pyc.batch.domain.Person;//@Configuration@EnableBatchProcessingpublic class TriggerBatchConfig { @Bean @StepScope public FlatFileItemReaderreader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception { FlatFileItemReader reader = new FlatFileItemReader (); //1 
reader.setResource(new ClassPathResource(pathToFile)); //2 reader.setLineMapper(new DefaultLineMapper () { { //3 setLineTokenizer(new DelimitedLineTokenizer() { { setNames(new String[] { "name","age", "nation" ,"address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper () { { setTargetType(Person.class); }}); }}); return reader; } @Bean public ItemProcessor processor() { CsvItemProcessor processor = new CsvItemProcessor(); //1 processor.setValidator(csvBeanValidator()); //2 return processor; } @Bean public ItemWriter writer(DataSource dataSource) { //1 JdbcBatchItemWriter writer = new JdbcBatchItemWriter (); //2 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider ()); String sql = "insert into person " + "(id,name,age,nation,address) " + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)"; writer.setSql(sql); //3 writer.setDataSource(dataSource); return writer; } @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("oracle"); return jobRepositoryFactoryBean.getObject(); } @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) //1 .end() .listener(csvJobListener()) //2 .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader reader, ItemWriter writer, ItemProcessor processor) { return 
stepBuilderFactory .get("step1") . chunk(65000) //1 .reader(reader) //2 .processor(processor) //3 .writer(writer) //4 .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator csvBeanValidator() { return new CsvBeanValidator (); }}
package com.pyc.batch.web;import org.springframework.batch.core.Job;import org.springframework.batch.core.JobParameters;import org.springframework.batch.core.JobParametersBuilder;import org.springframework.batch.core.launch.JobLauncher;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class DemoController { @Autowired JobLauncher jobLauncher; @Autowired Job importJob; public JobParameters jobParameters; @RequestMapping("/read") public String imp(String fileName) throws Exception{ String path = fileName+".csv"; jobParameters = new JobParametersBuilder() .addLong("time", System.currentTimeMillis()) .addString("input.file.name", path) .toJobParameters(); jobLauncher.run(importJob,jobParameters); return "ok"; }}
转载地址:http://blqgn.baihongyu.com/