spring batch + spring boot 配置

淩亂°似流年 2022-08-10 00:51 332阅读 0赞
  1. spring batch 批处理配置

    import java.io.IOException;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecutionListener;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.batch.core.launch.support.RunIdIncrementer;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.ParseException;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.LineMapper;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.transaction.PlatformTransactionManager;

    /**

    • spring batch 配置
    • @author
      */
      @Configuration
      @EnableBatchProcessing
      public class BlackListBatchConfiguration {
  1. private static final Logger logger = LoggerFactory.getLogger(BlackListBatchConfiguration.class);
  2. /**
  3. * 读取外部文件方法
  4. * @return
  5. * @throws IOException
  6. */
  7. @Bean
  8. @StepScope
  9. public ItemReader<BlackListDO> reader(@Value("#{jobParameters[inputFileBlack]}") String inputFile) throws IOException {
  10. logger.info("inputFile:"+new ClassPathResource(inputFile).getURL().getPath());
  11. if(inputFile == null){
  12. logger.error("The blacklist reader file is null");
  13. return null;
  14. }
  15. FlatFileItemReader<BlackListDO> reader = new FlatFileItemReader<BlackListDO>();
  16. reader.setResource(new ClassPathResource(inputFile));
  17. reader.setLineMapper(lineMapper());
  18. reader.setLinesToSkip(1);
  19. reader.open(JobCompletionNotificationListener.jobExecution.getExecutionContext());
  20. return reader;
  21. }
  22. /**
  23. * 读取文本行映射POJO
  24. * @return
  25. */
  26. @Bean
  27. @StepScope
  28. public LineMapper<BlackListDO> lineMapper() {
  29. DefaultLineMapper<BlackListDO> lineMapper = new DefaultLineMapper<BlackListDO>();
  30. DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
  31. lineTokenizer.setDelimiter(",");
  32. lineTokenizer.setStrict(false);
  33. lineTokenizer.setNames(new String[] { "type","value","fraudType"});
  34. BeanWrapperFieldSetMapper<BlackListDO> fieldSetMapper = new BeanWrapperFieldSetMapper<BlackListDO>();
  35. fieldSetMapper.setTargetType(BlackListDO.class);
  36. lineMapper.setLineTokenizer(lineTokenizer);
  37. lineMapper.setFieldSetMapper(new BlackListFieldSetMapper());
  38. return lineMapper;
  39. }
  40. /**
  41. * 处理过程
  42. * @return
  43. */
  44. @Bean
  45. @StepScope
  46. public ItemProcessor<BlackListDO, BlackListDO> processor(@Value("#{jobParameters[inputFileBlack]}") String inputFile) {
  47. return new BlackListDOItemProcessor(inputFile);
  48. }
  49. /**
  50. * 写出内容
  51. * @return
  52. */
  53. @Bean
  54. @StepScope
  55. public ItemWriter<BlackListDO> writer() {
  56. return new BlackListItemWriter();
  57. }
  58. /**
  59. * 构建job
  60. * @param jobs
  61. * @param s1
  62. * @param listener
  63. * @return
  64. */
  65. @Bean
  66. public Job importFileJob(JobBuilderFactory jobs, Step step1,JobExecutionListener listener,JobRepository jobRepository) {
  67. return jobs.get("importFileJob")
  68. .incrementer(new RunIdIncrementer())
  69. .repository(jobRepository)
  70. .listener(listener)
  71. .flow(step1)
  72. .end()
  73. .build();
  74. }
  75. /**
  76. * 声明step
  77. * @param stepBuilderFactory
  78. * @param reader
  79. * @param writer
  80. * @param processor
  81. * @return
  82. */
  83. @Bean
  84. public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<BlackListDO> reader,
  85. ItemWriter<BlackListDO> writer, ItemProcessor<BlackListDO, BlackListDO> processor,PlatformTransactionManager transactionManager) {
  86. logger.error("step1");
  87. return stepBuilderFactory.get("step1")
  88. .<BlackListDO, BlackListDO> chunk(500)
  89. .reader(reader)
  90. .processor(processor)
  91. .writer(writer)
  92. .faultTolerant()
  93. .retry(Exception.class) // 重试
  94. .noRetry(ParseException.class)
  95. .retryLimit(1) //每条记录重试一次
  96. .listener(new RetryFailuireItemListener())
  97. .skip(Exception.class)
  98. .skipLimit(500) //一共允许跳过200次异常
  99. .taskExecutor(new SimpleAsyncTaskExecutor()) //设置并发方式执行
  100. .throttleLimit(10) //并发任务数为 10,默认为4
  101. .transactionManager(transactionManager)
  102. .build();
  103. }
  104. }

