配置SpringBatch批处理失败重试

 1. 引言

创新互联建站主要从事成都网站制作、做网站、网页设计、企业做网站、公司建网站等业务。立足成都服务宜黄,十载网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18980820575

默认情况下,Spring批处理作业在执行过程中出现任何错误都会失败。然而有些时候,为了提高应用程序的弹性,我们就需要处理这类间歇性的故障。在这篇短文中,我们就来一起探讨 如何在Spring批处理框架中配置重试逻辑。

如果对spring batch不了解,可以参考以前的一篇文章:

开车!Spring Batch 入门级示例教程!

2. 简单举例

假设有一个批处理作业,它读取一个CSV文件作为输入:

 
 
 
 
  1. username, userid, transaction_date, transaction_amount 
  2. sammy, 1234, 31/10/2015, 10000 
  3. john, 9999, 3/12/2015, 12321 

然后,它通过访问REST端点来处理每条记录,获取用户的 age 和 postCode 属性:

 
 
 
 
  1. public class RetryItemProcessor implements ItemProcessor { 
  2.      
  3.     @Override 
  4.     public Transaction process(Transaction transaction) throws IOException { 
  5.         log.info("RetryItemProcessor, attempting to process: {}", transaction); 
  6.         HttpResponse response = fetchMoreUserDetails(transaction.getUserId()); 
  7.         //parse user's age and postCode from response and update transaction 
  8.         ... 
  9.         return transaction; 
  10.     } 
  11.     ... 

最后,它生成并输出一个合并的XML:

 
 
 
 
  1.  
  2.      
  3.         10000.0 
  4.         2015-10-31 00:00:00 
  5.         1234 
  6.         sammy 
  7.         10 
  8.         430222 
  9.      
  10.     ... 
  11.  

3. ItemProcessor 中添加重试

现在假设,如果到REST端点的连接由于某些网络速度慢而超时,该怎么办?如果发生这种情况,则我们的批处理工作将失败。

在这种情况下,我们希望失败的 item 处理重试几次。因此,接下来我将批处理作业配置为:在出现故障时执行最多三次重试:

 
 
 
 
  1. @Bean 
  2. public Step retryStep( 
  3.   ItemProcessor processor, 
  4.   ItemWriter writer) throws ParseException { 
  5.     return stepBuilderFactory 
  6.       .get("retryStep") 
  7.       .chunk(10) 
  8.       .reader(itemReader(inputCsv)) 
  9.       .processor(processor) 
  10.       .writer(writer) 
  11.       .faultTolerant() 
  12.       .retryLimit(3) 
  13.       .retry(ConnectTimeoutException.class) 
  14.       .retry(DeadlockLoserDataAccessException.class) 
  15.       .build(); 

这里调用了 faultTolerant() 来启用重试功能。另外,我们使用 retry 和 retryLimit 分别定义符合重试条件的异常和 item 的最大重试次数。

4. 测试重试次数

假设我们有一个测试场景,其中返回 age 和 postCode 的REST端点关闭了一段时间。在这个测试场景中,我们只对前两个 API 调用获取一个 ConnectTimeoutException ,而第三个调用将成功:

 
 
 
 
  1. @Test 
  2. public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception { 
  3.     FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT); 
  4.     FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT); 
  5.  
  6.     when(httpResponse.getEntity()) 
  7.       .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }")); 
  8.   
  9.     //fails for first two calls and passes third time onwards 
  10.     when(httpClient.execute(any())) 
  11.       .thenThrow(new ConnectTimeoutException("Timeout count 1")) 
  12.       .thenThrow(new ConnectTimeoutException("Timeout count 2")) 
  13.       .thenReturn(httpResponse); 
  14.  
  15.     JobExecution jobExecution = jobLauncherTestUtils 
  16.       .launchJob(defaultJobParameters()); 
  17.     JobInstance actualJobInstance = jobExecution.getJobInstance(); 
  18.     ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); 
  19.  
  20.     assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); 
  21.     assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED")); 
  22.     AssertFile.assertFileEquals(expectedResult, actualResult); 

在这里,我们的工作成功地完成了。另外,从日志中可以明显看出 第一条记录 id=1234 失败了两次,最后在第三次重试时成功了:

 
 
 
 
  1. 19:06:57.742 [main] INFO  o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep] 
  2. 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 
  3. 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 
  4. 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234 
  5. 19:06:57.758 [main] INFO  o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999 
  6. 19:06:57.773 [main] INFO  o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms 

同样,看下另一个测试用例,当所有重试次数都用完时会发生什么:

 
 
 
 
  1. @Test 
  2. public void whenEndpointAlwaysFail_thenJobFails() throws Exception { 
  3.     when(httpClient.execute(any())) 
  4.       .thenThrow(new ConnectTimeoutException("Endpoint is down")); 
  5.  
  6.     JobExecution jobExecution = jobLauncherTestUtils 
  7.       .launchJob(defaultJobParameters()); 
  8.     JobInstance actualJobInstance = jobExecution.getJobInstance(); 
  9.     ExitStatus actualJobExitStatus = jobExecution.getExitStatus(); 
  10.  
  11.     assertThat(actualJobInstance.getJobName(), is("retryBatchJob")); 
  12.     assertThat(actualJobExitStatus.getExitCode(), is("FAILED")); 
  13.     assertThat(actualJobExitStatus.getExitDescription(), 
  14.       containsString("org.apache.http.conn.ConnectTimeoutException")); 

在这个测试用例中,在作业因 ConnectTimeoutException 而失败之前,会尝试对第一条记录重试三次。

5. 使用XML配置重试

最后,让我们看一下与上述配置等价的XML:

 
 
 
 
  1.  
  2.      
  3.          
  4.             
  5.               processor="retryItemProcessor" commit-interval="10" 
  6.               retry-limit="3"> 
  7.                  
  8.                      
  9.                      
  10.                  
  11.              
  12.          
  13.      
  14.  

6. 简单总结

在本文中,我们学习了如何在Spring批处理中配置重试逻辑,其中包括使用Java和XML配置。以及使用单元测试来观察重试在实践中是如何工作的。

本文转载自微信公众号「锅外的大佬」,可以通过以下二维码关注。转载本文请联系锅外的大佬公众号。

网站名称:配置SpringBatch批处理失败重试
URL标题:http://www.shufengxianlan.com/qtweb/news36/539786.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联