KAFKA-17351: Improved handling of compacted topics in share partition (2/N) #19010
+435
−50
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
The PR handles fetch for
compacted
topics. The fix was required only when complete batch disappears from the topic log, and same batch is marked re-available in Share Partition state cache. Subsequent log reads will not result the disappeared batch in read response hence respective batch will be left as avaialable in the state cache.The PR checks for the first fetched/read batch base offset and if it's greater than the position from where the read occurred (fetch offset) then if there exists any
available
batches in the state cache then they will be archived.Adds following test cases:
testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset
- Added to verify the existing code for compacted topics.testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData
- Added to verify the existing code for compacted topics.testAcquireWhenBatchesRemovedForFetchOffset
- Verifies the changes in the current PR.testAcquireWhenBatchesRemovedForFetchOffsetWithinBatch
- Verifies the changes in the current PR.testAcquireWhenBatchesRemovedForFetchOffsetForSameCachedBatch
- Verifies the changes in the current PR.