BlackListDOItemProcessor 处理类

  1. import java.sql.Timestamp;
  2. import java.util.Date;
  3. import java.util.UUID;
  4. import org.springframework.batch.item.ItemProcessor;
  5. import com.BlackListDO;
  6. /**
  7. * @author zhengyong
  8. *
  9. */
  10. public class BlackListDOItemProcessor implements ItemProcessor<BlackListDO, BlackListDO> {
  11. public String inputFile;
  12. public BlackListDOItemProcessor() {
  13. }
  14. public BlackListDOItemProcessor(String inputFile) {
  15. this.inputFile = inputFile;
  16. }
  17. // 数据处理
  18. public BlackListDO process(BlackListDO blackListDO) throws Exception {
  19. blackListDO.setDeleteFlag(0);
  20. blackListDO.setUuid(UUID.randomUUID().toString().replaceAll("-", ""));
  21. return blackListDO;
  22. }
  23. }

BlackListItemWriter 写入数据库类

  1. import org.springframework.batch.item.ItemWriter;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import com.BlackListDO;
  4. public class BlackListItemWriter implements ItemWriter<BlackListDO> {
  5. @Override
  6. public void write(List<? extends BlackListDO> blackList) throws Exception {
  7. // 插入数据库操作
  8. }
  9. }

JobCompletionNotificationListener 监听任务

  1. import org.springframework.batch.core.BatchStatus;
  2. import org.springframework.batch.core.JobExecution;
  3. import org.springframework.batch.core.listener.JobExecutionListenerSupport;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
  8. @Override
  9. public void afterJob(JobExecution jobExecution) {
  10. }
  11. @Override
  12. public void beforeJob(JobExecution jobExecution) {
  13. super.beforeJob(jobExecution);
  14. }
  15. }

