Skip to content

Commit

Permalink
[protocol] Add PartitionState schema v16 to replace long offset with …
Browse files Browse the repository at this point in the history
…PubSubPosition (#1482)

This change introduces fields in `PartitionState.avsc` to replace long offsets with  
PubSub positions and supporting termId based solution for the safe leadership handoff.  
The new fields include:  

- `lastProcessedVersionTopicPubSubPosition`: Tracks the last successfully consumed  
  position in the version topic.  
- `currentTermStartPubSubPosition`: Marks the PubSub position where the current  
  leader's term starts in the version topic.  
- `upstreamRealTimeTopicPubSubPositionMap`: A map tracking the last PubSub position  
  consumed from upstream topics, keyed by the bootstrap server URL.  
- `upstreamVersionTopicPubSubPosition`: Stores the last persisted upstream version  
  topic PubSub position; remains `null` if the batch native-replication source is the  
  same as the local region.  
- `latestObservedTermId`: Stores the most recent termId observed by this replica.
  • Loading branch information
sushantmane authored Feb 4, 2025
1 parent 76abcdd commit 0001030
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ subprojects {
doFirst {
def versionOverrides = [
// project(':internal:venice-common').file('src/main/resources/avro/StoreVersionState/v5', PathValidation.DIRECTORY)
project(':internal:venice-common').file('src/main/resources/avro/PartitionState/v15', PathValidation.DIRECTORY)
]

def schemaDirs = [sourceDir]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
{
"name": "PartitionState",
"namespace": "com.linkedin.venice.kafka.protocol.state",
"doc": "This record holds the state necessary for a consumer to checkpoint its progress when consuming a Venice partition. When provided the state in this record, a consumer should thus be able to resume consuming midway through a stream.",
"type": "record",
"fields": [
{
"name": "offset",
"doc": "The last Kafka offset consumed successfully in this partition from version topic.",
"type": "long"
}, {
"name": "lastProcessedVersionTopicPubSubPosition",
"doc": "The last PubSub position consumed successfully in this partition from version topic",
"type": "bytes",
"default": ""
}, {
"name": "latestObservedTermId",
"doc": "The most recent termId observed by this replica.",
"type": "long",
"default": -1
}, {
"name": "currentTermStartPubSubPosition",
"doc": "The pubsub position marking the start of the current leader's term in the version topic.",
"type": "bytes",
"default": ""
}, {
"name": "offsetLag",
"doc": "The last Kafka offset lag in this partition for fast online transition in server restart.",
"type": "long",
"default": -1
}, {
"name": "endOfPush",
"doc": "Whether the EndOfPush control message was consumed in this partition.",
"type": "boolean"
}, {
"name": "lastUpdate",
"doc": "The last time this PartitionState was updated. Can be compared against the various messageTimestamp in ProducerPartitionState in order to infer lag time between producers and the consumer maintaining this PartitionState.",
"type": "long"
}, {
"name": "startOfBufferReplayDestinationOffset",
"doc": "This is the offset at which the StartOfBufferReplay control message was consumed in the current partition of the destination topic. This is not the same value as the source offsets contained in the StartOfBufferReplay control message itself. The source and destination offsets act together as a synchronization marker. N.B.: null means that the SOBR control message was not received yet.",
"type": ["null", "long"],
"default": null
}, {
"name": "databaseInfo",
"doc": "A map of string -> string to store database related info, which is necessary to checkpoint",
"type": {
"type": "map",
"values": "string"
},
"default": {}
}, {
"name": "incrementalPushInfo",
"doc": "metadata of ongoing incremental push in the partition",
"type": [
"null",
{
"name": "IncrementalPush",
"type": "record",
"fields": [
{
"name": "version",
"doc": "The version of current incremental push",
"type": "string"
}, {
"name": "error",
"doc": "The first error founded during in`gestion",
"type": ["null", "string"],
"default": null
}
]
}
],
"default": null
}, {
"name": "leaderTopic",
"type": ["null", "string"],
"doc": "The topic that leader is consuming from; for leader, leaderTopic can be different from the version topic; for follower, leaderTopic is the same as version topic.",
"default": null
}, {
"name": "leaderOffset",
"type": "long",
"doc": "The last Kafka offset consumed successfully in this partition from the leader topic. TODO: remove this field once upstreamOffsetMap is used everywhere.",
"default": -1
}, {
"name": "upstreamOffsetMap",
"type": {
"type": "map",
"values": "long",
"java-key-class": "java.lang.String",
"avro.java.string": "String"
},
"doc": "A map of upstream Kafka bootstrap server url -> the last Kafka offset consumed from upstream topic.",
"default": {}
}, {
"name": "upstreamRealTimeTopicPubSubPositionMap",
"type": {
"type": "map",
"values": "bytes",
"java-key-class": "java.lang.String",
"avro.java.string": "String"
},
"doc": "A map of upstream PubSub bootstrap server url -> the last PubSub position consumed from upstream topic.",
"default": {}
}, {
"name": "upstreamVersionTopicOffset",
"type": "long",
"doc": "The last upstream version topic offset persisted to disk; if the batch native-replication source is the same as local region, this value will always be -1",
"default": -1
}, {
"name": "upstreamVersionTopicPubSubPosition",
"type": "bytes",
"doc": "The last upstream version topic PubSub position persisted to disk; if the batch native-replication source is the same as local region, this value will always be null",
"default": ""
}, {
"name": "leaderGUID",
"type": [
"null",
{
"namespace": "com.linkedin.venice.kafka.protocol",
"name": "GUID",
"type": "fixed",
"size": 16
}
],
"doc": "This field is deprecated since GUID is no longer able to identify the split-brain issue once 'pass-through' mode is enabled in venice writer. The field is superseded by leaderHostId and will be removed in the future ",
"default": null
}, {
"name": "leaderHostId",
"type": ["null", "string"],
"doc": "An unique identifier (such as host name) that stands for each host. It's used to identify if there is a split-brain happened while the leader(s) re-produce records",
"default": null
}, {
"name": "producerStates",
"doc": "A map of producer GUID -> producer state.",
"type": {
"type": "map",
"values": {
"name": "ProducerPartitionState",
"doc": "A record containing the state pertaining to the data sent by one upstream producer into one partition.",
"type": "record",
"fields": [
{
"name": "segmentNumber",
"doc": "The current segment number corresponds to the last (highest) segment number for which we have seen a StartOfSegment control message.",
"type": "int"
}, {
"name": "segmentStatus",
"doc": "The status of the current segment: 0 => NOT_STARTED, 1 => IN_PROGRESS, 2 => END_OF_INTERMEDIATE_SEGMENT, 3 => END_OF_FINAL_SEGMENT.",
"type": "int"
}, {
"name": "isRegistered",
"doc": "Whether the segment is registered. i.e. received Start_Of_Segment to initialize the segment.",
"type": "boolean",
"default": false
}, {
"name": "messageSequenceNumber",
"doc": "The current message sequence number, within the current segment, which we have seen for this partition/producer pair.",
"type": "int"
}, {
"name": "messageTimestamp",
"doc": "The timestamp included in the last message we have seen for this partition/producer pair.",
"type": "long"
}, {
"name": "checksumType",
"doc": "The current mapping is the following: 0 => None, 1 => MD5, 2 => Adler32, 3 => CRC32.",
"type": "int"
}, {
"name": "checksumState",
"doc": "The value of the checksum computed since the last StartOfSegment ControlMessage.",
"type": "bytes"
}, {
"name": "aggregates",
"doc": "The aggregates that have been computed so far since the last StartOfSegment ControlMessage.",
"type": {
"type": "map",
"values": "long"
}
}, {
"name": "debugInfo",
"doc": "The debug info received as part of the last StartOfSegment ControlMessage.",
"type": {
"type": "map",
"values": "string"
}
}
]
}
}
}, {
"name": "previousStatuses",
"doc": "A map of string -> string which stands for previous PartitionStatus",
"type": {
"type": "map",
"values": "string"
},
"default": {}
}, {
"name": "pendingReportIncrementalPushVersions",
"doc": "A list of string which stands for incremental push versions which have received EOIP but not yet reported prior to lag caught up, they will be reported in batch",
"type": {
"type": "array",
"items": "string"
},
"default": []
}, {
"name": "realtimeTopicProducerStates",
"doc": "A map that maps upstream Kafka bootstrap server url -> to a map of producer GUID -> producer state for real-time data.",
"type": {
"type": "map",
"values": {
"type": "map",
"values": "com.linkedin.venice.kafka.protocol.state.ProducerPartitionState"
},
"java-key-class": "java.lang.String",
"avro.java.string": "String"
},
"default": {}
},
{
"name": "recordTransformerClassHash",
"doc": "An integer hash code used by the DaVinciRecordTransformer to detect changes to the user's class during bootstrapping.",
"type": [
"null",
"int"
],
"default": null
}
]
}

0 comments on commit 0001030

Please sign in to comment.