詳解批處理框架之Spring Batch
Spring Batch的分層架構圖如下:
可以看到它分為三層,分別是:
Application應用層:包含了所有任務batch jobs和開發人員自定義的代碼,主要是根據項目需要開發的業務流程等。 Batch Core核心層:包含啟動和管理任務的運行環境類,如JobLauncher等。 Batch Infrastructure基礎層:上面兩層是建立在基礎層之上的,包含基礎的讀入reader和寫出writer、重試框架等。1.2、關鍵概念理解下圖所涉及的概念至關重要,不然很難進行后續開發和問題分析。
專門負責與數據庫打交道,對整個批處理的新增、更新、執行進行記錄。所以Spring Batch是需要依賴數據庫來管理的。
1.2.2、任務啟動器JobLauncher負責啟動任務Job。
1.2.3、任務JobJob是封裝整個批處理過程的單位,跑一個批處理任務,就是跑一個Job所定義的內容。
上圖介紹了Job的一些相關概念:
Job:封裝處理實體,定義過程邏輯。 JobInstance:Job的運行實例,不同的實例,參數不同,所以定義好一個Job后可以通過不同參數運行多次。 JobParameters:與JobInstance相關聯的參數。 JobExecution:代表Job的一次實際執行,可能成功、可能失敗。所以,開發人員要做的事情,就是定義Job。
1.2.4、步驟StepStep是對Job某個過程的封裝,一個Job可以包含一個或多個Step,一步步的Step按特定邏輯執行,才代表Job執行完成。
通過定義Step來組裝Job可以更靈活地實現復雜的業務邏輯。
1.2.5、輸入——處理——輸出所以,定義一個Job關鍵是定義好一個或多個Step,然后把它們組裝好即可。而定義Step有多種方法,但有一種常用的模型就是輸入——處理——輸出,即Item Reader、Item Processor和Item Writer。比如通過Item Reader從文件輸入數據,然后通過Item Processor進行業務處理和數據轉換,最后通過Item Writer寫到數據庫中去。
Spring Batch為我們提供了許多開箱即用的Reader和Writer,非常方便。
二、代碼實例理解了基本概念后,就直接通過代碼來感受一下吧。整個項目的功能是從多個csv文件中讀數據,處理后輸出到一個csv文件。
2.1、基本框架添加依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope></dependency>
需要添加Spring Batch的依賴,同時使用H2作為內存數據庫比較方便,實際生產肯定是要使用外部的數據庫,如Oracle、PostgreSQL。
入口主類:
@SpringBootApplication@EnableBatchProcessingpublic class PkslowBatchJobMain { public static void main(String[] args) {SpringApplication.run(PkslowBatchJobMain.class, args); }}
也很簡單,只是在Springboot的基礎上添加注解@EnableBatchProcessing。
領域實體類Employee:
package com.pkslow.batch.entity;public class Employee { String id; String firstName; String lastName;}
對應的csv文件內容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
2.2、輸入——處理——輸出2.2.1、讀取ItemReader因為有多個輸入文件,所以定義如下:
@Value('input/inputData*.csv')private Resource[] inputResources;@Beanpublic MultiResourceItemReader<Employee> multiResourceItemReader(){ MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader()); return resourceItemReader;}@Beanpublic FlatFileItemReader<Employee> reader(){ FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>(); //跳過csv文件第一行,為表頭 reader.setLinesToSkip(1); reader.setLineMapper(new DefaultLineMapper() { { setLineTokenizer(new DelimitedLineTokenizer() {{ //字段名 setNames(new String[] { 'id', 'firstName', 'lastName' });} }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() {{ //轉換化后的目標類 setTargetType(Employee.class);} }); } }); return reader;}
這里使用了FlatFileItemReader,方便我們從文件讀取數據。
2.2.2、處理ItemProcessor為了簡單演示,處理很簡單,就是把最后一列轉為大寫:
public ItemProcessor<Employee, Employee> itemProcessor() { return employee -> { employee.setLastName(employee.getLastName().toUpperCase()); return employee; };}2.2.3、輸出ItremWriter
比較簡單,代碼及注釋如下:
private Resource outputResource = new FileSystemResource('output/outputData.csv');@Beanpublic FlatFileItemWriter<Employee> writer(){ FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>(); writer.setResource(outputResource); //是否為追加模式 writer.setAppendAllowed(true); writer.setLineAggregator(new DelimitedLineAggregator<Employee>() { { //設置分割符 setDelimiter(','); setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() {{ //設置字段 setNames(new String[] { 'id', 'firstName', 'lastName' });} }); } }); return writer;}2.3、Step
有了Reader-Processor-Writer后,就可以定義Step了:
@Beanpublic Step csvStep() { return stepBuilderFactory.get('csvStep').<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .processor(itemProcessor()) .writer(writer()) .build();}
這里有一個chunk的設置,值為5,意思是5條記錄后再提交輸出,可以根據自己需求定義。
2.4、Job完成了Step的編碼,定義Job就容易了:
@Beanpublic Job pkslowCsvJob() { return jobBuilderFactory .get('pkslowCsvJob') .incrementer(new RunIdIncrementer()) .start(csvStep()) .build();}2.5、運行
完成以上編碼后,執行程序,結果如下:
成功讀取數據,并將最后字段轉為大寫,并輸出到outputData.csv文件。
三、監聽Listener可以通過Listener接口對特定事件進行監聽,以實現更多業務功能。比如如果處理失敗,就記錄一條失敗日志;處理完成,就通知下游拿數據等。
我們分別對Read、Process和Write事件進行監聽,對應分別要實現ItemReadListener接口、ItemProcessListener接口和ItemWriteListener接口。因為代碼比較簡單,就是打印一下日志,這里只貼出ItemWriteListener的實現代碼:
public class PkslowWriteListener implements ItemWriteListener<Employee> { private static final Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override public void beforeWrite(List<? extends Employee> list) {logger.info('beforeWrite: ' + list); } @Override public void afterWrite(List<? extends Employee> list) {logger.info('afterWrite: ' + list); } @Override public void onWriteError(Exception e, List<? extends Employee> list) {logger.info('onWriteError: ' + list); }}
把實現的監聽器listener整合到Step中去:
@Beanpublic Step csvStep() { return stepBuilderFactory.get('csvStep').<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .listener(new PkslowReadListener()) .processor(itemProcessor()) .listener(new PkslowProcessListener()) .writer(writer()) .listener(new PkslowWriteListener()) .build();}
執行后看一下日志:
這里就能明顯看到之前設置的chunk的作用了。Writer每次是處理5條記錄,如果一條輸出一次,會對IO造成壓力。
以上就是詳解Spring Batch入門之優秀的批處理框架的詳細內容,更多關于Spring Batch 批處理框架的資料請關注好吧啦網其它相關文章!
相關文章:
1. ASP中常用的22個FSO文件操作函數整理2. 無線標記語言(WML)基礎之WMLScript 基礎第1/2頁3. ASP調用WebService轉化成JSON數據,附json.min.asp4. .Net core 的熱插拔機制的深入探索及卸載問題求救指南5. SharePoint Server 2019新特性介紹6. html清除浮動的6種方法示例7. 讀大數據量的XML文件的讀取問題8. ASP.NET Core 5.0中的Host.CreateDefaultBuilder執行過程解析9. React+umi+typeScript創建項目的過程10. Vue+elementUI下拉框自定義顏色選擇器方式
