Revisit the chunk-oriented processing model implementation #3950

Open · 3 tasks
fmbenhassine opened this issue Jun 25, 2021 · 4 comments

@fmbenhassine
Contributor

fmbenhassine commented Jun 25, 2021

Problem statement

The current implementation of the chunk-oriented processing model works in most cases, but has several issues related to transaction management and fault-tolerance [1] as well as concurrency [2].

Transaction management and fault-tolerance

I believe the main reason for this is the current structure of the code, which uses multiple nested repeat/retry/transaction callbacks, as summarized in #1189 (comment). When unfolded, the code looks like the following (a copy/paste of the current code, inlined into a single method):

// TaskletStep#doExecute -> ChunkOrientedTasklet#execute -> (ChunkProvider#provide + ChunkProcessor#process)
// NB: this unfolding is illustrative and not strictly valid Java: some locals that
// were method parameters in the original code are not all effectively final here

protected void doExecute(StepExecution stepExecution) throws Exception {
	stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
	stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

	stream.update(stepExecution.getExecutionContext());
	getJobRepository().updateExecutionContext(stepExecution);

	// Shared semaphore per step execution, so other step executions can run
	// in parallel without needing the lock
	final Semaphore semaphore = createSemaphore();

	stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

		@Override
		public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
				throws Exception {

			StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

			// Before starting a new transaction, check for
			// interruption.
			interruptionPolicy.checkInterrupted(stepExecution);

			RepeatStatus result;
			try {
				TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager, transactionAttribute);
				//result = transactionTemplate.execute(new ChunkTransactionCallback(chunkContext, semaphore));
				// begin transactionTemplate.execute
				// ('this', oldVersion, locked and stepExecutionUpdated were members
				// of the original ChunkTransactionCallback)
				TransactionSynchronizationManager.registerSynchronization(this);

				result = RepeatStatus.CONTINUABLE;

				StepContribution contribution = stepExecution.createStepContribution();

				chunkListener.beforeChunk(chunkContext);

				// In case we need to push it back to its old value
				// after a commit fails...
				oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
				copy(stepExecution, oldVersion);

				try {

					try {
						try {
							result = null; // tasklet.execute(contribution, chunkContext);
							// begin tasklet.execute
							Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
							if (inputs == null) {
								// inputs = chunkProvider.provide(contribution);
								// begin chunkProvider.provide
								final Chunk<I> readInputs = new Chunk<>();
								repeatOperations.iterate(new RepeatCallback() {

									@Override
									public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
										I item = null;
										Timer.Sample sample = Timer.start(Metrics.globalRegistry);
										String status = BatchMetrics.STATUS_SUCCESS;
										try {
											item = read(contribution, readInputs);
										}
										catch (SkipOverflowException e) {
											// read() tells us about an excess of skips by throwing an
											// exception
											status = BatchMetrics.STATUS_FAILURE;
											return RepeatStatus.FINISHED;
										}
										finally {
											stopTimer(sample, contribution.getStepExecution(), status);
										}
										if (item == null) {
											readInputs.setEnd();
											return RepeatStatus.FINISHED;
										}
										readInputs.add(item);
										contribution.incrementReadCount();
										return RepeatStatus.CONTINUABLE;
									}

								});

								// return inputs; (inlined: the provided chunk is already populated)
								// end chunkProvider.provide
								inputs = readInputs;
								if (buffering) {
									chunkContext.setAttribute(INPUTS_KEY, inputs);
								}
							}

							//chunkProcessor.process(contribution, inputs);
							//chunkProvider.postProcess(contribution, inputs);
							// begin chunkProcessor.process
							
							// Process items
							
							Chunk<O> outputs = new Chunk<>();
							final FaultTolerantChunkProcessor.UserData<O> data = (FaultTolerantChunkProcessor.UserData<O>) inputs.getUserData();
							final Chunk<O> cache = data.getOutputs();
							final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<>(cache.getItems()).iterator();
							final AtomicInteger count = new AtomicInteger(0);

							// final int scanLimit = processorTransactional && data.scanning() ? 1 :
							// 0;

							for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {

								final I item = iterator.next();

								RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() {

									@Override
									public O doWithRetry(RetryContext context) throws Exception {
										Timer.Sample sample = BatchMetrics.createTimerSample();
										String status = BatchMetrics.STATUS_SUCCESS;
										O output = null;
										try {
											count.incrementAndGet();
											O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
											if (cached != null && !processorTransactional) {
												output = cached;
											}
											else {
												output = doProcess(item);
												if (output == null) {
													data.incrementFilterCount();
												} else if (!processorTransactional && !data.scanning()) {
													cache.add(output);
												}
											}
										}
										catch (Exception e) {
											status = BatchMetrics.STATUS_FAILURE;
											if (rollbackClassifier.classify(e)) {
												// Default is to rollback unless the classifier
												// allows us to continue
												throw e;
											}
											else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
												// If we are not re-throwing then we should check if
												// this is skippable
												contribution.incrementProcessSkipCount();
												logger.debug("Skipping after failed process with no rollback", e);
												// If not re-throwing then the listener will not be
												// called in next chunk.
												callProcessSkipListener(item, e);
											}
											else {
												// If it's not skippable that's an error in
												// configuration - it doesn't make sense to not roll
												// back if we are also not allowed to skip
												throw new NonSkippableProcessException(
														"Non-skippable exception in processor.  Make sure any exceptions that do not cause a rollback are skippable.",
														e);
											}
										}
										finally {
											stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");
										}
										if (output == null) {
											// No need to re-process filtered items
											iterator.remove();
										}
										return output;
									}

								};

								RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {

									@Override
									public O recover(RetryContext context) throws Exception {
										Throwable e = context.getLastThrowable();
										if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
											iterator.remove(e);
											contribution.incrementProcessSkipCount();
											logger.debug("Skipping after failed process", e);
											return null;
										}
										else {
											if (rollbackClassifier.classify(e)) {
												// Default is to rollback unless the classifier
												// allows us to continue
												throw new RetryException("Non-skippable exception in recoverer while processing", e);
											}
											iterator.remove(e);
											return null;
										}
									}

								};

								O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
										getInputKey(item), rollbackClassifier));
								if (output != null) {
									outputs.add(output);
								}

								/*
								 * We only want to process the first item if there is a scan for a
								 * failed item.
								 */
								if (data.scanning()) {
									while (cacheIterator != null && cacheIterator.hasNext()) {
										outputs.add(cacheIterator.next());
									}
									// Only process the first item if scanning
									break;
								}
							}
							
							// Write items

							// (data was already extracted from inputs.getUserData() when processing items above)
							final AtomicReference<RetryContext> contextHolder = new AtomicReference<>();

							RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
								@Override
								public Object doWithRetry(RetryContext context) throws Exception {
									contextHolder.set(context);

									if (!data.scanning()) {
										chunkMonitor.setChunkSize(inputs.size());
										Timer.Sample sample = BatchMetrics.createTimerSample();
										String status = BatchMetrics.STATUS_SUCCESS;
										try {
											doWrite(outputs.getItems());
										}
										catch (Exception e) {
											status = BatchMetrics.STATUS_FAILURE;
											if (rollbackClassifier.classify(e)) {
												throw e;
											}
											/*
											 * If the exception is marked as no-rollback, we need to
											 * override that, otherwise there's no way to write the
											 * rest of the chunk or to honour the skip listener
											 * contract.
											 */
											throw new ForceRollbackForWriteSkipException(
													"Force rollback on skippable exception so that skipped item can be located.", e);
										}
										finally {
											stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing");
										}
										contribution.incrementWriteCount(outputs.size());
									}
									else {
										scan(contribution, inputs, outputs, chunkMonitor, false);
									}
									return null;

								}
							};

							if (!buffering) {

								RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {

									@Override
									public Object recover(RetryContext context) throws Exception {

										Throwable e = context.getLastThrowable();
										if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
											throw new RetryException("Invalid retry state during write caused by "
													+ "exception that does not classify for rollback: ", e);
										}

										Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
										for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {

											inputIterator.next();
											outputIterator.next();

											checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
											if (!rollbackClassifier.classify(e)) {
												throw new RetryException(
														"Invalid retry state during recovery caused by exception that does not classify for rollback: ",
														e);
											}

										}

										return null;

									}

								};

								batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
										BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));

							}
							else {

								RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {

									@Override
									public Object recover(RetryContext context) throws Exception {
										/*
										 * If the last exception was not skippable we don't need to
										 * do any scanning. We can just bomb out with a retry
										 * exhausted.
										 */
										if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
											throw new ExhaustedRetryException(
													"Retry exhausted after last attempt in recovery path, but exception is not skippable.",
													context.getLastThrowable());
										}

										inputs.setBusy(true);
										data.scanning(true);
										scan(contribution, inputs, outputs, chunkMonitor, true);
										return null;
									}

								};

								if (logger.isDebugEnabled()) {
									logger.debug("Attempting to write: " + inputs);
								}
								try {
									batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
											rollbackClassifier));
								}
								catch (Exception e) {
									RetryContext context = contextHolder.get();
									if (!batchRetryTemplate.canRetry(context)) {
										/*
										 * BATCH-1761: we need advance warning of the scan about to
										 * start in the next transaction, so we can change the
										 * processing behaviour.
										 */
										data.scanning(true);
									}
									throw e;
								}

							}

							callSkipListeners(inputs, outputs);
							
							// end chunkProcessor.process

							// Allow a message coming back from the processor to say that we
							// are not done yet
							if (inputs.isBusy()) {
								logger.debug("Inputs still busy");
								result = RepeatStatus.CONTINUABLE;
							}
							else {
								chunkContext.removeAttribute(INPUTS_KEY);
								chunkContext.setComplete();

								if (logger.isDebugEnabled()) {
									logger.debug("Inputs not busy, ended: " + inputs.isEnd());
								}
								result = RepeatStatus.continueIf(!inputs.isEnd());
							}
							// end tasklet.execute
							if (result == null) {
								result = RepeatStatus.FINISHED;
							}
						}
						catch (Exception e) {
							if (transactionAttribute.rollbackOn(e)) {
								chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
								throw e;
							}
						}
					}
					finally {

						// If the step operations are asynchronous then we need
						// to synchronize changes to the step execution (at a
						// minimum). Take the lock *before* changing the step
						// execution.
						try {
							semaphore.acquire();
							locked = true;
						}
						catch (InterruptedException e) {
							logger.error("Thread interrupted while locking for repository update");
							stepExecution.setStatus(BatchStatus.STOPPED);
							stepExecution.setTerminateOnly();
							Thread.currentThread().interrupt();
						}

						// Apply the contribution to the step
						// even if unsuccessful
						if (logger.isDebugEnabled()) {
							logger.debug("Applying contribution: " + contribution);
						}
						stepExecution.apply(contribution);

					}

					stepExecutionUpdated = true;

					stream.update(stepExecution.getExecutionContext());

					try {
						// Going to attempt a commit. If it fails this flag will
						// stay false and we can use that later.
						getJobRepository().updateExecutionContext(stepExecution);
						stepExecution.incrementCommitCount();
						if (logger.isDebugEnabled()) {
							logger.debug("Saving step execution before commit: " + stepExecution);
						}
						getJobRepository().update(stepExecution);
					}
					catch (Exception e) {
						// If we get to here there was a problem saving the step
						// execution and we have to fail.
						String msg = "JobRepository failure forcing rollback";
						logger.error(msg, e);
						throw new FatalStepExecutionException(msg, e);
					}
				}
				catch (Error e) {
					if (logger.isDebugEnabled()) {
						logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
					}
					rollback(stepExecution);
					throw e;
				}
				catch (RuntimeException e) {
					if (logger.isDebugEnabled()) {
						logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
					}
					rollback(stepExecution);
					throw e;
				}
				catch (Exception e) {
					if (logger.isDebugEnabled()) {
						logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
					}
					rollback(stepExecution);
					// Allow checked exceptions
					throw new UncheckedTransactionException(e);
				}

				// return result; (in the original code, result is returned to the transaction template here)
				// end transactionTemplate.execute
			}
			catch (UncheckedTransactionException e) {
				// Allow checked exceptions to be thrown inside callback
				throw (Exception) e.getCause();
			}

			chunkListener.afterChunk(chunkContext);

			// Check for interruption after transaction as well, so that
			// the interrupted exception is correctly propagated up to
			// caller
			interruptionPolicy.checkInterrupted(stepExecution);

			return result == null ? RepeatStatus.FINISHED : result;
		}

	});

}

This structure makes the implementation complex and hard to test and reason about, which limits the maintainability of the code in the long term.

Concurrency model

The current approach to concurrency, a "parallel iteration" concept (based on TaskExecutorRepeatTemplate, ResultHolderResultQueue, RepeatSynchronizationManager, TransactionSynchronizationManager, etc.), is not friendly to concurrent executions, as it requires a lot of state synchronization at different levels: at the step level with a semaphore, at the chunk level with ThreadLocal-bound execution contexts, and at the item level with locks. This results in several issues: maxItemCount not being honored in a multi-threaded step, inconsistent state when a transaction is rolled back (leading to optimistic locking issues), throttling problems, poor performance, etc. [2]

Possible solution

The goal here is to analyse and refactor the internal implementation of the chunk-oriented processing model with minimal changes to the current behaviour. I believe using a for loop (similar to the pseudo-code mentioned in the docs) coupled with an implementation of the producer-consumer pattern for concurrency would make the code easier to reason about, test and maintain in the long term. As a side note, several open source, JSR-352 compliant implementations do use a for loop to implement this chunk-oriented model, so the current code structure and concurrency approach could be reviewed without compromising the correctness of the end result.
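
For illustration, here is a minimal sketch of what the sequential case could look like with a plain loop, along the lines of the pseudo-code in the reference documentation. The names commitInterval, itemReader, itemProcessor and itemWriter are assumptions for the sketch, not the API of the new implementation, and transaction demarcation around the chunk is omitted:

	// Read one chunk of items (stop early if the reader is exhausted)
	List<I> items = new ArrayList<>();
	for (int i = 0; i < commitInterval; i++) {
		I item = itemReader.read();
		if (item == null) {
			break;
		}
		items.add(item);
	}

	// Process the chunk, dropping filtered items (processor returned null)
	List<O> processedItems = new ArrayList<>();
	for (I item : items) {
		O processedItem = itemProcessor.process(item);
		if (processedItem != null) {
			processedItems.add(processedItem);
		}
	}

	// Write the chunk in one shot
	itemWriter.write(processedItems);

For concurrency, a producer-consumer arrangement could replace the "parallel iteration": a single producer thread owns the (typically non thread-safe) reader and publishes chunks to a bounded queue, while a pool of consumers processes and writes chunks, each in its own transaction. A minimal sketch, assuming hypothetical readChunk and processAndWrite helpers:

	BlockingQueue<List<I>> queue = new ArrayBlockingQueue<>(queueCapacity);
	ExecutorService workers = Executors.newFixedThreadPool(concurrencyLimit);

	// Consumers: take chunks off the queue and process/write them;
	// the queue is the only shared state between threads
	for (int i = 0; i < concurrencyLimit; i++) {
		workers.submit(() -> {
			List<I> chunk;
			while ((chunk = queue.poll(pollTimeout, TimeUnit.SECONDS)) != null) {
				processAndWrite(chunk); // one transaction per chunk
			}
			return null;
		});
	}

	// Producer: the single thread touching the reader, so read counts like
	// maxItemCount are trivially honored; put() blocks when the queue is
	// full, which gives natural throttling
	List<I> chunk;
	while (!(chunk = readChunk(commitInterval)).isEmpty()) {
		queue.put(chunk);
	}
	workers.shutdown();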

Execution plan suggestion

In order to minimize behavioural changes, this issue should be addressed as follows:

  • write a black-box test suite whose assertions are based only on end results (no stubs, no mocks, no reflection-based assertions on internal state)
  • create a ChunkOrientedStep class that implements Step (ideally not based on TaskletStep). This class should not deal with any fault-tolerance features
  • create a FaultTolerantChunkOrientedStep class that extends ChunkOrientedStep to add fault-tolerance features (a skeleton of both classes is sketched after this list)
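
To make the intent concrete, here is a rough skeleton of what the two classes could look like. This is only a sketch under assumptions: imports and constructor wiring are omitted, repository updates, listeners and metrics are left out, everything beyond the Step contract is hypothetical, and the Chunk-based ItemWriter contract of v5 is assumed:

	// A chunk-oriented Step implemented with a plain loop, without
	// Tasklet/repeat callbacks. Deliberately no fault-tolerance here.
	public class ChunkOrientedStep<I, O> implements Step {

		private final String name;
		private final ItemReader<I> reader;
		private final ItemProcessor<I, O> processor;
		private final ItemWriter<O> writer;
		private final int chunkSize;
		private final PlatformTransactionManager transactionManager;

		// constructor wiring omitted for brevity

		@Override
		public String getName() {
			return name;
		}

		@Override
		public void execute(StepExecution stepExecution) throws JobInterruptedException {
			TransactionTemplate txTemplate = new TransactionTemplate(transactionManager);
			boolean moreItems = true;
			while (moreItems) {
				// one transaction per chunk: read, process, write, commit
				moreItems = Boolean.TRUE.equals(txTemplate.execute(status -> {
					try {
						return processNextChunk(stepExecution);
					}
					catch (Exception e) {
						throw new IllegalStateException("Chunk processing failed", e);
					}
				}));
			}
			stepExecution.setStatus(BatchStatus.COMPLETED);
		}

		// Reads up to chunkSize items, processes and writes them.
		// Returns false once the reader is exhausted.
		protected boolean processNextChunk(StepExecution stepExecution) throws Exception {
			Chunk<O> outputs = new Chunk<>();
			for (int i = 0; i < chunkSize; i++) {
				I item = reader.read();
				if (item == null) {
					writer.write(outputs);
					return false;
				}
				O output = processor.process(item);
				if (output != null) { // null means the item was filtered
					outputs.add(output);
				}
			}
			writer.write(outputs);
			return true;
		}
	}

	// Fault-tolerance layered on top, e.g. a retry around the chunk before
	// skip policies are applied (sketch only; constructor omitted as well).
	public class FaultTolerantChunkOrientedStep<I, O> extends ChunkOrientedStep<I, O> {

		private final RetryTemplate retryTemplate = new RetryTemplate();

		@Override
		protected boolean processNextChunk(StepExecution stepExecution) throws Exception {
			return retryTemplate.execute(context -> super.processNextChunk(stepExecution));
		}
	}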

It would not be possible to change the current builders to accept the old and new implementations at the same time. Therefore, we would need to introduce the new implementation as experimental in v5 and phase the old implementation out over time. The new implementation can be registered directly in job definitions without using the builders.
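
For example, such a step could be registered directly in a Java-config job definition, without going through the step builders. A sketch, assuming the hypothetical ChunkOrientedStep constructor above and the v5 JobBuilder:

	@Bean
	public Job job(JobRepository jobRepository, PlatformTransactionManager transactionManager,
			ItemReader<String> reader, ItemWriter<String> writer) {
		Step step = new ChunkOrientedStep<>("step", reader, item -> item, writer, 100, transactionManager);
		return new JobBuilder("job", jobRepository)
				.start(step)
				.build();
	}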


References:

[1]: Transaction management / Fault tolerance issues

[2]: Concurrency issues

@fmbenhassine fmbenhassine added this to the 5.0.0 milestone Jun 25, 2021
@fmbenhassine fmbenhassine modified the milestones: 5.0.0, 5.0.0-M6 Aug 31, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-M6, 5.0.0-M7 Sep 21, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-M7, 5.0.0-M8 Oct 4, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0-M8, 5.0.0 Oct 12, 2022
fmbenhassine added a commit that referenced this issue Oct 12, 2022
@fmbenhassine fmbenhassine modified the milestones: 5.0.0, 5.1.0 Nov 24, 2022
@bwgjoseph

Hi, is there any plan to revisit this any time soon? In particular, I am looking at #1101

@fmbenhassine
Contributor Author

@bwgjoseph This is in progress. The PoC of the new implementation was announced in v5.1 last year: https://docs.spring.io/spring-batch/reference/whatsnew.html#new-chunk-oriented-step-implementation. If you want to help, I would be grateful if you could give the experimental feature a try and share your feedback (not here, but on the experimental repo's issue tracker, as explained here).

@bwgjoseph

It's been a while; let me see if there's anything I can try out. No promises.

Thanks!

@bwgjoseph

I've checked, and what would be useful for my team is the fault-tolerance support, but that is not covered by the current implementation, as described in the repo:

The new implementation does not address fault-tolerance and concurrency features for the moment. Those will be addressed incrementally in future versions. Our main focus for now is correctness, ie simplify the code with minimal to no behavioral changes.
