激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Spring Batch輕量級批處理框架實戰

Spring Batch輕量級批處理框架實戰

2022-02-22 00:49Mr.Ymx Java教程

本文主要介紹了Spring Batch輕量級批處理框架實戰,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下

1 實戰前的理論基礎

1.1 Spring Batch是什么

Spring Batch 是一個輕量級、全面的批處理框架,旨在支持開發對企業系統日常運營至關重要的強大的批處理應用程序。同時使開發人員在必要時可以輕松訪問和利用更先進的企業服務。Spring Batch 不是調度框架,它旨在與調度程序一起工作,而不是取代調度程序。

1.2 Spring Batch能做什么

  • 自動化、復雜的大量信息處理,無需用戶交互即可最有效地處理。這些操作通常包括基于時間的事件(例如月末計算、通知或通信)。
  • 定期應用在非常大的數據集上重復處理的復雜業務規則(例如,保險福利確定或費率調整)。
  • 將從內部和外部系統接收的信息集成到記錄系統中,這些信息通常需要以事務方式進行格式化、驗證和處理。批處理用于每天為企業處理數十億筆交易。

業務場景:

  • 定期提交批處理
  • 并發批處理:作業的并行處理
  • 分階段的、企業消息驅動的處理
  • 大規模并行批處理
  • 失敗后手動或計劃重啟
  • 依賴步驟的順序處理(擴展到工作流驅動的批處理)
  • 部分處理:跳過記錄(例如,在回滾時)
  • 整批事務,適用于小批量或現有存儲過程/腳本的情況

總之Spring batch可以做的:

  • 從數據庫、文件或隊列中讀取大量記錄。
  • 以某種方式處理數據。
  • 以修改后的形式寫回數據。

1.3 基礎架構

Spring Batch輕量級批處理框架實戰

1.4 核心概念和抽象

Spring Batch輕量級批處理框架實戰

核心概念:一個 Job 有一對多的Step,每個步驟都正好有一個 ItemReader、一個ItemProcessor和 一個ItemWriter。需要啟動作業(使用 JobLauncher),并且需要存儲有關當前運行進程的元數據(在 中 JobRepository)。

 

2 各個組件介紹

2.1 Job

Job是封裝了整個批處理過程的實體。與其他 Spring 項目一樣,一個Job與 XML 配置文件或基于 Java 的配置連接在一起。這種配置可以被稱為“作業配置”。

Spring Batch輕量級批處理框架實戰

可配置項:

  • 作業的簡單名稱。
  • Step實例的定義和排序。
  • 作業是否可重新啟動。

2.2 Step

一個Step是一個域對象,它封裝了批處理作業的一個獨立的、連續的階段。因此,每個 Job 完全由一個或多個步驟組成。一個Step包含定義和控制實際批處理所需的所有信息。

Spring Batch輕量級批處理框架實戰

一個StepExecution代表一次嘗試執行一個Step。StepExecution 每次Step運行時都會創建一個新的,類似于JobExecution。

2.3 ExecutionContext

一個ExecutionContext表示由框架持久化和控制的鍵/值對的集合,以允許開發人員有一個地方來存儲范圍為StepExecution對象或JobExecution對象的持久狀態。

2.4 JobRepository

JobRepository是上述所有 Stereotypes 的持久性機制。它提供了CRUD操作JobLauncher,Job以及Step實現。當 Job第一次啟動,一個JobExecution被從庫中獲得,并且,執行的過程中,StepExecution和JobExecution實施方式是通過將它們傳遞到存儲庫持續。

使用 Java 配置時,@EnableBatchProcessing注解提供了一個 JobRepository作為開箱即用自動配置的組件之一。

2.5 JobLauncher

JobLauncher表示一個簡單的接口,用于Job使用給定的 集合 啟動JobParameters,如以下示例所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
          throws JobExecutionAlreadyRunningException, JobRestartException,
                 JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

期望實現JobExecution從 中 獲得有效JobRepository并執行Job。

