Javaweb: Spring Boot Series (17) Batch Data Import
Published: 2019-05-23


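Batch Data Import

  • In practice, a web application serves far more than one or two users and produces far more than a few dozen records; data often arrives in bulk. To handle writing large volumes of data to a database, Spring provides Spring Batch.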

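I. Spring Batch

1. What Is Spring Batch

  • Spring Batch is a Spring framework for processing large volumes of data. Its main use is to read a large amount of data, process it, and write it out in a specified form.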

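2. Components of Spring Batch

  • Spring Batch consists roughly of the following parts:
    Name            Role
    JobRepository   Container in which Jobs are registered
    JobLauncher     Interface for launching a Job
    Job             The task to execute, made up of one or more Steps
    Step            Each Step contains an ItemReader, an ItemProcessor and an ItemWriter
    ItemReader      Interface for reading data
    ItemProcessor   Interface for processing data
    ItemWriter      Interface for writing data
  • To use Spring Batch, annotate a configuration class with @EnableBatchProcessing to enable batch support, then define in that class the Beans that make up the batch job.
  • A sketch of the configuration class: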
    @Configuration
    @EnableBatchProcessing
    public class CsvBatchConfig {

        @Bean
        public ItemReader<Person> reader() throws Exception {
            ……
            return reader;
        }

        @Bean
        public ItemProcessor<Person, Person> processor() {
            CsvItemProcessor processor = new CsvItemProcessor(); // custom ItemProcessor implementation
            processor.setValidator(csvBeanValidator()); // attach the validator
            return processor;
        }

        @Bean
        public ItemWriter<Person> writer(DataSource dataSource) { // Spring injects the existing DataSource
            ……
            return writer;
        }

        @Bean
        public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
            ……
        }

        @Bean
        public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
            ……
            return jobLauncher;
        }

        @Bean
        public Job importJob(JobBuilderFactory jobs, Step s1) {
            return jobs.get("importJob")
                    .incrementer(new RunIdIncrementer())
                    .flow(s1) // the Step this Job runs
                    .end()
                    .listener(csvJobListener()) // bind the listener
                    .build();
        }

        @Bean
        public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader,
                          ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
            return stepBuilderFactory
                    .get("step1")
                    .<Person, Person>chunk(65000) // commit every 65000 items
                    .reader(reader) // bind the reader
                    .processor(processor) // bind the processor
                    .writer(writer) // bind the writer
                    .build();
        }

        @Bean
        public CsvJobListener csvJobListener() {
            return new CsvJobListener();
        }

        @Bean
        public Validator<Person> csvBeanValidator() {
            return new CsvBeanValidator<Person>();
        }
    }
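To make the roles of ItemReader, ItemProcessor and ItemWriter inside a Step more concrete, here is a minimal plain-Java sketch of chunk-oriented processing. It does not use Spring Batch at all: the class ChunkSketch and the method runStep are hypothetical names chosen for illustration, and the logic is simplified (Spring Batch counts a chunk by items read, whereas this sketch counts processed items). It only shows the general idea behind chunk(65000): items are read and processed one at a time, then written together once a chunk is full.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration only; not part of Spring Batch.
class ChunkSketch {

    /**
     * Reads items one at a time, processes each, and hands them to the writer
     * in chunks of at most chunkSize. Returns the number of chunks written.
     */
    static <I, O> int runStep(Iterator<I> reader,
                              Function<I, O> processor,
                              Consumer<List<O>> writer,
                              int chunkSize) {
        int chunks = 0;
        List<O> buffer = new ArrayList<>();
        while (reader.hasNext()) {
            O processed = processor.apply(reader.next());
            if (processed != null) { // a null result means "skip this item"
                buffer.add(processed);
            }
            if (buffer.size() == chunkSize) {
                writer.accept(new ArrayList<>(buffer)); // write one full chunk
                buffer.clear();
                chunks++;
            }
        }
        if (!buffer.isEmpty()) { // write the last, partially filled chunk
            writer.accept(new ArrayList<>(buffer));
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("pyc", "ycy", "yqy", "lb", "qf");
        final List<List<String>> written = new ArrayList<>();
        Function<String, String> processor = s -> s.toUpperCase();
        Consumer<List<String>> writer = chunk -> written.add(chunk);

        int chunks = runStep(names.iterator(), processor, writer, 2);
        // prints: 3 chunks: [[PYC, YCY], [YQY, LB], [QF]]
        System.out.println(chunks + " chunks: " + written);
    }
}
```

With a chunk size of 65000, as in the configuration above, the five sample records would all land in a single chunk.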

3. Job Listening

  • When a program runs tasks, we usually write a listener class to monitor how they execute. Batch processing is no different.
  • To listen to a batch job, define a custom class that implements JobExecutionListener, then bind it in the Bean that defines the Job. For example:
    public class CsvJobListener implements JobExecutionListener {
        // custom listener class
    }

    // register the listener
    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .listener(csvJobListener()) // bind the listener
                .build();
    }

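4. Reading and Writing Data

  • Reading data does not require a custom class, because Spring Batch ships many ItemReader implementations for different data sources. The same holds for output: Spring Batch provides many ItemWriter implementations for writing data to different destinations.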

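5. Processing and Validating Data

  • Both processing and validation go through the ItemProcessor interface. In practice, define a custom processor that extends the ValidatingItemProcessor class (which itself implements ItemProcessor), so that it can both validate and process the data. For example:
    public class CsvItemProcessor extends ValidatingItemProcessor<Person> {

        @Override
        public Person process(Person item) throws ValidationException {
            super.process(item); // runs the validator first
            // code that actually processes the data
            return item;
        }
    }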

6. Late Binding of Parameters

  • Hard-coding parameters at Bean initialization is limiting, so Spring Batch supports late binding of parameters.
  • To use late binding, put the parameters into JobParameters, mark the Bean definition with the special Bean scope annotation @StepScope, and inject the parameter with @Value. For example:
    // bind the parameters
    String path = "people.csv";
    JobParameters jobParameters = new JobParametersBuilder()
            .addLong("time", System.currentTimeMillis())
            .addString("input.file.name", path)
            .toJobParameters();
    jobLauncher.run(importJob, jobParameters);

    // define the Bean
    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource(pathToFile));
        return reader;
    }

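7. Spring Batch Configuration

  • In a Spring Boot project, Spring Batch can be configured in application.properties through properties prefixed with spring.batch. Common settings:
    # Jobs to run at startup; all of them by default
    spring.batch.job.names=
    # Whether to run the defined Jobs automatically; true by default
    spring.batch.job.enabled=true
    # Whether to initialize the Spring Batch database tables; true by default
    spring.batch.initializer.enabled=true
    # SQL script used to initialize the Spring Batch schema
    spring.batch.schema=
    # Prefix for the Spring Batch database tables
    spring.batch.table-prefix=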

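II. Example Project

  • The example reads data from a csv file and writes it to the database using JDBC batch updates.

1. Create the Project

  • Create a new Spring Boot project and select JDBC, Batch and Web as the initial dependencies.
  • Once the IDE has finished initializing the project, exclude the hsqldb dependency that Spring Batch pulls in, then add the ojdbc6 and hibernate-validator dependencies; the latter is used for data validation.
  • The POM file looks like this: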
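    <project xmlns="http://maven.apache.org/POM/4.0.0">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.3.0.M2</version>
        </parent>
        <groupId>com.pyc</groupId>
        <artifactId>batch</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>batch</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.hibernate</groupId>
                <artifactId>hibernate-validator</artifactId>
            </dependency>
            <dependency>
                <groupId>com.oracle</groupId>
                <artifactId>ojdbc6</artifactId>
                <version>11.2.0.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-batch</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.hsqldb</groupId>
                        <artifactId>hsqldb</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework.batch</groupId>
                <artifactId>spring-batch-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>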

2. Project Configuration

  • Put the following in application.properties to configure the project:
    spring.datasource.driverClassName=oracle.jdbc.OracleDriver
    spring.datasource.url=jdbc\:oracle\:thin\:@localhost\:1521\:xe
    spring.datasource.username=pyc
    spring.datasource.password=pyc
    spring.batch.job.enabled=true
    logging.file=log.log
    logging.level.org.springframework.web = DEBUG

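3. Table Definition and Test Data

  • In the resources directory, create a people.csv file with the following test data:
    pyc,21,汉族,汕尾
    ycy,21,汉族,肇庆
    yqy,67,汉族,余姚
    lb,100,古汉族,江东
    qf,100,古汉族,长安
  • Then create a schema.sql file with the following SQL statement:
    create table PERSON(
        id NUMBER not null primary key,
        name varchar(20),
        age number,
        nation varchar(20),
        address varchar(20)
    );
  • As in the earlier articles of this series, delete or rename schema.sql after the first successful run, so that the create statement is not executed again on the next startup.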
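Each line of people.csv holds a name, an age, a nation and an address, in that order. As a rough illustration of how such a line is turned into named fields before it is bound to an object, here is a small plain-Java sketch. The class CsvLineSketch and its tokenize method are hypothetical and do not use Spring Batch; they only mimic the idea of splitting on commas and attaching a name to each position.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not Spring Batch code.
class CsvLineSketch {

    // Field names, in the column order used by people.csv
    static final String[] NAMES = {"name", "age", "nation", "address"};

    /** Splits one comma-separated line and pairs each value with its field name. */
    static Map<String, String> tokenize(String line) {
        String[] values = line.split(",", -1); // -1 keeps trailing empty fields
        if (values.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "expected " + NAMES.length + " fields but got " + values.length + ": " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], values[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        // First sample row from people.csv
        Map<String, String> fields = tokenize("pyc,21,汉族,汕尾");
        System.out.println(fields.get("name") + " is " + Integer.parseInt(fields.get("age")));
    }
}
```

A line with the wrong number of columns is rejected up front, which is usually easier to diagnose than a failure later during the database write.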

4. Domain Model Class

  • The code is as follows:
    package com.pyc.batch.domain;

    import javax.validation.constraints.Size;

    public class Person {
        // validate the data with a JSR-303 annotation
        @Size(max = 4, min = 2)
        private String name;
        private int age;
        private String nation;
        private String address;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public void setAge(int age) {
            this.age = age;
        }

        public int getAge() {
            return age;
        }

        public void setAddress(String address) {
            this.address = address;
        }

        public String getAddress() {
            return address;
        }

        public void setNation(String nation) {
            this.nation = nation;
        }

        public String getNation() {
            return nation;
        }
    }

5. Processing and Validating Data

  • First the processing class:
    package com.pyc.batch.mybatch;

    import com.pyc.batch.domain.Person;
    import org.springframework.batch.item.validator.ValidatingItemProcessor;
    import org.springframework.batch.item.validator.ValidationException;

    public class CsvItemProcessor extends ValidatingItemProcessor<Person> {

        @Override
        public Person process(Person item) throws ValidationException {
            // validate first; the parent class calls the configured validator
            super.process(item);
            // replace the nation name with a code: "汉族" becomes "01", anything else "02"
            if ("汉族".equals(item.getNation())) {
                item.setNation("01");
            } else {
                item.setNation("02");
            }
            return item;
        }
    }
  • Then the validation class:
    package com.pyc.batch.mybatch;

    import org.springframework.batch.item.validator.ValidationException;
    import org.springframework.batch.item.validator.Validator;
    import org.springframework.beans.factory.InitializingBean;

    import javax.validation.ConstraintViolation;
    import javax.validation.Validation;
    import javax.validation.ValidatorFactory;
    import java.util.Set;

    public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {

        private javax.validation.Validator validator;

        @Override
        public void afterPropertiesSet() throws Exception {
            // obtain a JSR-303 validator once the Bean has been initialized
            ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
            validator = validatorFactory.usingContext().getValidator();
        }

        @Override
        public void validate(T t) throws ValidationException {
            Set<ConstraintViolation<T>> constraintViolations = validator.validate(t);
            if (constraintViolations.size() > 0) {
                // collect all violation messages into a single exception
                StringBuilder message = new StringBuilder();
                for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                    message.append(constraintViolation.getMessage()).append("\n");
                }
                throw new ValidationException(message.toString());
            }
        }
    }
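The two classes above cooperate in a fixed order: an item is validated first, and only a valid item has its nation converted to a code. The following plain-Java sketch imitates that order without Spring Batch or Bean Validation. ValidateThenProcessSketch and its methods are hypothetical names, and the hand-written length check merely stands in for the @Size(min = 2, max = 4) constraint on Person.name.

```java
// Hypothetical plain-Java illustration; the real work is done by
// ValidatingItemProcessor and a JSR-303 validator.
class ValidateThenProcessSketch {

    // "\u6c49\u65cf" is the nation value used in people.csv, written with
    // Unicode escapes so the source file does not depend on its encoding
    static final String HAN = "\u6c49\u65cf";

    /** Stand-in for @Size(min = 2, max = 4); like @Size, it accepts null. */
    static void validateName(String name) {
        if (name != null && (name.length() < 2 || name.length() > 4)) {
            throw new IllegalArgumentException("name length must be between 2 and 4: " + name);
        }
    }

    /** Same mapping as CsvItemProcessor: HAN becomes "01", anything else "02". */
    static String nationCode(String nation) {
        return HAN.equals(nation) ? "01" : "02";
    }

    /** Validation runs first; the nation is converted only for valid items. */
    static String process(String name, String nation) {
        validateName(name);
        return nationCode(nation);
    }

    public static void main(String[] args) {
        System.out.println(process("pyc", HAN));    // prints 01
        System.out.println(process("lb", "other")); // prints 02
        try {
            process("p", HAN); // name too short, so the item is rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

All five names in the sample people.csv are two or three characters long, so each of them would pass this check.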

6. Job Listening

  • Listen for the start and completion of the Job:
    package com.pyc.batch.mybatch;

    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;

    public class CsvJobListener implements JobExecutionListener {

        long startTime;
        long endTime;

        @Override
        public void beforeJob(JobExecution jobExecution) {
            startTime = System.currentTimeMillis();
            System.out.println("任务处理开始"); // "job started"
        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            endTime = System.currentTimeMillis();
            System.out.println("任务处理结束"); // "job finished"
            System.out.println("耗时:" + (endTime - startTime) + "ms"); // elapsed time
        }
    }

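7. Configuration

  • Two configuration classes are provided: one runs the JDBC batch job automatically at startup, the other is for triggering the batch job manually.
  • The configuration class for automatic batch processing: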
    // Imports are the same as for TriggerBatchConfig below,
    // except StepScope and Value, which this class does not use.
    @Configuration
    @EnableBatchProcessing
    public class CsvBatchConfig {

        @Bean
        public ItemReader<Person> reader() throws Exception {
            FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); // reads a flat file
            reader.setResource(new ClassPathResource("people.csv")); // location of the csv file

            // split each line on commas and name the resulting fields
            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
            tokenizer.setNames(new String[] {"name", "age", "nation", "address"});

            // bind the named fields to the properties of Person
            BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<Person>();
            fieldSetMapper.setTargetType(Person.class);

            DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();
            lineMapper.setLineTokenizer(tokenizer);
            lineMapper.setFieldSetMapper(fieldSetMapper);
            reader.setLineMapper(lineMapper);
            return reader;
        }

        @Bean
        public ItemProcessor<Person, Person> processor() {
            CsvItemProcessor processor = new CsvItemProcessor(); // custom ItemProcessor implementation
            processor.setValidator(csvBeanValidator()); // attach the validator
            return processor;
        }

        @Bean
        public ItemWriter<Person> writer(DataSource dataSource) { // Spring injects the existing DataSource
            JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); // writes with JDBC batch updates
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
            // Note: this statement relies on an Oracle sequence named hibernate_sequence,
            // which the schema.sql shown earlier does not create.
            String sql = "insert into person " + "(id,name,age,nation,address) "
                    + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)";
            writer.setSql(sql); // the SQL executed for each item of the batch
            writer.setDataSource(dataSource);
            return writer;
        }

        @Bean
        public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
            JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
            jobRepositoryFactoryBean.setDataSource(dataSource);
            jobRepositoryFactoryBean.setTransactionManager(transactionManager);
            jobRepositoryFactoryBean.setDatabaseType("oracle");
            return jobRepositoryFactoryBean.getObject();
        }

        @Bean
        public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
            return jobLauncher;
        }

        @Bean
        public Job importJob(JobBuilderFactory jobs, Step s1) {
            return jobs.get("importJob")
                    .incrementer(new RunIdIncrementer())
                    .flow(s1) // the Step this Job runs
                    .end()
                    .listener(csvJobListener()) // bind the listener
                    .build();
        }

        @Bean
        public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader,
                          ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
            return stepBuilderFactory
                    .get("step1")
                    .<Person, Person>chunk(65000) // commit every 65000 items
                    .reader(reader) // bind the reader
                    .processor(processor) // bind the processor
                    .writer(writer) // bind the writer
                    .build();
        }

        @Bean
        public CsvJobListener csvJobListener() {
            return new CsvJobListener();
        }

        @Bean
        public Validator<Person> csvBeanValidator() {
            return new CsvBeanValidator<Person>();
        }
    }
  • The configuration class for manually triggered batch processing:
    package com.pyc.batch.mybatch;

    import javax.sql.DataSource;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.core.launch.support.SimpleJobLauncher;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.batch.item.validator.Validator;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.transaction.PlatformTransactionManager;

    import com.pyc.batch.domain.Person;

    //@Configuration
    @EnableBatchProcessing
    public class TriggerBatchConfig {

        @Bean
        @StepScope
        public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}") String pathToFile) throws Exception {
            FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); // reads a flat file
            reader.setResource(new ClassPathResource(pathToFile)); // file name comes from the job parameters

            // split each line on commas and name the resulting fields
            DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
            tokenizer.setNames(new String[] {"name", "age", "nation", "address"});

            // bind the named fields to the properties of Person
            BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<Person>();
            fieldSetMapper.setTargetType(Person.class);

            DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();
            lineMapper.setLineTokenizer(tokenizer);
            lineMapper.setFieldSetMapper(fieldSetMapper);
            reader.setLineMapper(lineMapper);
            return reader;
        }

        @Bean
        public ItemProcessor<Person, Person> processor() {
            CsvItemProcessor processor = new CsvItemProcessor(); // custom ItemProcessor implementation
            processor.setValidator(csvBeanValidator()); // attach the validator
            return processor;
        }

        @Bean
        public ItemWriter<Person> writer(DataSource dataSource) { // Spring injects the existing DataSource
            JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); // writes with JDBC batch updates
            writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
            // Note: this statement relies on an Oracle sequence named hibernate_sequence,
            // which the schema.sql shown earlier does not create.
            String sql = "insert into person " + "(id,name,age,nation,address) "
                    + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)";
            writer.setSql(sql); // the SQL executed for each item of the batch
            writer.setDataSource(dataSource);
            return writer;
        }

        @Bean
        public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
            JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
            jobRepositoryFactoryBean.setDataSource(dataSource);
            jobRepositoryFactoryBean.setTransactionManager(transactionManager);
            jobRepositoryFactoryBean.setDatabaseType("oracle");
            return jobRepositoryFactoryBean.getObject();
        }

        @Bean
        public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
            return jobLauncher;
        }

        @Bean
        public Job importJob(JobBuilderFactory jobs, Step s1) {
            return jobs.get("importJob")
                    .incrementer(new RunIdIncrementer())
                    .flow(s1) // the Step this Job runs
                    .end()
                    .listener(csvJobListener()) // bind the listener
                    .build();
        }

        @Bean
        public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader,
                          ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
            return stepBuilderFactory
                    .get("step1")
                    .<Person, Person>chunk(65000) // commit every 65000 items
                    .reader(reader) // bind the reader
                    .processor(processor) // bind the processor
                    .writer(writer) // bind the writer
                    .build();
        }

        @Bean
        public CsvJobListener csvJobListener() {
            return new CsvJobListener();
        }

        @Bean
        public Validator<Person> csvBeanValidator() {
            return new CsvBeanValidator<Person>();
        }
    }
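  • The two configuration classes are largely identical; the only difference is the ItemReader.
  • Only one of them should be active at a time. To use the automatic configuration, uncomment @Configuration on CsvBatchConfig and comment it out on TriggerBatchConfig. To use the manually triggered configuration, do the opposite: enable @Configuration on TriggerBatchConfig, comment it out on CsvBatchConfig, and set spring.batch.job.enabled to false in application.properties.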

8. Controller

  • Used to trigger the batch job manually. The code is as follows:
    package com.pyc.batch.web;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class DemoController {

        @Autowired
        JobLauncher jobLauncher;

        @Autowired
        Job importJob;

        @RequestMapping("/read")
        public String imp(String fileName) throws Exception {
            String path = fileName + ".csv";
            // a local variable rather than a shared field, so concurrent requests do not interfere
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .addString("input.file.name", path)
                    .toJobParameters();
            jobLauncher.run(importJob, jobParameters);
            return "ok";
        }
    }
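The controller adds a "time" parameter alongside the file name. A likely reason is that Spring Batch identifies a job instance by the job name together with its parameters, and refuses to rerun an instance that has already completed; a fresh timestamp makes every request a new instance. The sketch below imitates that bookkeeping in plain Java. JobInstanceSketch is a hypothetical class, not the Spring Batch JobRepository, and it returns false where the real launcher would throw an exception.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for the bookkeeping a job repository does; not Spring Batch code.
class JobInstanceSketch {

    private final Set<String> completed = new HashSet<>();

    /** Returns true if the job ran, false if this name and parameter set already completed. */
    boolean run(String jobName, Map<String, Object> params) {
        // sort the parameters so the key does not depend on insertion order
        String key = jobName + new TreeMap<String, Object>(params).toString();
        return completed.add(key);
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();

        Map<String, Object> fixed = new TreeMap<>();
        fixed.put("input.file.name", "people.csv");
        System.out.println(repository.run("importJob", fixed)); // prints true
        System.out.println(repository.run("importJob", fixed)); // prints false: same parameters

        Map<String, Object> stamped = new TreeMap<>(fixed);
        stamped.put("time", System.currentTimeMillis());
        System.out.println(repository.run("importJob", stamped)); // prints true: new parameter set
    }
}
```

Without the timestamp, a second request for the same file would map to the same job instance, which is why the controller cannot simply pass the file name alone.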

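9. Testing

  • To test automatic batch processing, simply run the project.
  • To trigger the batch job manually, visit localhost:8080/read?fileName=people.
  • In both cases the console prints the key job messages:
    [Screenshot: console output]
  • Inspecting the database shows that seven tables have been created:
    [Screenshot: list of tables]
  • The PERSON table contains the following data:
    [Screenshot: contents of the PERSON table]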

Reprinted from: http://blqgn.baihongyu.com/
