我们如何在Spring批处理作业的不同步骤之间共享数据?

深入研究Spring批处理,我想知道如何在Job的不同步骤之间共享数据?

我们可以使用JobRepository吗? 如果是的话,我们该怎么做?

还有没有其他的方式来做到这一点?

作业存储库间接用于在步骤之间传递数据(Jean-Philippe是正确的,最好的方法是将数据放入StepExecutionContext ,然后使用名称StepExecutionContext ExecutionContextPromotionListener将步骤执行上下文关键字提升到JobExecutionContext

注意有一个监听器可以将JobParameter键提升到StepExecutionContext (更加详细的称为JobParameterExecutionContextCopyListener )。 如果你的工作步骤不是完全独立的,你会发现你使用了很多。

否则,您将不再使用更精细的scheme在步骤之间传递数据,如JMS队列或(天堂禁止)硬编码文件位置。

至于在上下文中传递的数据的大小,我还build议你保持它的小(但我没有任何具体的

从一个步骤,你可以把数据放入StepExecutionContext 。 然后,通过监听器,可以将数据从StepExecutionContext提升到StepExecutionContext

这个JobExecutionContext在以下所有步骤中都可用。

beccareful:数据必须很短。 这些上下文通过序列化保存在JobRepository ,长度是有限的(如果我记得的话,这个长度是2500个字符)。

所以这些上下文很好地共享string或简单的值,但不是共享集合或大量的数据。

共享大量的数据不是Spring Batch的哲学。 Spring Batch是一组独特的操作,而不是一个巨大的业务处理单元。

我会说你有3个select:

  1. 使用StepContext并将其提升到JobContext并且您可以从每个步骤访问它,您必须遵守规定的限制
  2. 创build@JobScope bean并将数据添加到该bean,@ @Autowire它在需要和使用它(缺点是它是内存中的结构,如果作业失败数据丢失,Migh导致可重启的问题)
  3. 我们需要跨越多个步骤来处理更大的数据集(读取csv中的每一行并写入数据库,读取数据库,聚合并发送到API),因此我们决定在与新增批量元表相同的数据库中对新表进行build模,在JobContextJobContext ids并在需要时进行访问,并在作业成功完成时删除该临时表。

您可以使用Java Bean对象

  1. 执行一个步骤
  2. 将结果存储在Java对象中
  3. 下一步将引用相同的java对象来获得第一步存储的结果

这样你就可以存储大量的数据

这是我所做的保存一个可以通过步骤访问的对象。

  1. 创build一个监听器来设置作业上下文中的对象
 @Component("myJobListener") public class MyJobListener implements JobExecutionListener { public void beforeJob(JobExecution jobExecution) { String myValue = someService.getValue(); jobExecution.getExecutionContext().putString("MY_VALUE", myValue); } } 
  1. 在作业上下文中定义了监听器
 <listeners> <listener ref="myJobListener"/> </listeners> 
  1. 使用BeforeStep注释消耗步骤中的值
 @BeforeStep public void initializeValues(StepExecution stepExecution) { String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE"); } 

我被赋予了一个一个地调用批处理作业的任务。每个作业都依赖于另一个作业。 第一份工作结果需要执行后续的工作计划。 我正在search如何在作业执行后传递数据。 我发现这个ExecutionContextPromotionListener派上用场。

1)我已经添加了一个“ExecutionContextPromotionListener”像下面的bean

 @Bean public ExecutionContextPromotionListener promotionListener() { ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener(); listener.setKeys( new String[] { "entityRef" } ); return listener; } 

2)然后我把一个听众连接到我的步骤

 Step step = builder.faultTolerant() .skipPolicy( policy ) .listener( writer ) .listener( promotionListener() ) .listener( skiplistener ) .stream( skiplistener ) .build(); 

3)我在Writer步骤实现中添加了stepExecution作为参考,并在Beforestep中填充

 @BeforeStep public void saveStepExecution( StepExecution stepExecution ) { this.stepExecution = stepExecution; } 

4)在我的作家步骤结束时,我将这些值填充在stepexecution中,如下面的键

 lStepContext.put( "entityRef", lMap ); 

5)作业执行后,我从lExecution.getExecutionContext()检索值并填充为作业响应。

6)从工作响应对象中,我将得到值并在剩余的工作中填充所需的值。

上面的代码是使用ExecutionContextPromotionListener将步骤中的数据提升到ExecutionContext。 它可以在任何步骤中完成。

使用`ExecutionContextPromotionListener。

 public class YourItemWriter implements ItemWriter<Object> { private StepExecution stepExecution; public void write(List<? extends Object> items) throws Exception { // Some Business Logic // put your data into stepexecution context ExecutionContext stepContext = this.stepExecution.getExecutionContext(); stepContext.put("someKey", someObject); } @BeforeStep public void saveStepExecution(Final StepExecution stepExecution) { this.stepExecution = stepExecution; } } 

现在您需要将promotionListener添加到您的工作中

 @Bean public Step step1() { return stepBuilder .get("step1")<Company,Company> chunk(10) .reader(reader()).processor(processor()).writer(writer()) .listener(promotionListner()).build(); } 

@Bean public ExecutionContextPromotionListener promotionListner(){ExecutionContextPromotionListener listner = new ExecutionContextPromotionListener(); listner.setKeys(new String [] {“someKey”}); listner.setStrict(真); 退货清单 }

现在,在第二步从作业ExecutionContext获取你的数据

 public class RetrievingItemWriter implements ItemWriter<Object> { private Object someObject; public void write(List<? extends Object> items) throws Exception { // ... } @BeforeStep public void retrieveInterstepData(StepExecution stepExecution) { JobExecution jobExecution = stepExecution.getJobExecution(); ExecutionContext jobContext = jobExecution.getExecutionContext(); this.someObject = jobContext.get("someKey"); } } 

如果你正在使用tasklets,那么使用下面的命令来获取或放置ExecutionContext

 List<YourObject> yourObjects = (List<YourObject>) chunkContent.getStepContext().getJobExecutionContext().get("someKey");