2.6 Item Reader

ItemReader是一種抽象,表示一次檢索Step一個項目的輸入。當ItemReader用完它可以提供的項目時,它通過返回來表明這一點null。

2.7 Item Writer

ItemWriter是一種抽象,表示一次一個Step、一批或一大塊項目的輸出。通常, anItemWriter不知道它接下來應該接收的輸入,并且只知道在其當前調用中傳遞的項目。

2.8 Item Processor

ItemProcessor是表示項目的業務處理的抽象。當ItemReader讀取一個項目并ItemWriter寫入它們時,它 ItemProcessor提供了一個訪問點來轉換或應用其他業務處理。如果在處理該項目時確定該項目無效,則返回 null表示不應寫出該項目。

 

3 Spring Batch實戰

下面就利用我們所學的理論實現一個最簡單的Spring Batch批處理項目

3.1 依賴和項目結構以及配置文件

依賴

<!--Spring batch-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- web依賴-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok-->
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <version>1.18.20</version>
</dependency>
<!--  mysql-->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.47</version>
</dependency>
<!--  mybatis-->
<dependency>
  <groupId>com.baomidou</groupId>
  <artifactId>mybatis-plus-boot-starter</artifactId>
  <version>3.2.0</version>
</dependency>

項目結構

Spring Batch輕量級批處理框架實戰

配置文件

server.port=9000
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=12345
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

3.2 代碼和數據表

數據表

