[server][common][vpj] Introduce ComplexVenicePartitioner to materialized view (#1509)

* [server][common][vpj] Introduce ComplexVenicePartitioner to materialized view

This change does not yet work when the record is large enough to be chunked; proper chunking
support is needed and will be addressed in a separate PR.

1. Introduced ComplexVenicePartitioner, which extends VenicePartitioner and offers a new API to
partition by value, supporting a one-to-many partition mapping (an illustrative sketch follows this list).

2. Added a value provider of type Lazy<GenericRecord> to VeniceViewWriter's processRecord
API so view writers can access the deserialized value when needed, e.g. when a
ComplexVenicePartitioner is involved (see the lazy-value sketch below).

3. MergeConflictResult now provides the deserialized value on a best-effort basis. This is useful
when the value was already deserialized for a partial update operation, so the deserialized value
can be handed directly to the materialized view writer (see the best-effort provider sketch below).

4. Refactored VeniceWriter to expose an API that writes to a desired partition with new DIV.
For now it is only used by the new writeWithComplexPartitioner method to handle the partitioning
and the writes of the same value to multiple partitions (see the fan-out sketch below). However,
this newly exposed API should also come in handy when we build proper chunking support to forward
chunks to predetermined view topic partitions.

5. writeWithComplexPartitioner in VeniceWriter re-chunks the value when writing to each partition.
This should be optimized/refactored when we build proper chunking support.
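
An illustrative sketch of what a value-level partitioner API in the spirit of point 1 might look like. The names here are hypothetical; the actual ComplexVenicePartitioner extends VenicePartitioner and its exact signatures may differ.

import org.apache.avro.generic.GenericRecord;

// Hypothetical sketch only: a partitioner that can route by value, not just by key.
public abstract class ValueLevelPartitionerSketch {
  // Classic key-based routing: one key maps to exactly one partition.
  public abstract int getPartitionId(byte[] keyBytes, int numPartitions);

  // Value-based routing: the deserialized value decides the target partition(s),
  // so a single record may map to zero, one, or many partitions of a view topic.
  public abstract int[] getPartitionId(byte[] keyBytes, GenericRecord value, int numPartitions);
}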
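
For point 2, the benefit of a Lazy<GenericRecord> provider is that deserialization runs at most once, and only if a view writer actually asks for the value. A minimal memoizing wrapper in the spirit of Venice's Lazy, illustrative only and not the actual implementation:

import java.util.function.Supplier;

// Illustrative memoizing lazy wrapper: the supplier (e.g. Avro deserialization of the
// record value) runs at most once, and only when get() is first called.
final class LazySketch<T> {
  private final Supplier<T> supplier;
  private boolean resolved;
  private T value;

  private LazySketch(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  static <T> LazySketch<T> of(Supplier<T> supplier) {
    return new LazySketch<>(supplier);
  }

  synchronized T get() {
    if (!resolved) {
      value = supplier.get();
      resolved = true;
    }
    return value;
  }
}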
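
For point 3, "best effort" means reusing the value the merge path already deserialized (as it must for a partial update) and otherwise deserializing the winning bytes on demand. A simplified sketch of that selection logic, with illustrative names:

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.avro.generic.GenericRecord;

final class BestEffortValueProviderSketch {
  // Prefer the value that was already deserialized during conflict resolution; otherwise
  // defer deserialization of the winning bytes until a view writer actually needs it.
  static Supplier<GenericRecord> valueProvider(
      Optional<GenericRecord> alreadyDeserialized,
      ByteBuffer winningValueBytes,
      Function<ByteBuffer, GenericRecord> deserializer) {
    if (alreadyDeserialized.isPresent()) {
      return alreadyDeserialized::get;
    }
    return () -> deserializer.apply(winningValueBytes);
  }
}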
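
For point 4, a complex partitioner can map one value to several view-topic partitions, so the writer fans the same payload out to each of them and completes only once every per-partition write lands. A hedged sketch of that fan-out, not the actual VeniceWriter API:

import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

final class FanOutWriteSketch {
  // Produce the same serialized value to every partition chosen by the value-level
  // partitioner; the returned future completes once all per-partition writes complete.
  static CompletableFuture<Void> writeToPartitions(
      byte[] serializedValue,
      int[] targetPartitions,
      BiFunction<byte[], Integer, CompletableFuture<Void>> produceToPartition) {
    CompletableFuture<?>[] writes = new CompletableFuture<?>[targetPartitions.length];
    for (int i = 0; i < targetPartitions.length; i++) {
      // Today each partition write also re-chunks the value (see point 5); proper
      // chunking support is expected to remove that duplicated work.
      writes[i] = produceToPartition.apply(serializedValue, targetPartitions[i]);
    }
    return CompletableFuture.allOf(writes);
  }
}
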
xunyin8 authored Feb 22, 2025
1 parent e5ad505 commit 2f3a731
Showing 36 changed files with 1,523 additions and 173 deletions.
@@ -455,7 +455,7 @@ private PubSubMessageProcessedResult processActiveActiveMessage(
consumerRecord.getTopicPartition(),
valueManifestContainer,
beforeProcessingBatchRecordsTimestampMs));
if (hasChangeCaptureView) {
if (hasChangeCaptureView || (hasComplexVenicePartitionerMaterializedView && msgType == MessageType.DELETE)) {
/**
* Since this function will update the transient cache before writing the view, and if there is
* a change capture view writer, we need to lookup first, otherwise the transient cache will be populated
@@ -549,7 +549,8 @@ private PubSubMessageProcessedResult processActiveActiveMessage(
rmdWithValueSchemaID,
valueManifestContainer,
null,
null));
null,
(schemaId) -> storeDeserializerCache.getDeserializer(schemaId, schemaId)));
} else {
validatePostOperationResultsAndRecord(mergeConflictResult, offsetSumPreOperation, recordTimestampsPreOperation);

@@ -589,7 +590,8 @@ private PubSubMessageProcessedResult processActiveActiveMessage(
rmdWithValueSchemaID,
valueManifestContainer,
updatedValueBytes,
updatedRmdBytes));
updatedRmdBytes,
(schemaId) -> storeDeserializerCache.getDeserializer(schemaId, schemaId)));
}
}

@@ -669,6 +671,7 @@ protected void processMessageAndMaybeProduceToKafka(
ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
int oldValueSchemaId =
oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
Lazy<GenericRecord> valueProvider = mergeConflictResultWrapper.getValueProvider();
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(
@@ -677,7 +680,8 @@
keyBytes,
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord()),
mergeConflictResult.getRmdRecord(),
valueProvider),
produceToVersionTopic);
} else {
// This function may modify the original record in KME and it is unsafe to use the payload from KME directly
@@ -31,6 +31,7 @@
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.store.view.ChangeCaptureViewWriter;
import com.linkedin.davinci.store.view.MaterializedViewWriter;
import com.linkedin.davinci.store.view.VeniceViewWriter;
import com.linkedin.davinci.validation.KafkaDataIntegrityValidator;
import com.linkedin.davinci.validation.PartitionTracker;
@@ -71,6 +72,7 @@
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.stats.StatsErrorCode;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.ByteUtils;
@@ -202,8 +204,9 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {

protected final Map<String, VeniceViewWriter> viewWriters;
protected final boolean hasChangeCaptureView;
protected final boolean hasComplexVenicePartitionerMaterializedView;

protected final AvroStoreDeserializerCache storeDeserializerCache;
protected final AvroStoreDeserializerCache<GenericRecord> storeDeserializerCache;

private final AtomicLong lastSendIngestionHeartbeatTimestamp = new AtomicLong(0);

@@ -336,16 +339,22 @@ public LeaderFollowerStoreIngestionTask(
version.getNumber(),
schemaRepository.getKeySchema(store.getName()).getSchema());
boolean tmpValueForHasChangeCaptureViewWriter = false;
boolean tmpValueForHasComplexVenicePartitioner = false;
for (Map.Entry<String, VeniceViewWriter> viewWriter: viewWriters.entrySet()) {
if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) {
tmpValueForHasChangeCaptureViewWriter = true;
break;
} else if (viewWriter.getValue() instanceof MaterializedViewWriter) {
if (((MaterializedViewWriter) viewWriter.getValue()).isComplexVenicePartitioner()) {
tmpValueForHasComplexVenicePartitioner = true;
}
}
}
hasChangeCaptureView = tmpValueForHasChangeCaptureViewWriter;
hasComplexVenicePartitionerMaterializedView = tmpValueForHasComplexVenicePartitioner;
} else {
viewWriters = Collections.emptyMap();
hasChangeCaptureView = false;
hasComplexVenicePartitionerMaterializedView = false;
}
this.storeDeserializerCache = new AvroStoreDeserializerCache(
builder.getSchemaRepo(),
@@ -3196,9 +3205,26 @@ private PubSubMessageProcessedResult processMessage(
KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
byte[] keyBytes = kafkaKey.getKey();
MessageType msgType = MessageType.valueOf(kafkaValue.messageType);
Lazy<GenericRecord> valueProvider;
switch (msgType) {
case PUT:
Put put = (Put) kafkaValue.payloadUnion;
// Value provider should use un-compressed data.
final ByteBuffer rawPutValue = put.putValue;
final boolean needToDecompress = !partitionConsumptionState.isEndOfPushReceived();
valueProvider = Lazy.of(() -> {
RecordDeserializer<GenericRecord> recordDeserializer =
storeDeserializerCache.getDeserializer(put.schemaId, put.schemaId);
if (needToDecompress) {
try {
return recordDeserializer.deserialize(compressor.get().decompress(rawPutValue));
} catch (IOException e) {
throw new VeniceException("Unable to provide value due to decompression failure", e);
}
} else {
return recordDeserializer.deserialize(rawPutValue);
}
});
put.putValue = maybeCompressData(
consumerRecord.getTopicPartition().getPartitionNumber(),
put.putValue,
@@ -3221,7 +3247,7 @@
null);
}

return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(put, null, false));
return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(put, null, false, valueProvider));

case UPDATE:
/**
@@ -3266,20 +3292,20 @@

final byte[] updatedValueBytes;
final ChunkedValueManifest oldValueManifest = valueManifestContainer.getManifest();

WriteComputeResult writeComputeResult;
try {
long writeComputeStartTimeInNS = System.nanoTime();

// Leader nodes are the only ones which process UPDATES, so it's valid to always compress and not call
// 'maybeCompress'.
updatedValueBytes = compressor.get()
.compress(
storeWriteComputeHandler.applyWriteCompute(
currValue,
update.schemaId,
readerValueSchemaId,
update.updateValue,
update.updateSchemaId,
readerUpdateProtocolVersion));
writeComputeResult = storeWriteComputeHandler.applyWriteCompute(
currValue,
update.schemaId,
readerValueSchemaId,
update.updateValue,
update.updateSchemaId,
readerUpdateProtocolVersion);
updatedValueBytes = compressor.get().compress(writeComputeResult.getUpdatedValueBytes());
hostLevelIngestionStats
.recordWriteComputeUpdateLatency(LatencyUtils.getElapsedTimeFromNSToMS(writeComputeStartTimeInNS));
} catch (Exception e) {
@@ -3316,16 +3342,38 @@ private PubSubMessageProcessedResult processMessage(
Put updatedPut = new Put();
updatedPut.putValue = updateValueWithSchemaId;
updatedPut.schemaId = readerValueSchemaId;
return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(updatedPut, oldValueManifest, false));
return new PubSubMessageProcessedResult(
new WriteComputeResultWrapper(
updatedPut,
oldValueManifest,
false,
Lazy.of(writeComputeResult::getUpdatedValue)));
}
case DELETE:
Lazy<GenericRecord> oldValueProvider;
if (hasComplexVenicePartitionerMaterializedView) {
// Best effort to provide the old value for a DELETE operation in case it is needed by a ComplexVeniceWriter
// to generate deletes for materialized view topic partition(s). We need to do a non-lazy lookup up front so
// we have a chance of getting the old value before the transient record cache is updated to null as part of
// processing the DELETE.
int oldValueReaderSchemaId = schemaRepository.getSupersetSchema(storeName).getId();
GenericRecord oldValue = readStoredValueRecord(
partitionConsumptionState,
keyBytes,
oldValueReaderSchemaId,
consumerRecord.getTopicPartition(),
new ChunkedValueManifestContainer());
oldValueProvider = Lazy.of(() -> oldValue);
} else {
oldValueProvider = Lazy.of(() -> null);
}
/**
* For WC enabled stores update the transient record map with the latest {key,null} for similar reason as mentioned in PUT above.
*/
if (isWriteComputationEnabled && partitionConsumptionState.isEndOfPushReceived()) {
partitionConsumptionState.setTransientRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, -1, null);
}
return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false));
return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false, oldValueProvider));

default:
throw new VeniceMessageException(
@@ -3377,9 +3425,11 @@ protected void processMessageAndMaybeProduceToKafka(
Put newPut = writeComputeResultWrapper.getNewPut();
// keys will be serialized with chunk suffix during pass-through mode in L/F NR if chunking is enabled
boolean isChunkedKey = isChunked() && !partitionConsumptionState.isEndOfPushReceived();
Lazy<GenericRecord> newValueProvider = writeComputeResultWrapper.getValueProvider();
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId, isChunkedKey),
(viewWriter) -> viewWriter
.processRecord(newPut.putValue, keyBytes, newPut.schemaId, isChunkedKey, newValueProvider),
produceToVersionTopic);
} else {
produceToVersionTopic.run();
@@ -3969,7 +4019,7 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio

protected void queueUpVersionTopicWritesWithViewWriters(
PartitionConsumptionState partitionConsumptionState,
Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor,
Function<VeniceViewWriter, CompletableFuture<Void>> viewWriterRecordProcessor,
Runnable versionTopicWrite) {
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
@@ -4,8 +4,11 @@
import com.linkedin.davinci.replication.merge.MergeConflictResult;
import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer;
import com.linkedin.davinci.store.record.ByteBufferValueRecord;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.nio.ByteBuffer;
import java.util.function.Function;
import org.apache.avro.generic.GenericRecord;


/**
@@ -17,24 +20,53 @@ public class MergeConflictResultWrapper {
private final Lazy<ByteBuffer> oldValueByteBufferProvider;
private final RmdWithValueSchemaId oldRmdWithValueSchemaId;
private final ChunkedValueManifestContainer oldValueManifestContainer;

// Serialized and potentially compressed updated value bytes
private final ByteBuffer updatedValueBytes;
private final ByteBuffer updatedRmdBytes;

/**
* Best-effort deserialized value provider that provides the updated value for PUT/UPDATE and the old value for
* DELETE.
*/
private final Lazy<GenericRecord> valueProvider;

public MergeConflictResultWrapper(
MergeConflictResult mergeConflictResult,
Lazy<ByteBufferValueRecord<ByteBuffer>> oldValueProvider,
Lazy<ByteBuffer> oldValueByteBufferProvider,
RmdWithValueSchemaId oldRmdWithValueSchemaId,
ChunkedValueManifestContainer oldValueManifestContainer,
ByteBuffer updatedValueBytes,
ByteBuffer updatedRmdBytes) {
ByteBuffer updatedRmdBytes,
Function<Integer, RecordDeserializer<GenericRecord>> deserializerProvider) {
this.mergeConflictResult = mergeConflictResult;
this.oldValueProvider = oldValueProvider;
this.oldValueByteBufferProvider = oldValueByteBufferProvider;
this.oldRmdWithValueSchemaId = oldRmdWithValueSchemaId;
this.oldValueManifestContainer = oldValueManifestContainer;
this.updatedValueBytes = updatedValueBytes;
this.updatedRmdBytes = updatedRmdBytes;
if (updatedValueBytes == null) {
// this is a DELETE
ByteBufferValueRecord<ByteBuffer> oldValue = oldValueProvider.get();
if (oldValue == null || oldValue.value() == null) {
this.valueProvider = Lazy.of(() -> null);
} else {
this.valueProvider =
Lazy.of(() -> deserializerProvider.apply(oldValue.writerSchemaId()).deserialize(oldValue.value()));
}
} else {
// this is a PUT or UPDATE
if (mergeConflictResult.getDeserializedValue().isPresent()) {
this.valueProvider = Lazy.of(() -> mergeConflictResult.getDeserializedValue().get());
} else {
// Use mergeConflictResult.getNewValue() here since updatedValueBytes could be compressed.
this.valueProvider = Lazy.of(
() -> deserializerProvider.apply(mergeConflictResult.getValueSchemaId())
.deserialize(mergeConflictResult.getNewValue()));
}
}
}

public MergeConflictResult getMergeConflictResult() {
@@ -64,4 +96,14 @@ public ByteBuffer getUpdatedValueBytes() {
public ByteBuffer getUpdatedRmdBytes() {
return updatedRmdBytes;
}

/**
* Return a best-effort value provider with the following behaviors:
* 1. returns the new value provider for PUT and UPDATE.
* 2. returns the old value for DELETE (null for non-existent key).
* 3. returns null if the value is not available.
*/
public Lazy<GenericRecord> getValueProvider() {
return valueProvider;
}
}
@@ -91,9 +91,9 @@ public StoreWriteComputeProcessor(
* @param writerUpdateProtocolVersion Update protocol version used to serialize Update payload bytes.
* @param readerUpdateProtocolVersion Update protocol version used to deserialize Update payload bytes.
*
* @return Bytes of partially updated original value.
* @return {@link WriteComputeResult} of partially updated original value.
*/
public byte[] applyWriteCompute(
public WriteComputeResult applyWriteCompute(
GenericRecord currValue,
int writerValueSchemaId,
int readerValueSchemaId,
@@ -111,9 +111,9 @@ public byte[] applyWriteCompute(

// If write compute is enabled and the record is deleted, the updatedValue will be null.
if (updatedValue == null) {
return null;
return new WriteComputeResult(null, null);
}
return getValueSerializer(readerValueSchemaId).serialize(updatedValue);
return new WriteComputeResult(getValueSerializer(readerValueSchemaId).serialize(updatedValue), updatedValue);
}

private SchemaAndUniqueId getSchemaAndUniqueId(int valueSchemaId, int writeComputeSchemaId) {
@@ -0,0 +1,29 @@
package com.linkedin.davinci.kafka.consumer;

import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;


/**
* Write compute result wrapper class holding the deserialized updated value and the serialized and potentially
* compressed updated value bytes.
*/
public class WriteComputeResult {
private final byte[] updatedValueBytes;
private final GenericRecord updatedValue;

public WriteComputeResult(byte[] updatedValueBytes, GenericRecord updatedValue) {
this.updatedValueBytes = updatedValueBytes;
this.updatedValue = updatedValue;
}

@Nullable
public byte[] getUpdatedValueBytes() {
return updatedValueBytes;
}

@Nullable
public GenericRecord getUpdatedValue() {
return updatedValue;
}
}