我们如何在Spring批处理作业的不同步骤之间共享数据?
深入研究Spring批处理,我想知道如何在Job的不同步骤之间共享数据?
我们可以使用JobRepository吗? 如果是的话,我们该怎么做?
还有没有其他的方式来做到这一点?
作业存储库间接用于在步骤之间传递数据(Jean-Philippe是正确的,最好的方法是将数据放入StepExecutionContext
,然后使用名称StepExecutionContext
ExecutionContextPromotionListener
将步骤执行上下文关键字提升到JobExecutionContext
。
注意有一个监听器可以将JobParameter
键提升到StepExecutionContext
(更加详细的称为JobParameterExecutionContextCopyListener
)。 如果你的工作步骤不是完全独立的,你会发现你使用了很多。
否则,您将不再使用更精细的scheme在步骤之间传递数据,如JMS队列或(天堂禁止)硬编码文件位置。
至于在上下文中传递的数据的大小,我还build议你保持它的小(但我没有任何具体的
从一个步骤,你可以把数据放入StepExecutionContext
。 然后,通过监听器,可以将数据从StepExecutionContext
提升到StepExecutionContext
。
这个JobExecutionContext
在以下所有步骤中都可用。
beccareful:数据必须很短。 这些上下文通过序列化保存在JobRepository
,长度是有限的(如果我记得的话,这个长度是2500个字符)。
所以这些上下文很好地共享string或简单的值,但不是共享集合或大量的数据。
共享大量的数据不是Spring Batch的哲学。 Spring Batch是一组独特的操作,而不是一个巨大的业务处理单元。
我会说你有3个select:
- 使用
StepContext
并将其提升到JobContext
并且您可以从每个步骤访问它,您必须遵守规定的限制 - 创build
@JobScope
bean并将数据添加到该bean,@@Autowire
它在需要和使用它(缺点是它是内存中的结构,如果作业失败数据丢失,Migh导致可重启的问题) - 我们需要跨越多个步骤来处理更大的数据集(读取csv中的每一行并写入数据库,读取数据库,聚合并发送到API),因此我们决定在与新增批量元表相同的数据库中对新表进行build模,在
JobContext
中JobContext
ids
并在需要时进行访问,并在作业成功完成时删除该临时表。
您可以使用Java Bean对象
- 执行一个步骤
- 将结果存储在Java对象中
- 下一步将引用相同的java对象来获得第一步存储的结果
这样你就可以存储大量的数据
这是我所做的保存一个可以通过步骤访问的对象。
- 创build一个监听器来设置作业上下文中的对象
@Component("myJobListener") public class MyJobListener implements JobExecutionListener { public void beforeJob(JobExecution jobExecution) { String myValue = someService.getValue(); jobExecution.getExecutionContext().putString("MY_VALUE", myValue); } }
- 在作业上下文中定义了监听器
<listeners> <listener ref="myJobListener"/> </listeners>
- 使用BeforeStep注释消耗步骤中的值
@BeforeStep public void initializeValues(StepExecution stepExecution) { String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE"); }
我被赋予了一个一个地调用批处理作业的任务。每个作业都依赖于另一个作业。 第一份工作结果需要执行后续的工作计划。 我正在search如何在作业执行后传递数据。 我发现这个ExecutionContextPromotionListener派上用场。
1)我已经添加了一个“ExecutionContextPromotionListener”像下面的bean
@Bean public ExecutionContextPromotionListener promotionListener() { ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener(); listener.setKeys( new String[] { "entityRef" } ); return listener; }
2)然后我把一个听众连接到我的步骤
Step step = builder.faultTolerant() .skipPolicy( policy ) .listener( writer ) .listener( promotionListener() ) .listener( skiplistener ) .stream( skiplistener ) .build();
3)我在Writer步骤实现中添加了stepExecution作为参考,并在Beforestep中填充
@BeforeStep public void saveStepExecution( StepExecution stepExecution ) { this.stepExecution = stepExecution; }
4)在我的作家步骤结束时,我将这些值填充在stepexecution中,如下面的键
lStepContext.put( "entityRef", lMap );
5)作业执行后,我从lExecution.getExecutionContext()
检索值并填充为作业响应。
6)从工作响应对象中,我将得到值并在剩余的工作中填充所需的值。
上面的代码是使用ExecutionContextPromotionListener将步骤中的数据提升到ExecutionContext。 它可以在任何步骤中完成。
使用`ExecutionContextPromotionListener。
public class YourItemWriter implements ItemWriter<Object> { private StepExecution stepExecution; public void write(List<? extends Object> items) throws Exception { // Some Business Logic // put your data into stepexecution context ExecutionContext stepContext = this.stepExecution.getExecutionContext(); stepContext.put("someKey", someObject); } @BeforeStep public void saveStepExecution(Final StepExecution stepExecution) { this.stepExecution = stepExecution; } }
现在您需要将promotionListener添加到您的工作中
@Bean public Step step1() { return stepBuilder .get("step1")<Company,Company> chunk(10) .reader(reader()).processor(processor()).writer(writer()) .listener(promotionListner()).build(); }
@Bean public ExecutionContextPromotionListener promotionListner(){ExecutionContextPromotionListener listner = new ExecutionContextPromotionListener(); listner.setKeys(new String [] {“someKey”}); listner.setStrict(真); 退货清单 }
现在,在第二步从作业ExecutionContext获取你的数据
public class RetrievingItemWriter implements ItemWriter<Object> { private Object someObject; public void write(List<? extends Object> items) throws Exception { // ... } @BeforeStep public void retrieveInterstepData(StepExecution stepExecution) { JobExecution jobExecution = stepExecution.getJobExecution(); ExecutionContext jobContext = jobExecution.getExecutionContext(); this.someObject = jobContext.get("someKey"); } }
如果你正在使用tasklets,那么使用下面的命令来获取或放置ExecutionContext
List<YourObject> yourObjects = (List<YourObject>) chunkContent.getStepContext().getJobExecutionContext().get("someKey");