CREATE TABLE `student` (
  `id` int(100) NOT NULL AUTO_INCREMENT,
  `name` varchar(45) DEFAULT NULL,
  `age` int(2) DEFAULT NULL,
  `address` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=203579 DEFAULT CHARSET=utf8 ROW_FORMAT=REDUNDANT

Student實體類

/**
* @desc: Student實體類
* @author: YanMingXin
* @create: 2021/10/15-12:17
**/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
@ToString
@TableName("student")
public class Student {

  @TableId(value = "id", type = IdType.AUTO)
  private Long sId;

  @TableField("name")
  private String sName;

  @TableField("age")
  private Integer sAge;

  @TableField("address")
  private String sAddress;

}

Mapper層

/**
* @desc: Mapper層
* @author: YanMingXin
* @create: 2021/10/15-12:17
**/
@Mapper
@Repository
public interface StudentDao extends BaseMapper<Student> {
}

模擬數據庫(文件)中讀取類

/**
* @desc: 模擬數據庫中讀取
* @author: YanMingXin
* @create: 2021/10/16-10:13
**/
public class StudentVirtualDao {

  /**
   * 模擬從數據庫中讀取
   *
   * @return
   */
  public List<Student> getStudents() {
      ArrayList<Student> students = new ArrayList<>();
      students.add(new Student(1L, "zs", 23, "Beijing"));
      students.add(new Student(2L, "ls", 23, "Beijing"));
      students.add(new Student(3L, "ww", 23, "Beijing"));
      students.add(new Student(4L, "zl", 23, "Beijing"));
      students.add(new Student(5L, "mq", 23, "Beijing"));
      students.add(new Student(6L, "gb", 23, "Beijing"));
      students.add(new Student(7L, "lj", 23, "Beijing"));
      students.add(new Student(8L, "ss", 23, "Beijing"));
      students.add(new Student(9L, "zsdd", 23, "Beijing"));
      students.add(new Student(10L, "zss", 23, "Beijing"));
      return students;
  }
}

Service層接口

/**
* @desc:
* @author: YanMingXin
* @create: 2021/10/15-12:16
**/
public interface StudentService {

  List<Student> selectStudentsFromDB();

  void insertStudent(Student student);
}

Service層實現類

/**
* @desc: Service層實現類
* @author: YanMingXin
* @create: 2021/10/15-12:16
**/
@Service
public class StudentServiceImpl implements StudentService {

  @Autowired
  private StudentDao studentDao;

  @Override
  public List<Student> selectStudentsFromDB() {
      return studentDao.selectList(null);
  }

  @Override
  public void insertStudent(Student student) {
      studentDao.insert(student);
  }
}

最核心的配置類BatchConfiguration

/**
* @desc: BatchConfiguration
* @author: YanMingXin
* @create: 2021/10/15-12:25
**/
@Configuration
@EnableBatchProcessing
@SuppressWarnings("all")
public class BatchConfiguration {

  /**
   * 注入JobBuilderFactory
   */
  @Autowired
  public JobBuilderFactory jobBuilderFactory;

  /**
   * 注入StepBuilderFactory
   */
  @Autowired
  public StepBuilderFactory stepBuilderFactory;

  /**
   * 注入JobRepository
   */
  @Autowired
  public JobRepository jobRepository;

  /**
   * 注入JobLauncher
   */
  @Autowired
  private JobLauncher jobLauncher;

  /**
   * 注入自定義StudentService
   */
  @Autowired
  private StudentService studentService;

  /**
   * 注入自定義job
   */
  @Autowired
  private Job studentJob;

  /**
   * 封裝writer bean
   *
   * @return
   */
  @Bean
  public ItemWriter<Student> writer() {
      ItemWriter<Student> writer = new ItemWriter() {
          @Override
          public void write(List list) throws Exception {
              //debug發現是嵌套的List reader的線程List嵌套真正的List
              list.forEach((stu) -> {
                  for (Student student : (ArrayList<Student>) stu) {
                      studentService.insertStudent(student);
                  }
              });
          }
      };
      return writer;
  }

  /**
   * 封裝reader bean
   *
   * @return
   */
  @Bean
  public ItemReader<Student> reader() {
      ItemReader<Student> reader = new ItemReader() {
          @Override
          public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
              //模擬數據獲取
              StudentVirtualDao virtualDao = new StudentVirtualDao();
              return virtualDao.getStudents();
          }
      };
      return reader;
  }

  /**
   * 封裝processor bean
   *
   * @return
   */
  @Bean
  public ItemProcessor processor() {
      ItemProcessor processor = new ItemProcessor() {
          @Override
          public Object process(Object o) throws Exception {
              //debug發現o就是reader單次單線程讀取的數據
              return o;
          }
      };
      return processor;
  }

  /**
   * 封裝自定義step
   *
   * @return
   */
  @Bean
  public Step studentStepOne() {
      return stepBuilderFactory.get("studentStepOne")
          .chunk(1)
          .reader(reader()) //加入reader
          .processor(processor())  //加入processor
          .writer(writer())//加入writer
          .build();
  }

  /**
   * 封裝自定義job
   *
   * @return
   */
  @Bean
  public Job studentJob() {
      return jobBuilderFactory.get("studentJob")
          .flow(studentStepOne())//加入step
          .end()
          .build();
  }


  /**
   * 使用spring 定時任務執行
   */
  @Scheduled(fixedRate = 5000)
  public void printMessage() {
      try {
          JobParameters jobParameters = new JobParametersBuilder()
              .addLong("time", System.currentTimeMillis())
              .toJobParameters();
          jobLauncher.run(studentJob, jobParameters);
      } catch (Exception e) {
          e.printStackTrace();
      }
  }
}

3.3 測試

Spring Batch輕量級批處理框架實戰

項目啟動1s之后

Spring Batch輕量級批處理框架實戰

看數據庫,除了我們實體類定義的表以外多出來這么多表,這些表都是spring batch自帶的記錄日志和錯誤的表,具體的字段含義的有待研究

Spring Batch輕量級批處理框架實戰

 

4 實戰后的總結

Spring Batch有非常快的寫入和讀取速度,但是帶來的影響就是非常耗費內存和數據庫連接池的資源如果使用不好的話還會發生異常,因此我們要進行正確的配置,接下來我們進行簡單的源碼探究:

4.1 JobBuilderFactory

job的獲取使用了簡單工廠模式和建造者模式JobBuilderFactory獲取JobBuilder在經過配置返回一個job對象的實例,該實例就是Spring Batch中最頂級的組件,包含了n和step