RetryFailuireItemListener 重试监听

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.retry.RetryCallback;
  4. import org.springframework.retry.RetryContext;
  5. import org.springframework.retry.RetryListener;
  6. public class RetryFailuireItemListener implements RetryListener{
  7. private static final Logger logger = LoggerFactory.getLogger(RetryFailuireItemListener.class);
  8. @Override
  9. public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
  10. return true;
  11. }
  12. @Override
  13. public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
  14. Throwable throwable) {
  15. }
  16. @Override
  17. public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
  18. Throwable throwable) {
  19. logger.error("【重试异常】:"+throwable.getMessage());
  20. }
  21. }
  1. spring boot 配置

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.builder.SpringApplicationBuilder;
    import org.springframework.boot.context.web.SpringBootServletInitializer;
    import org.springframework.context.annotation.ComponentScan;

    @ComponentScan(“com.syncclient”)
    @SpringBootApplication
    public class SpringBootJspApplication extends SpringBootServletInitializer{

    1. /**
    2. * 500一批
    3. * oracle : 单条插入基本每分钟2.5W条(50W,19.5min) ,批量插入基本每分钟10W条(50W,4.7mim)
    4. * mysql : 单条插入基本每分钟2.5W条(50W,11.4min) ,批量插入基本每分钟40W条(50W,1.3min)
    5. */
    6. @Override
    7. protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
    8. return application.sources(SpringBootJspApplication.class);
    9. }
    10. public static void main(String[] args) throws Exception {
    11. SpringApplication.run(SpringBootJspApplication.class,new String[]{"appStart=true"});
    12. }

    }

  2. 配置文件

    mysql config

    spring.boot.database = mysql

    //127.0.0.1:3306/spring_batch?useUnicode=true&characterEncoding=utf8" class="reference-link">spring.datasource.url = jdbc:mysql://127.0.0.1:3306/spring_batch?useUnicode=true&characterEncoding=utf8

    spring.datasource.username = admin

    spring.datasource.password = 123456

    spring.datasource.driverClassName = com.mysql.jdbc.Driver

    spring.batch.schema = classpath:/org/springframework/batch/core/schema-mysql.sql

    spring.batch.drop = classpath:/org/springframework/batch/core/schema-drop-mysql.sql

    oracle config

    spring.boot.database = oracle
    spring.datasource.url = jdbc:oracle:thin:@127.0.0.1:1521:spring_batch
    spring.datasource.username = admin
    spring.datasource.password = 123456
    spring.datasource.driverClassName = oracle.jdbc.driver.OracleDriver
    spring.batch.schema = classpath:org/springframework/batch/core/schema-oracle10g.sql
    spring.batch.drop = classpath:org/springframework/batch/core/schema-drop-oracle10g.sql

    batch config

    spring.batch.job.names = importFileJob
    spring.batch.job.enabled = true
    spring.batch.initializer.enabled=true

  3. mybatis 配置

    import javax.sql.DataSource;

    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.SqlSessionFactoryBean;
    import org.mybatis.spring.SqlSessionTemplate;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
    import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    import org.springframework.transaction.PlatformTransactionManager;

    import com.alibaba.druid.pool.DruidDataSource;

    import com.BlackListDao;
    import com.ProperitesUtil;

    /**

    • mybatis配置
      /
      @Configuration
      public class MyBatisConfiguration {

      private static final Logger logger = LoggerFactory.getLogger(MyBatisConfiguration.class);

      @Autowired
      SqlSessionFactory sqlSessionFactory;
      @Autowired
      SqlSessionTemplate sessionTemplate;

      @Bean
      public SqlSessionTemplate sqlSessionTemplate() {

      1. return new SqlSessionTemplate(sqlSessionFactory);

      }

      @Bean
      public BlackListDao blackListMapper() {

      1. return sessionTemplate.getMapper(BlackListDao.class);

      }

      @Bean
      @Primary
      @ConfigurationProperties(prefix = “spring.datasource”)
      public DataSource dataSource() {

      1. logger.debug("初始化dataSource");
      2. return new DruidDataSource();

      }

      @Bean
      public PlatformTransactionManager transactionManager(DataSource dataSource) {

      1. PlatformTransactionManager transactionManager = new DataSourceTransactionManager(dataSource);
      2. return transactionManager;

      }

      @Bean
      public SqlSessionFactory sqlSessionFactory() throws Exception {

      1. SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
      2. sqlSessionFactoryBean.setDataSource(dataSource());
      3. PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
      4. // 获取数据库类型
      5. String dataBaseType = ProperitesUtil.getPropertyValue("spring.boot.database") == null ? "mysql"
      6. : ProperitesUtil.getPropertyValue("spring.boot.database");
      7. String directory = "classpath:/mapper/" + dataBaseType.trim().toLowerCase() + "/*.xml";
      8. sqlSessionFactoryBean.setMapperLocations(resolver.getResources(directory));
      9. return sqlSessionFactoryBean.getObject();

      }

      }

  1. pom.xml文件


    4.0.0

    com.spring.batch
    batch-test
    0.0.1-SNAPSHOT
    jar

    batch-test
    http://maven.apache.org


    org.springframework.boot
    spring-boot-starter-parent
    1.2.5.RELEASE




    org.springframework.boot
    spring-boot-starter-web


    org.springframework.boot
    spring-boot-starter-batch


    org.springframework.batch
    spring-batch-core
    3.0.4.RELEASE


    org.springframework.boot
    spring-boot-starter-tomcat
    provided


    org.apache.tomcat.embed
    tomcat-embed-jasper
    provided


    javax.servlet
    jstl


    org.springframework.boot
    spring-boot-starter-test
    test



发表评论

表情:
评论列表 (有 0 条评论,332人围观)

还没有评论,来说两句吧...

相关阅读

    相关 spring batch

            springBatch一个轻量级,全面的批处理框架,旨在开发强大的批处理应用程序,对于企业系统的日常运营至关重要。提供可重复使用的功能,这些功能在处理大量记录

    相关 Spring Boot整合Spring Batch

    引言 >   Spring Batch是处理大量数据操作的一个框架,主要用来读取大量数据,然后进行一定的处理后输出指定的形式。比如我们可以将csv文件中的数据(数据量几百