
KAFKA-17351: Improved handling of compacted topics in share partition (2/N) #19010

Open
wants to merge 1 commit into trunk

Conversation

@apoorvmittal10 (Collaborator) commented on Feb 22, 2025

The PR handles fetch for compacted topics. The fix is required only when a complete batch disappears from the topic log while the same batch is marked re-available in the share partition state cache. Subsequent log reads will not return the disappeared batch in the read response, hence the respective batch would otherwise be left as available in the state cache.

The PR checks the base offset of the first fetched/read batch: if it is greater than the position from which the read occurred (the fetch offset), then any batches still marked available in the state cache for that gap are archived.
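A minimal sketch of that check, assuming a hypothetical `maybeArchiveCompactedGap` helper with simplified `InFlightBatch` and `RecordState` types; the real SharePartition state cache and its locking are more involved:

```java
import java.util.NavigableMap;

// Illustration only; the types and method names are assumptions, not the actual
// SharePartition implementation.
final class CompactedGapSketch {

    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    static final class InFlightBatch {
        final long baseOffset;
        final long lastOffset;
        RecordState state;

        InFlightBatch(long baseOffset, long lastOffset, RecordState state) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
            this.state = state;
        }
    }

    /**
     * If the first fetched batch starts beyond the fetch offset, the records in between
     * were removed from the log (e.g. by compaction); archive any cached batches in that
     * gap that are still marked available so they are not considered deliverable again.
     */
    static boolean maybeArchiveCompactedGap(
            long fetchOffset,
            long firstFetchedBaseOffset,
            NavigableMap<Long, InFlightBatch> cachedState // keyed by batch base offset
    ) {
        if (firstFetchedBaseOffset <= fetchOffset) {
            return false; // No gap; nothing disappeared from the log.
        }
        boolean anyArchived = false;
        for (InFlightBatch batch :
                cachedState.subMap(fetchOffset, true, firstFetchedBaseOffset, false).values()) {
            if (batch.state == RecordState.AVAILABLE) {
                batch.state = RecordState.ARCHIVED;
                anyArchived = true;
            }
        }
        return anyArchived;
    }
}
```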

Adds the following test cases:

  1. testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset - Added to verify the existing code for compacted topics.
  2. testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData - Added to verify the existing code for compacted topics.
  3. testAcquireWhenBatchesRemovedForFetchOffset - Verifies the changes in the current PR.
  4. testAcquireWhenBatchesRemovedForFetchOffsetWithinBatch - Verifies the changes in the current PR.
  5. testAcquireWhenBatchesRemovedForFetchOffsetForSameCachedBatch - Verifies the changes in the current PR.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka labels Feb 22, 2025
@apoorvmittal10 apoorvmittal10 added ci-approved and removed triage PRs from the community labels Feb 22, 2025
@AndrewJSchofield (Member) left a comment

Thanks for the PR. Just a few simple comments from an initial review. I'll take a deeper look shortly.

*
* @param memberId The member id of the client that is fetching the record.
* @param batchSize The number of records per acquired records batch.
* @param maxFetchRecords The maximum number of records that should be acquired, this is a soft
* limit and the method might acquire more records than the maxFetchRecords,
* if the records are already part of the same fetch batch.
* @param fetchOffset The fetch offset for which the records are fetched.

There's now an out-of-date comment on the declaration of fetchOffset. You've now started to use the parameter.
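Purely for illustration, one possible rewording of that parameter description; the signature and phrasing below are assumptions, not the actual acquire method in the PR:

```java
// Hypothetical mirror of the Javadoc in question; not the real method signature.
public interface AcquireJavadocSketch {
    /**
     * @param memberId        The member id of the client that is fetching the record.
     * @param batchSize       The number of records per acquired records batch.
     * @param maxFetchRecords The maximum number of records that should be acquired (soft limit).
     * @param fetchOffset     The offset from which the current read was issued; used to detect
     *                        batches that have disappeared from the log (e.g. due to compaction)
     *                        ahead of the first fetched batch.
     */
    void acquire(String memberId, int batchSize, int maxFetchRecords, long fetchOffset);
}
```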

* @param map The map containing the in-flight records.
* @return A boolean which indicates whether any record is archived or not.
*/
private boolean archiveAvailableRecords(long startOffset, long endOffset, NavigableMap<Long, InFlightBatch> map) {
lock.writeLock().lock();

Just observing that all callers already hold the write lock.
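As an aside on that observation: if the lock here is a ReentrantReadWriteLock (an assumption about the surrounding class), re-acquiring the write lock on the same thread is safe because the lock is reentrant, just redundant when every caller already holds it. A small self-contained illustration:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Minimal sketch showing write-lock reentrancy; not the SharePartition code itself.
public class ReentrantWriteLockDemo {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private void caller() {
        lock.writeLock().lock();
        try {
            helper(); // The write lock is already held here.
        } finally {
            lock.writeLock().unlock();
        }
    }

    private void helper() {
        lock.writeLock().lock(); // Reentrant: hold count goes to 2, no deadlock.
        try {
            // ... mutate shared state ...
        } finally {
            lock.writeLock().unlock(); // Hold count back to 1; the caller still holds it.
        }
    }

    public static void main(String[] args) {
        new ReentrantWriteLockDemo().caller();
    }
}
```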