public class JobBuilderFactory {

 private JobRepository jobRepository;

 public JobBuilderFactory(JobRepository jobRepository) {
    this.jobRepository = jobRepository;
 }
 //返回JobBuilder
 public JobBuilder get(String name) {
    JobBuilder builder = new JobBuilder(name).repository(jobRepository);
    return builder;
 }
}

jobBuilder類

public class JobBuilder extends JobBuilderHelper<JobBuilder> {

 /**
  * 為指定名稱的作業創建一個新的構建器
  */
 public JobBuilder(String name) {
    super(name);
 }

 /**
  * 創建將執行步驟或步驟序列的新作業構建器。
  */
 public SimpleJobBuilder start(Step step) {
    return new SimpleJobBuilder(this).start(step);
 }

 /**
  * 創建將執行流的新作業構建器。
  */
 public JobFlowBuilder start(Flow flow) {
    return new FlowJobBuilder(this).start(flow);
 }

 /**
  * 創建將執行步驟或步驟序列的新作業構建器
  */
 public JobFlowBuilder flow(Step step) {
    return new FlowJobBuilder(this).start(step);
 }
}

4.2 StepBuilderFactory

直接看StepBuilder類

public class StepBuilder extends StepBuilderHelper<StepBuilder> {

 public StepBuilder(String name) {
    super(name);
 }

 /**
  * 用自定義微線程構建步驟,不一定是項處理。  
  */
 public TaskletStepBuilder tasklet(Tasklet tasklet) {
    return new TaskletStepBuilder(this).tasklet(tasklet);
 }

 /**
  * 構建一個步驟,按照提供的大小以塊的形式處理項。為了將這一步擴展到容錯,
  * 在構建器上調用SimpleStepBuilder的 faultolerant()方法。
  * @param <I> 輸入類型
  * @param <O> 輸出類型
  */
 public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
    return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
 }

 public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
    return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
 }

 public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
    return new PartitionStepBuilder(this).partitioner(stepName, partitioner);
 }

 public PartitionStepBuilder partitioner(Step step) {
    return new PartitionStepBuilder(this).step(step);
 }

 public JobStepBuilder job(Job job) {
    return new JobStepBuilder(this).job(job);
 }

 /**
  * 創建將執行流的新步驟構建器。
  */
 public FlowStepBuilder flow(Flow flow) {
    return new FlowStepBuilder(this).flow(flow);
 }
}

 

參考文檔:

https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/index.html

https://www.jdon.com/springbatch.html

到此這篇關于Spring Batch輕量級批處理框架實戰的文章就介紹到這了,更多相關Spring Batch批處理內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://blog.csdn.net/Mr_YanMingXin/article/details/120798271

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 国产在线看一区 | 欧美日韩精品一区二区三区在线观看 | 成年人黄色免费网站 | 女18一级大黄毛片免费女人 | 亚洲第一成人在线视频 | 欧美一级黄视频 | 在线观看第一区 | 国产精品一二区 | 国产 一区 精品 | 久久草在线看 | 曰批全过程40分钟免费视频多人 | 精品国产一二区 | 成人在线观看免费高清 | 国产91九色 | 男人久久天堂 | 韩国精品久久久 | 毛片免费视频观看 | 免费在线观看国产精品 | 在线免费观看毛片视频 | 夜间福利网站 | 中文字幕一区二区三区久久 | www噜噜偷拍在线视频 | 欧美一级黄色录相 | 久久97超碰| 黑人日比| 日操操夜操操 | 久久免费视频5 | 久久精品视频5 | 日韩视| wwwxxx视频| 成人午夜在线免费观看 | aa国产视频一区二区 | 91精品国产九九九久久久亚洲 | 国内精品久久久久久久星辰影视 | 成年免费观看视频 | 亚洲国产精品二区 | 久久影院一区二区三区 | 亚洲成人入口 | 日日操操 | 中文黄色一级片 | av在线免费看片 |