diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index c80d4e13801..a95ea1430fc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -455,7 +455,7 @@ private PubSubMessageProcessedResult processActiveActiveMessage( consumerRecord.getTopicPartition(), valueManifestContainer, beforeProcessingBatchRecordsTimestampMs)); - if (hasChangeCaptureView) { + if (hasChangeCaptureView || (hasComplexVenicePartitionerMaterializedView && msgType == MessageType.DELETE)) { /** * Since this function will update the transient cache before writing the view, and if there is * a change capture view writer, we need to lookup first, otherwise the transient cache will be populated @@ -549,7 +549,8 @@ private PubSubMessageProcessedResult processActiveActiveMessage( rmdWithValueSchemaID, valueManifestContainer, null, - null)); + null, + (schemaId) -> storeDeserializerCache.getDeserializer(schemaId, schemaId))); } else { validatePostOperationResultsAndRecord(mergeConflictResult, offsetSumPreOperation, recordTimestampsPreOperation); @@ -589,7 +590,8 @@ private PubSubMessageProcessedResult processActiveActiveMessage( rmdWithValueSchemaID, valueManifestContainer, updatedValueBytes, - updatedRmdBytes)); + updatedRmdBytes, + (schemaId) -> storeDeserializerCache.getDeserializer(schemaId, schemaId))); } } @@ -669,6 +671,7 @@ protected void processMessageAndMaybeProduceToKafka( ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get(); int oldValueSchemaId = oldValueBB == null ? 
-1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId(); + Lazy valueProvider = mergeConflictResultWrapper.getValueProvider(); queueUpVersionTopicWritesWithViewWriters( partitionConsumptionState, (viewWriter) -> viewWriter.processRecord( @@ -677,7 +680,8 @@ protected void processMessageAndMaybeProduceToKafka( keyBytes, mergeConflictResult.getValueSchemaId(), oldValueSchemaId, - mergeConflictResult.getRmdRecord()), + mergeConflictResult.getRmdRecord(), + valueProvider), produceToVersionTopic); } else { // This function may modify the original record in KME and it is unsafe to use the payload from KME directly diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 788152acf75..66133752db1 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -31,6 +31,7 @@ import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.record.ValueRecord; import com.linkedin.davinci.store.view.ChangeCaptureViewWriter; +import com.linkedin.davinci.store.view.MaterializedViewWriter; import com.linkedin.davinci.store.view.VeniceViewWriter; import com.linkedin.davinci.validation.KafkaDataIntegrityValidator; import com.linkedin.davinci.validation.PartitionTracker; @@ -71,6 +72,7 @@ import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.serialization.AvroStoreDeserializerCache; +import com.linkedin.venice.serializer.RecordDeserializer; import com.linkedin.venice.stats.StatsErrorCode; import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.ByteUtils; @@ -202,8 +204,9 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask { protected final Map viewWriters; protected final boolean hasChangeCaptureView; + protected final boolean hasComplexVenicePartitionerMaterializedView; - protected final AvroStoreDeserializerCache storeDeserializerCache; + protected final AvroStoreDeserializerCache storeDeserializerCache; private final AtomicLong lastSendIngestionHeartbeatTimestamp = new AtomicLong(0); @@ -336,16 +339,22 @@ public LeaderFollowerStoreIngestionTask( version.getNumber(), schemaRepository.getKeySchema(store.getName()).getSchema()); boolean tmpValueForHasChangeCaptureViewWriter = false; + boolean tmpValueForHasComplexVenicePartitioner = false; for (Map.Entry viewWriter: viewWriters.entrySet()) { if (viewWriter.getValue() instanceof ChangeCaptureViewWriter) { tmpValueForHasChangeCaptureViewWriter = true; - break; + } else if (viewWriter.getValue() instanceof MaterializedViewWriter) { + if (((MaterializedViewWriter) viewWriter.getValue()).isComplexVenicePartitioner()) { + tmpValueForHasComplexVenicePartitioner = true; + } } } hasChangeCaptureView = tmpValueForHasChangeCaptureViewWriter; + hasComplexVenicePartitionerMaterializedView = tmpValueForHasComplexVenicePartitioner; } else { viewWriters = Collections.emptyMap(); hasChangeCaptureView = false; + hasComplexVenicePartitionerMaterializedView = false; } this.storeDeserializerCache = new AvroStoreDeserializerCache( builder.getSchemaRepo(), @@ -3196,9 +3205,26 @@ private 
PubSubMessageProcessedResult processMessage( KafkaMessageEnvelope kafkaValue = consumerRecord.getValue(); byte[] keyBytes = kafkaKey.getKey(); MessageType msgType = MessageType.valueOf(kafkaValue.messageType); + Lazy valueProvider; switch (msgType) { case PUT: Put put = (Put) kafkaValue.payloadUnion; + // Value provider should use un-compressed data. + final ByteBuffer rawPutValue = put.putValue; + final boolean needToDecompress = !partitionConsumptionState.isEndOfPushReceived(); + valueProvider = Lazy.of(() -> { + RecordDeserializer recordDeserializer = + storeDeserializerCache.getDeserializer(put.schemaId, put.schemaId); + if (needToDecompress) { + try { + return recordDeserializer.deserialize(compressor.get().decompress(rawPutValue)); + } catch (IOException e) { + throw new VeniceException("Unable to provide value due to decompression failure", e); + } + } else { + return recordDeserializer.deserialize(rawPutValue); + } + }); put.putValue = maybeCompressData( consumerRecord.getTopicPartition().getPartitionNumber(), put.putValue, @@ -3221,7 +3247,7 @@ private PubSubMessageProcessedResult processMessage( null); } - return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(put, null, false)); + return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(put, null, false, valueProvider)); case UPDATE: /** @@ -3266,20 +3292,20 @@ private PubSubMessageProcessedResult processMessage( final byte[] updatedValueBytes; final ChunkedValueManifest oldValueManifest = valueManifestContainer.getManifest(); - + WriteComputeResult writeComputeResult; try { long writeComputeStartTimeInNS = System.nanoTime(); + // Leader nodes are the only ones which process UPDATES, so it's valid to always compress and not call // 'maybeCompress'. - updatedValueBytes = compressor.get() - .compress( - storeWriteComputeHandler.applyWriteCompute( - currValue, - update.schemaId, - readerValueSchemaId, - update.updateValue, - update.updateSchemaId, - readerUpdateProtocolVersion)); + writeComputeResult = storeWriteComputeHandler.applyWriteCompute( + currValue, + update.schemaId, + readerValueSchemaId, + update.updateValue, + update.updateSchemaId, + readerUpdateProtocolVersion); + updatedValueBytes = compressor.get().compress(writeComputeResult.getUpdatedValueBytes()); hostLevelIngestionStats .recordWriteComputeUpdateLatency(LatencyUtils.getElapsedTimeFromNSToMS(writeComputeStartTimeInNS)); } catch (Exception e) { @@ -3316,16 +3342,38 @@ private PubSubMessageProcessedResult processMessage( Put updatedPut = new Put(); updatedPut.putValue = updateValueWithSchemaId; updatedPut.schemaId = readerValueSchemaId; - return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(updatedPut, oldValueManifest, false)); + return new PubSubMessageProcessedResult( + new WriteComputeResultWrapper( + updatedPut, + oldValueManifest, + false, + Lazy.of(writeComputeResult::getUpdatedValue))); } case DELETE: + Lazy oldValueProvider; + if (hasComplexVenicePartitionerMaterializedView) { + // Best-effort to provide the old value for delete operation in case needed by a ComplexVeniceWriter to + // generate deletes for materialized view topic partition(s). We need to do a non-lazy lookup before, so we + // have a chance of getting the old value before the transient record cache is updated to null as part of + // processing the DELETE. 
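Aside (illustration only, not part of the patch): the PUT path above wraps the decompress-and-deserialize work in `Lazy` so it only runs if a downstream consumer, such as a view writer backed by a `ComplexVenicePartitioner`, actually calls `get()`, while the DELETE path below must look the old value up eagerly before the transient record cache is cleared. A minimal sketch of that deferral, using a stand-in supplier instead of the real compressor/`storeDeserializerCache` calls (the memoize-once behavior is assumed from how this patch itself uses `Lazy`, e.g. for `CompressorFactory`):

```java
import com.linkedin.venice.utils.lazy.Lazy;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyValueProviderSketch {
  public static void main(String[] args) {
    AtomicInteger deserializeCalls = new AtomicInteger();
    // Stand-in for the decompress + deserialize work done in processMessage(); the real
    // supplier returns a GenericRecord built from the (possibly compressed) put value.
    Lazy<String> valueProvider = Lazy.of(() -> {
      deserializeCalls.incrementAndGet();
      return "deserialized-record";
    });

    System.out.println(deserializeCalls.get()); // 0: nothing has been deserialized yet
    valueProvider.get();
    valueProvider.get();
    System.out.println(deserializeCalls.get()); // 1: Lazy initializes the value at most once
  }
}
```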
+ int oldValueReaderSchemaId = schemaRepository.getSupersetSchema(storeName).getId(); + GenericRecord oldValue = readStoredValueRecord( + partitionConsumptionState, + keyBytes, + oldValueReaderSchemaId, + consumerRecord.getTopicPartition(), + new ChunkedValueManifestContainer()); + oldValueProvider = Lazy.of(() -> oldValue); + } else { + oldValueProvider = Lazy.of(() -> null); + } /** * For WC enabled stores update the transient record map with the latest {key,null} for similar reason as mentioned in PUT above. */ if (isWriteComputationEnabled && partitionConsumptionState.isEndOfPushReceived()) { partitionConsumptionState.setTransientRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, -1, null); } - return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false)); + return new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, false, oldValueProvider)); default: throw new VeniceMessageException( @@ -3377,9 +3425,11 @@ protected void processMessageAndMaybeProduceToKafka( Put newPut = writeComputeResultWrapper.getNewPut(); // keys will be serialized with chunk suffix during pass-through mode in L/F NR if chunking is enabled boolean isChunkedKey = isChunked() && !partitionConsumptionState.isEndOfPushReceived(); + Lazy newValueProvider = writeComputeResultWrapper.getValueProvider(); queueUpVersionTopicWritesWithViewWriters( partitionConsumptionState, - (viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId, isChunkedKey), + (viewWriter) -> viewWriter + .processRecord(newPut.putValue, keyBytes, newPut.schemaId, isChunkedKey, newValueProvider), produceToVersionTopic); } else { produceToVersionTopic.run(); @@ -3969,7 +4019,7 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio protected void queueUpVersionTopicWritesWithViewWriters( PartitionConsumptionState partitionConsumptionState, - Function> viewWriterRecordProcessor, + Function> viewWriterRecordProcessor, Runnable versionTopicWrite) { long preprocessingTime = System.currentTimeMillis(); CompletableFuture currentVersionTopicWrite = new CompletableFuture<>(); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java index 9d2bcd22ac1..c5ded56590e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapper.java @@ -4,8 +4,11 @@ import com.linkedin.davinci.replication.merge.MergeConflictResult; import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; import com.linkedin.davinci.store.record.ByteBufferValueRecord; +import com.linkedin.venice.serializer.RecordDeserializer; import com.linkedin.venice.utils.lazy.Lazy; import java.nio.ByteBuffer; +import java.util.function.Function; +import org.apache.avro.generic.GenericRecord; /** @@ -17,9 +20,17 @@ public class MergeConflictResultWrapper { private final Lazy oldValueByteBufferProvider; private final RmdWithValueSchemaId oldRmdWithValueSchemaId; private final ChunkedValueManifestContainer oldValueManifestContainer; + + // Serialized and potentially compressed updated value bytes private final ByteBuffer updatedValueBytes; private final ByteBuffer updatedRmdBytes; + /** + * Best-effort deserialized value provider that provides the updated 
value for PUT/UPDATE and the old value for + * DELETE. + */ + private final Lazy valueProvider; + public MergeConflictResultWrapper( MergeConflictResult mergeConflictResult, Lazy> oldValueProvider, @@ -27,7 +38,8 @@ public MergeConflictResultWrapper( RmdWithValueSchemaId oldRmdWithValueSchemaId, ChunkedValueManifestContainer oldValueManifestContainer, ByteBuffer updatedValueBytes, - ByteBuffer updatedRmdBytes) { + ByteBuffer updatedRmdBytes, + Function> deserializerProvider) { this.mergeConflictResult = mergeConflictResult; this.oldValueProvider = oldValueProvider; this.oldValueByteBufferProvider = oldValueByteBufferProvider; @@ -35,6 +47,26 @@ public MergeConflictResultWrapper( this.oldValueManifestContainer = oldValueManifestContainer; this.updatedValueBytes = updatedValueBytes; this.updatedRmdBytes = updatedRmdBytes; + if (updatedValueBytes == null) { + // this is a DELETE + ByteBufferValueRecord oldValue = oldValueProvider.get(); + if (oldValue == null || oldValue.value() == null) { + this.valueProvider = Lazy.of(() -> null); + } else { + this.valueProvider = + Lazy.of(() -> deserializerProvider.apply(oldValue.writerSchemaId()).deserialize(oldValue.value())); + } + } else { + // this is a PUT or UPDATE + if (mergeConflictResult.getDeserializedValue().isPresent()) { + this.valueProvider = Lazy.of(() -> mergeConflictResult.getDeserializedValue().get()); + } else { + // Use mergeConflictResult.getNewValue() here since updatedValueBytes could be compressed. + this.valueProvider = Lazy.of( + () -> deserializerProvider.apply(mergeConflictResult.getValueSchemaId()) + .deserialize(mergeConflictResult.getNewValue())); + } + } } public MergeConflictResult getMergeConflictResult() { @@ -64,4 +96,14 @@ public ByteBuffer getUpdatedValueBytes() { public ByteBuffer getUpdatedRmdBytes() { return updatedRmdBytes; } + + /** + * Return a best-effort value provider with the following behaviors: + * 1. returns the new value provider for PUT and UPDATE. + * 2. returns the old value for DELETE (null for non-existent key). + * 3. returns null if the value is not available. + */ + public Lazy getValueProvider() { + return valueProvider; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreWriteComputeProcessor.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreWriteComputeProcessor.java index cb3de47ab33..8343941d37f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreWriteComputeProcessor.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreWriteComputeProcessor.java @@ -91,9 +91,9 @@ public StoreWriteComputeProcessor( * @param writerUpdateProtocolVersion Update protocol version used to serialize Update payload bytes. * @param readerUpdateProtocolVersion Update protocol version used to deserialize Update payload bytes. * - * @return Bytes of partially updated original value. + * @return {@link WriteComputeResult} of partially updated original value. */ - public byte[] applyWriteCompute( + public WriteComputeResult applyWriteCompute( GenericRecord currValue, int writerValueSchemaId, int readerValueSchemaId, @@ -111,9 +111,9 @@ public byte[] applyWriteCompute( // If write compute is enabled and the record is deleted, the updatedValue will be null. 
if (updatedValue == null) { - return null; + return new WriteComputeResult(null, null); } - return getValueSerializer(readerValueSchemaId).serialize(updatedValue); + return new WriteComputeResult(getValueSerializer(readerValueSchemaId).serialize(updatedValue), updatedValue); } private SchemaAndUniqueId getSchemaAndUniqueId(int valueSchemaId, int writeComputeSchemaId) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResult.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResult.java new file mode 100644 index 00000000000..426d7287330 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResult.java @@ -0,0 +1,29 @@ +package com.linkedin.davinci.kafka.consumer; + +import javax.annotation.Nullable; +import org.apache.avro.generic.GenericRecord; + + +/** + * Write compute result wrapper class holding the deserialized updated value and the serialized and potentially + * compressed updated value bytes. + */ +public class WriteComputeResult { + private final byte[] updatedValueBytes; + private final GenericRecord updatedValue; + + public WriteComputeResult(byte[] updatedValueBytes, GenericRecord updatedValue) { + this.updatedValueBytes = updatedValueBytes; + this.updatedValue = updatedValue; + } + + @Nullable + public byte[] getUpdatedValueBytes() { + return updatedValueBytes; + } + + @Nullable + public GenericRecord getUpdatedValue() { + return updatedValue; + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java index ea7669ef12f..a4e4a8f459c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/WriteComputeResultWrapper.java @@ -2,6 +2,8 @@ import com.linkedin.venice.kafka.protocol.Put; import com.linkedin.venice.storage.protocol.ChunkedValueManifest; +import com.linkedin.venice.utils.lazy.Lazy; +import org.apache.avro.generic.GenericRecord; /** @@ -14,11 +16,21 @@ public class WriteComputeResultWrapper { * This can be true when there is some delete op against a non-existing entry. */ private final boolean skipProduce; + private final Lazy valueProvider; public WriteComputeResultWrapper(Put newPut, ChunkedValueManifest oldValueManifest, boolean skipProduce) { + this(newPut, oldValueManifest, skipProduce, Lazy.of(() -> null)); + } + + public WriteComputeResultWrapper( + Put newPut, + ChunkedValueManifest oldValueManifest, + boolean skipProduce, + Lazy valueProvider) { this.newPut = newPut; this.oldValueManifest = oldValueManifest; this.skipProduce = skipProduce; + this.valueProvider = valueProvider; } public Put getNewPut() { @@ -32,4 +44,14 @@ public ChunkedValueManifest getOldValueManifest() { public boolean isSkipProduce() { return skipProduce; } + + /** + * Return a best-effort value provider with the following behaviors: + * 1. returns the new value provider for PUT and UPDATE. + * 2. returns the old value for DELETE (null for non-existent key). + * 3. returns null if the value is not available. 
+ */ + public Lazy getValueProvider() { + return this.valueProvider; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java index 7125a33d63e..313213e63e8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -276,7 +277,12 @@ public MergeConflictResult update( final ByteBuffer updatedValueBytes = updatedValueAndRmd.getValue() == null ? null : serializeMergedValueRecord(oldValueSchemaID, updatedValueAndRmd.getValue()); - return new MergeConflictResult(updatedValueBytes, oldValueSchemaID, false, updatedValueAndRmd.getRmd()); + return new MergeConflictResult( + updatedValueBytes, + Optional.of(updatedValueAndRmd.getValue()), + oldValueSchemaID, + false, + updatedValueAndRmd.getRmd()); } private MergeConflictResult mergePutWithValueLevelTimestamp( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResult.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResult.java index ea07eacdd50..c1d7c29daf9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResult.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResult.java @@ -1,6 +1,7 @@ package com.linkedin.davinci.replication.merge; import java.nio.ByteBuffer; +import java.util.Optional; import org.apache.avro.generic.GenericRecord; @@ -12,6 +13,7 @@ public class MergeConflictResult { private static final MergeConflictResult IGNORED_RESULT = new MergeConflictResult(); private ByteBuffer newValue; + private Optional deserializedValue; private int valueSchemaId; private final boolean updateIgnored; // Whether we should skip the incoming message since it could be a stale message. private boolean resultReusesInput; @@ -22,8 +24,18 @@ public MergeConflictResult( int valueSchemaID, boolean resultReusesInput, GenericRecord rmdRecord) { + this(newValue, Optional.empty(), valueSchemaID, resultReusesInput, rmdRecord); + } + + public MergeConflictResult( + ByteBuffer newValue, + Optional deserializedValue, + int valueSchemaID, + boolean resultReusesInput, + GenericRecord rmdRecord) { this.updateIgnored = false; this.newValue = newValue; + this.deserializedValue = deserializedValue; this.valueSchemaId = valueSchemaID; this.resultReusesInput = resultReusesInput; this.rmdRecord = rmdRecord; @@ -56,4 +68,14 @@ public boolean doesResultReuseInput() { public GenericRecord getRmdRecord() { return rmdRecord; } + + /** + * Provide the deserialized new value on a best-effort basis, meaning it is acceptable to return an empty + * Optional. For example, the MergeConflictResult of a full PUT will not contain the deserialized new value since + * we don't need to deserialize the value to generate the MCR. + * @return the deserialized new value, if available.
+ */ + public Optional getDeserializedValue() { + return deserializedValue; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java index b1b6f8b1495..0aa187703cb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java @@ -14,9 +14,9 @@ import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; -import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.schema.rmd.RmdUtils; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.ChangeCaptureView; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -59,13 +59,14 @@ public ChangeCaptureViewWriter( } @Override - public CompletableFuture processRecord( + public CompletableFuture processRecord( ByteBuffer newValue, ByteBuffer oldValue, byte[] key, int newValueSchemaId, int oldValueSchemaId, - GenericRecord replicationMetadataRecord) { + GenericRecord replicationMetadataRecord, + Lazy valueProvider) { // TODO: not sold about having currentValue in the interface but it VASTLY simplifies a lot of things with regards // to dealing with compression/chunking/etc. in the storage layer. @@ -83,11 +84,12 @@ public CompletableFuture processRecord( } @Override - public CompletableFuture processRecord( + public CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - boolean isChunkedKey) { + boolean isChunkedKey, + Lazy newValueProvider) { // No op return CompletableFuture.completedFuture(null); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java index 3c92b567e0e..ddbbe0d13f0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/MaterializedViewWriter.java @@ -6,12 +6,13 @@ import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.partitioner.ComplexVenicePartitioner; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; -import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; import com.linkedin.venice.utils.ByteUtils; import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.MaterializedView; +import com.linkedin.venice.writer.ComplexVeniceWriter; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; import com.linkedin.venice.writer.VeniceWriterOptions; @@ -31,7 +32,7 @@ public class MaterializedViewWriter extends VeniceViewWriter { private final PubSubProducerAdapterFactory pubSubProducerAdapterFactory; private final MaterializedView internalView; private final String materializedViewTopicName; - private Lazy veniceWriter; + private Lazy veniceWriter; private final KeyWithChunkingSuffixSerializer 
keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); public MaterializedViewWriter( @@ -47,25 +48,26 @@ public MaterializedViewWriter( internalView.getTopicNamesAndConfigsForVersion(version.getNumber()).keySet().stream().findAny().get(); this.veniceWriter = Lazy.of( () -> new VeniceWriterFactory(props.getCombinedProperties().toProperties(), pubSubProducerAdapterFactory, null) - .createVeniceWriter(buildWriterOptions())); + .createComplexVeniceWriter(buildWriterOptions())); } /** * package private for testing purpose */ - void setVeniceWriter(VeniceWriter veniceWriter) { + void setVeniceWriter(ComplexVeniceWriter veniceWriter) { this.veniceWriter = Lazy.of(() -> veniceWriter); } @Override - public CompletableFuture processRecord( + public CompletableFuture processRecord( ByteBuffer newValue, ByteBuffer oldValue, byte[] key, int newValueSchemaId, int oldValueSchemaId, - GenericRecord replicationMetadataRecord) { - return processRecord(newValue, key, newValueSchemaId, false); + GenericRecord replicationMetadataRecord, + Lazy valueProvider) { + return processRecord(newValue, key, newValueSchemaId, false, valueProvider); } /** @@ -77,20 +79,22 @@ public CompletableFuture processRecord( * will assemble and re-chunk. */ @Override - public CompletableFuture processRecord( + public CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - boolean isChunkedKey) { + boolean isChunkedKey, + Lazy newValueProvider) { byte[] viewTopicKey = key; if (isChunkedKey) { viewTopicKey = keyWithChunkingSuffixSerializer.getKeyFromChunkedKey(key); } + byte[] newValueBytes = newValue == null ? null : ByteUtils.extractByteArray(newValue); if (newValue == null) { // this is a delete operation - return veniceWriter.get().delete(viewTopicKey, null); + return veniceWriter.get().complexDelete(viewTopicKey, newValueProvider); } - return veniceWriter.get().put(viewTopicKey, ByteUtils.extractByteArray(newValue), newValueSchemaId); + return veniceWriter.get().complexPut(viewTopicKey, newValueBytes, newValueSchemaId, newValueProvider); } @Override @@ -113,4 +117,8 @@ public String getWriterClassName() { VeniceWriterOptions buildWriterOptions() { return setProducerOptimizations(internalView.getWriterOptionsBuilder(materializedViewTopicName, version)).build(); } + + public boolean isComplexVenicePartitioner() { + return internalView.getViewPartitioner() instanceof ComplexVenicePartitioner; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java index 49ee755041a..b10545b8600 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/VeniceViewWriter.java @@ -9,7 +9,7 @@ import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Version; -import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.writer.VeniceWriterOptions; import java.nio.ByteBuffer; @@ -62,18 +62,20 @@ public VeniceViewWriter( * @param newValue the incoming fully specified value which hasn't yet been committed to Venice * @param oldValue the previous value which has already been locally committed to Venice for the given key * @param key the key of 
the record that designates newValue and oldValue - * @param version the version of the store taking this record * @param newValueSchemaId the schemaId of the incoming record * @param oldValueSchemaId the schemaId of the old record * @param replicationMetadataRecord the associated RMD for the incoming record. + * @param valueProvider to provide the corresponding deserialized newValue for PUT and UPDATE or the old value for the + * given key for DELETE. */ - public abstract CompletableFuture processRecord( + public abstract CompletableFuture processRecord( ByteBuffer newValue, ByteBuffer oldValue, byte[] key, int newValueSchemaId, int oldValueSchemaId, - GenericRecord replicationMetadataRecord); + GenericRecord replicationMetadataRecord, + Lazy valueProvider); /** * To be called as a given ingestion task consumes each record. This is called prior to writing to a @@ -83,12 +85,14 @@ public abstract CompletableFuture processRecord( * @param key the key of the record that designates newValue and oldValue * @param newValueSchemaId the schemaId of the incoming record * @param isChunkedKey is the key already serialized with {@link com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer} + * @param newValueProvider to provide the deserialized new value */ - public abstract CompletableFuture processRecord( + public abstract CompletableFuture processRecord( ByteBuffer newValue, byte[] key, int newValueSchemaId, - boolean isChunkedKey); + boolean isChunkedKey, + Lazy newValueProvider); /** * Called when the server encounters a control message. There isn't (today) a strict ordering diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java index c3b4e17d430..f9f24237da3 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java @@ -40,7 +40,6 @@ import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubMessage; -import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.schema.SchemaEntry; @@ -279,8 +278,9 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc MaterializedViewWriter materializedViewWriter = mock(MaterializedViewWriter.class); viewWriterMap.put("testView", materializedViewWriter); when(mockVeniceViewWriterFactory.buildStoreViewWriters(any(), anyInt(), any())).thenReturn(viewWriterMap); - CompletableFuture viewWriterFuture = new CompletableFuture<>(); - when(materializedViewWriter.processRecord(any(), any(), anyInt(), anyBoolean())).thenReturn(viewWriterFuture); + CompletableFuture viewWriterFuture = new CompletableFuture<>(); + when(materializedViewWriter.processRecord(any(), any(), anyInt(), anyBoolean(), any())) + .thenReturn(viewWriterFuture); setUp(); WriteComputeResultWrapper mockResult = mock(WriteComputeResultWrapper.class); Put put = new Put(); @@ -291,12 +291,12 @@ public void testQueueUpVersionTopicWritesWithViewWriters() throws InterruptedExc .thenReturn(CompletableFuture.completedFuture(null)); 
leaderFollowerStoreIngestionTask.queueUpVersionTopicWritesWithViewWriters( mockPartitionConsumptionState, - (viewWriter) -> viewWriter.processRecord(mock(ByteBuffer.class), new byte[1], 1, false), + (viewWriter) -> viewWriter.processRecord(mock(ByteBuffer.class), new byte[1], 1, false, Lazy.of(() -> null)), () -> writeToVersionTopic.set(true)); verify(mockPartitionConsumptionState, times(1)).getLastVTProduceCallFuture(); ArgumentCaptor vtWriteFutureCaptor = ArgumentCaptor.forClass(CompletableFuture.class); verify(mockPartitionConsumptionState, times(1)).setLastVTProduceCallFuture(vtWriteFutureCaptor.capture()); - verify(materializedViewWriter, times(1)).processRecord(any(), any(), anyInt(), anyBoolean()); + verify(materializedViewWriter, times(1)).processRecord(any(), any(), anyInt(), anyBoolean(), any()); verify(hostLevelIngestionStats, times(1)).recordViewProducerLatency(anyDouble()); verify(hostLevelIngestionStats, never()).recordViewProducerAckLatency(anyDouble()); assertFalse(writeToVersionTopic.get()); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapperTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapperTest.java new file mode 100644 index 00000000000..ba981d4d59e --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/MergeConflictResultWrapperTest.java @@ -0,0 +1,84 @@ +package com.linkedin.davinci.kafka.consumer; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import com.linkedin.davinci.replication.RmdWithValueSchemaId; +import com.linkedin.davinci.replication.merge.MergeConflictResult; +import com.linkedin.davinci.storage.chunking.ChunkedValueManifestContainer; +import com.linkedin.davinci.store.record.ByteBufferValueRecord; +import com.linkedin.venice.serializer.RecordDeserializer; +import com.linkedin.venice.utils.lazy.Lazy; +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.function.Function; +import org.apache.avro.generic.GenericRecord; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class MergeConflictResultWrapperTest { + @Test + public void testValueProvider() { + MergeConflictResult mergeConflictResult = mock(MergeConflictResult.class); + RecordDeserializer deserializer = mock(RecordDeserializer.class); + GenericRecord mockDeleteOldRecord = mock(GenericRecord.class); + GenericRecord mockNewValueRecord = mock(GenericRecord.class); + ByteBuffer mockDeleteOldValueBytes = mock(ByteBuffer.class); + ByteBuffer mockNewValueBytes = mock(ByteBuffer.class); + doReturn(mockDeleteOldRecord).when(deserializer).deserialize(mockDeleteOldValueBytes); + doReturn(mockNewValueRecord).when(deserializer).deserialize(mockNewValueBytes); + RmdWithValueSchemaId rmdWithValueSchemaId = mock(RmdWithValueSchemaId.class); + ChunkedValueManifestContainer chunkedValueManifestContainer = mock(ChunkedValueManifestContainer.class); + ByteBuffer mockUpdatedRmdBytes = mock(ByteBuffer.class); + Function> deserProvider = (schemaId) -> deserializer; + // DELETE + MergeConflictResultWrapper nonExistingKeyDeleteWrapper = new MergeConflictResultWrapper( + mergeConflictResult, + Lazy.of(() -> null), + Lazy.of(() -> null), + rmdWithValueSchemaId, + chunkedValueManifestContainer, + null, + mockUpdatedRmdBytes, + deserProvider); + Assert.assertNull(nonExistingKeyDeleteWrapper.getValueProvider().get()); + ByteBufferValueRecord mockDeleteByteBufferValueRecord = 
mock(ByteBufferValueRecord.class); + doReturn(mockDeleteOldValueBytes).when(mockDeleteByteBufferValueRecord).value(); + MergeConflictResultWrapper existingKeyDeleteWrapper = new MergeConflictResultWrapper( + mergeConflictResult, + Lazy.of(() -> mockDeleteByteBufferValueRecord), + Lazy.of(() -> mockDeleteOldValueBytes), + rmdWithValueSchemaId, + chunkedValueManifestContainer, + null, + mockUpdatedRmdBytes, + deserProvider); + Assert.assertEquals(existingKeyDeleteWrapper.getValueProvider().get(), mockDeleteOldRecord); + // PUT/UPDATE + ByteBuffer mockUpdatedValueBytes = mock(ByteBuffer.class); + doReturn(Optional.of(mockNewValueRecord)).when(mergeConflictResult).getDeserializedValue(); + MergeConflictResultWrapper updateCachedWrapper = new MergeConflictResultWrapper( + mergeConflictResult, + Lazy.of(() -> null), + Lazy.of(() -> null), + rmdWithValueSchemaId, + chunkedValueManifestContainer, + mockUpdatedValueBytes, + mockUpdatedRmdBytes, + deserProvider); + Assert.assertEquals(updateCachedWrapper.getValueProvider().get(), mockNewValueRecord); + doReturn(Optional.empty()).when(mergeConflictResult).getDeserializedValue(); + doReturn(mockNewValueBytes).when(mergeConflictResult).getNewValue(); + MergeConflictResultWrapper updateNotCachedWrapper = new MergeConflictResultWrapper( + mergeConflictResult, + Lazy.of(() -> null), + Lazy.of(() -> null), + rmdWithValueSchemaId, + chunkedValueManifestContainer, + mockUpdatedValueBytes, + mockUpdatedRmdBytes, + deserProvider); + Assert.assertEquals(updateNotCachedWrapper.getValueProvider().get(), mockNewValueRecord); + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java index fb3d03c6ba2..5d2831d389a 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriterTest.java @@ -24,6 +24,7 @@ import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.schema.rmd.RmdSchemaGenerator; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.ChangeCaptureView; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; @@ -245,15 +246,21 @@ public void testProcessRecord() throws ExecutionException, InterruptedException rmdRecordWithValueLevelTimeStamp.put(TIMESTAMP_FIELD_NAME, 20L); rmdRecordWithValueLevelTimeStamp.put(REPLICATION_CHECKPOINT_VECTOR_FIELD_NAME, vectors); changeCaptureViewWriter.setVeniceWriter(mockVeniceWriter); - + Lazy dummyValueProvider = Lazy.of(() -> null); // Update Case - changeCaptureViewWriter.processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get(); + changeCaptureViewWriter + .processRecord(NEW_VALUE, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider) + .get(); // Insert Case - changeCaptureViewWriter.processRecord(NEW_VALUE, null, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get(); + changeCaptureViewWriter + .processRecord(NEW_VALUE, null, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp, dummyValueProvider) + .get(); // Deletion Case - changeCaptureViewWriter.processRecord(null, OLD_VALUE, KEY, 1, 1, rmdRecordWithValueLevelTimeStamp).get(); + changeCaptureViewWriter + .processRecord(null, OLD_VALUE, KEY, 1, 1, 
rmdRecordWithValueLevelTimeStamp, dummyValueProvider) + .get(); // Set up argument captors ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(byte[].class); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java index 7e11f795ff5..d2082bb8093 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/view/MaterializedViewWriterTest.java @@ -18,6 +18,7 @@ import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.kafka.consumer.PartitionConsumptionState; +import com.linkedin.davinci.utils.UnitTestComplexPartitioner; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; @@ -33,9 +34,10 @@ import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; -import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.ComplexVeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; import java.nio.ByteBuffer; import java.util.Arrays; @@ -126,7 +128,7 @@ public void testProcessControlMessage() { controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue(); KafkaKey kafkaKey = mock(KafkaKey.class); doReturn(KafkaKey.HEART_BEAT.getKey()).when(kafkaKey).getKey(); - VeniceWriter veniceWriter = mock(VeniceWriter.class); + ComplexVeniceWriter veniceWriter = mock(ComplexVeniceWriter.class); when(veniceWriter.sendHeartbeat(anyString(), anyInt(), any(), any(), anyBoolean(), any(), anyLong())) .thenReturn(CompletableFuture.completedFuture(null)); doReturn(CompletableFuture.completedFuture(null)).when(veniceWriter) @@ -153,8 +155,8 @@ public void testViewWriterCanForwardChunkedKeysCorrectly() { Map viewParamsMap = viewParamsBuilder.build(); VeniceConfigLoader props = getMockProps(); MaterializedViewWriter materializedViewWriter = new MaterializedViewWriter(props, version, SCHEMA, viewParamsMap); - VeniceWriter veniceWriter = mock(VeniceWriter.class); - doReturn(CompletableFuture.completedFuture(null)).when(veniceWriter).put(any(), any(), anyInt()); + ComplexVeniceWriter veniceWriter = mock(ComplexVeniceWriter.class); + doReturn(CompletableFuture.completedFuture(null)).when(veniceWriter).complexPut(any(), any(), anyInt(), any()); materializedViewWriter.setVeniceWriter(veniceWriter); KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer(); ByteBuffer dummyValue = mock(ByteBuffer.class); @@ -163,13 +165,42 @@ public void testViewWriterCanForwardChunkedKeysCorrectly() { for (int i = 0; i < 100; i++) { byte[] key = new byte[keySize]; RANDOM.nextBytes(key); - materializedViewWriter - .processRecord(dummyValue, keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key), 1, true); - verify(veniceWriter, times(1)).put(eq(key), any(), eq(1)); + materializedViewWriter.processRecord( + dummyValue, + keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key), + 1, + true, 
+ Lazy.of(() -> null)); + verify(veniceWriter, times(1)).complexPut(eq(key), any(), eq(1), any()); Mockito.clearInvocations(veniceWriter); } } + @Test + public void testIsComplexVenicePartitioner() { + String storeName = "testStore"; + String viewName = "simplePartitionerView"; + Version version = mock(Version.class); + doReturn(true).when(version).isChunkingEnabled(); + doReturn(true).when(version).isRmdChunkingEnabled(); + getMockStore(storeName, 1, version); + MaterializedViewParameters.Builder viewParamsBuilder = new MaterializedViewParameters.Builder(viewName); + viewParamsBuilder.setPartitionCount(6); + viewParamsBuilder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName()); + VeniceConfigLoader props = getMockProps(); + MaterializedViewWriter materializedViewWriter = + new MaterializedViewWriter(props, version, SCHEMA, viewParamsBuilder.build()); + Assert.assertFalse(materializedViewWriter.isComplexVenicePartitioner()); + String complexViewName = "complexPartitionerView"; + MaterializedViewParameters.Builder complexViewParamsBuilder = + new MaterializedViewParameters.Builder(complexViewName); + complexViewParamsBuilder.setPartitionCount(6); + complexViewParamsBuilder.setPartitioner(UnitTestComplexPartitioner.class.getCanonicalName()); + MaterializedViewWriter complexMaterializedViewWriter = + new MaterializedViewWriter(props, version, SCHEMA, complexViewParamsBuilder.build()); + Assert.assertTrue(complexMaterializedViewWriter.isComplexVenicePartitioner()); + } + private VeniceConfigLoader getMockProps() { VeniceConfigLoader props = mock(VeniceConfigLoader.class); VeniceServerConfig serverConfig = mock(VeniceServerConfig.class); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/utils/UnitTestComplexPartitioner.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/utils/UnitTestComplexPartitioner.java new file mode 100644 index 00000000000..c720330ef89 --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/utils/UnitTestComplexPartitioner.java @@ -0,0 +1,43 @@ +package com.linkedin.davinci.utils; + +import com.linkedin.venice.partitioner.ComplexVenicePartitioner; +import com.linkedin.venice.utils.PartitionUtils; +import com.linkedin.venice.utils.ReflectUtils; +import com.linkedin.venice.utils.VeniceProperties; +import java.nio.ByteBuffer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + + +/** + * Dummy complex venice partitioner used for unit tests. Cannot use private static class because {@link PartitionUtils} + * uses {@link ReflectUtils#loadClass(String)}. 
+ */ +public class UnitTestComplexPartitioner extends ComplexVenicePartitioner { + public UnitTestComplexPartitioner() { + super(); + } + + public UnitTestComplexPartitioner(VeniceProperties props) { + super(props); + } + + public UnitTestComplexPartitioner(VeniceProperties props, Schema schema) { + super(props, schema); + } + + @Override + public int[] getPartitionId(byte[] keyBytes, GenericRecord value, int numPartitions) { + return new int[0]; + } + + @Override + public int getPartitionId(byte[] keyBytes, int numPartitions) { + return 0; + } + + @Override + public int getPartitionId(ByteBuffer keyByteBuffer, int numPartitions) { + return 0; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index f16c78fee25..5e75799d328 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -769,21 +769,6 @@ public void run() { pushJobSetting.rewindTimeInSecondsOverride = DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE; LOGGER.info("Overriding re-push rewind time in seconds to: {}", pushJobSetting.rewindTimeInSecondsOverride); } - if (pushJobSetting.repushTTLEnabled) { - // Build the full path for HDFSRmdSchemaSource: - // RMD schemas: /rmd_schemas - // Value schemas: /value_schemas - Path rmdSchemaDir = new Path(jobTmpDir, "rmd_schemas"); - HadoopUtils.createDirectoryWithPermission(rmdSchemaDir, PERMISSION_700); - Path valueSchemaDir = new Path(jobTmpDir, "value_schemas"); - HadoopUtils.createDirectoryWithPermission(valueSchemaDir, PERMISSION_700); - try (HDFSSchemaSource schemaSource = - new HDFSSchemaSource(valueSchemaDir, rmdSchemaDir, pushJobSetting.storeName)) { - schemaSource.saveSchemasOnDisk(controllerClient); - pushJobSetting.rmdSchemaDir = schemaSource.getRmdSchemaPath(); - pushJobSetting.valueSchemaDir = schemaSource.getValueSchemaPath(); - } - } } // Create new store version, topic and fetch Kafka url from backend createNewStoreVersion( @@ -825,6 +810,9 @@ public void run() { } else { // Populate any view configs to job properties configureJobPropertiesWithMaterializedViewConfigs(); + if (pushJobSetting.repushTTLEnabled || pushJobSetting.materializedViewConfigFlatMap != null) { + buildHDFSSchemaDir(); + } if (pushJobSetting.sendControlMessagesDirectly) { getVeniceWriter(pushJobSetting).broadcastStartOfPush( SORTED, @@ -930,6 +918,21 @@ public void run() { } } + private void buildHDFSSchemaDir() throws IOException { + // Build the full path for HDFSRmdSchemaSource: + // RMD schemas: /rmd_schemas + // Value schemas: /value_schemas + Path rmdSchemaDir = new Path(jobTmpDir, "rmd_schemas"); + HadoopUtils.createDirectoryWithPermission(rmdSchemaDir, PERMISSION_700); + Path valueSchemaDir = new Path(jobTmpDir, "value_schemas"); + HadoopUtils.createDirectoryWithPermission(valueSchemaDir, PERMISSION_700); + try (HDFSSchemaSource schemaSource = new HDFSSchemaSource(valueSchemaDir, rmdSchemaDir, pushJobSetting.storeName)) { + schemaSource.saveSchemasOnDisk(controllerClient); + pushJobSetting.rmdSchemaDir = schemaSource.getRmdSchemaPath(); + pushJobSetting.valueSchemaDir = schemaSource.getValueSchemaPath(); + } + } + /** * Get the set of regions that haven't been pushed yet after targeted region push. * @return a set of regions that haven't been pushed yet. 
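For context (illustration only, not part of the patch): the reason the deserialized value has to be threaded through to the view writers is that a `ComplexVenicePartitioner`, unlike a plain `VenicePartitioner`, partitions on the value and may map one record to zero or many partitions. A hypothetical implementation, mirroring the shape of `UnitTestComplexPartitioner` above; the `region` field and the hashing scheme are invented for the example:

```java
import com.linkedin.venice.partitioner.ComplexVenicePartitioner;
import com.linkedin.venice.utils.VeniceProperties;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

/** Hypothetical example: routes each record by an assumed "region" field of the value. */
public class RegionRoutingPartitioner extends ComplexVenicePartitioner {
  public RegionRoutingPartitioner() {
    super();
  }

  public RegionRoutingPartitioner(VeniceProperties props) {
    super(props);
  }

  public RegionRoutingPartitioner(VeniceProperties props, Schema schema) {
    super(props, schema);
  }

  @Override
  public int[] getPartitionId(byte[] keyBytes, GenericRecord value, int numPartitions) {
    if (value == null) {
      // Without a value there is no routing information; an empty array means "no target partitions".
      return new int[0];
    }
    Object region = value.get("region"); // assumed field, for illustration only
    int partition = Math.floorMod(region == null ? 0 : region.hashCode(), numPartitions);
    return new int[] { partition };
  }

  @Override
  public int getPartitionId(byte[] keyBytes, int numPartitions) {
    return Math.floorMod(ByteBuffer.wrap(keyBytes).hashCode(), numPartitions);
  }

  @Override
  public int getPartitionId(ByteBuffer keyByteBuffer, int numPartitions) {
    return Math.floorMod(keyByteBuffer.hashCode(), numPartitions);
  }
}
```

This is why a DELETE needs the old value (to recompute which view partitions previously held the record) and why `ComplexVeniceWriter#complexPut`/`complexDelete` accept a `Lazy<GenericRecord>` value provider.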
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java index faa6e3968e1..7a74eb2d981 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/jobs/DataWriterMRJob.java @@ -308,6 +308,8 @@ private void setupReducerConf(JobConf jobConf, PushJobSetting pushJobSetting) { } if (pushJobSetting.materializedViewConfigFlatMap != null) { jobConf.set(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.materializedViewConfigFlatMap); + jobConf.set(VALUE_SCHEMA_DIR, pushJobSetting.valueSchemaDir); + jobConf.set(RMD_SCHEMA_DIR, pushJobSetting.rmdSchemaDir); } jobConf.setReduceSpeculativeExecution(vpjProperties.getBoolean(REDUCER_SPECULATIVE_EXECUTION_ENABLE, false)); int partitionCount = pushJobSetting.partitionCount; diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java index e45a50b30cd..9f656d239c5 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java @@ -4,42 +4,58 @@ import static com.linkedin.venice.ConfigKeys.PUSH_JOB_GUID_MOST_SIGNIFICANT_BITS; import static com.linkedin.venice.ConfigKeys.PUSH_JOB_VIEW_CONFIGS; import static com.linkedin.venice.vpj.VenicePushJobConstants.ALLOW_DUPLICATE_KEY; +import static com.linkedin.venice.vpj.VenicePushJobConstants.COMPRESSION_STRATEGY; import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_IS_DUPLICATED_KEY_ALLOWED; import static com.linkedin.venice.vpj.VenicePushJobConstants.DERIVED_SCHEMA_ID_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.ENABLE_WRITE_COMPUTE; +import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL; +import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY; +import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_TOPIC; +import static com.linkedin.venice.vpj.VenicePushJobConstants.RMD_SCHEMA_DIR; import static com.linkedin.venice.vpj.VenicePushJobConstants.STORAGE_QUOTA_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.TELEMETRY_MESSAGE_INTERVAL; import static com.linkedin.venice.vpj.VenicePushJobConstants.TOPIC_PROP; +import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_DIR; import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_ID_PROP; import com.linkedin.venice.ConfigKeys; import com.linkedin.venice.annotation.NotThreadsafe; +import com.linkedin.venice.compression.CompressionStrategy; +import com.linkedin.venice.compression.CompressorFactory; +import com.linkedin.venice.compression.VeniceCompressor; import com.linkedin.venice.exceptions.RecordTooLargeException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceResourceAccessException; import com.linkedin.venice.guid.GuidUtils; import com.linkedin.venice.hadoop.InputStorageQuotaTracker; import com.linkedin.venice.hadoop.engine.EngineTaskConfigProvider; +import 
com.linkedin.venice.hadoop.input.kafka.KafkaInputUtils; +import com.linkedin.venice.hadoop.schema.HDFSSchemaSource; import com.linkedin.venice.hadoop.task.TaskTracker; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.ViewConfig; +import com.linkedin.venice.partitioner.ComplexVenicePartitioner; import com.linkedin.venice.partitioner.VenicePartitioner; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; import com.linkedin.venice.serialization.DefaultSerializer; +import com.linkedin.venice.serializer.FastSerializerDeserializerFactory; +import com.linkedin.venice.serializer.RecordDeserializer; import com.linkedin.venice.utils.ByteUtils; +import com.linkedin.venice.utils.DictionaryUtils; import com.linkedin.venice.utils.PartitionUtils; import com.linkedin.venice.utils.SystemTime; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; +import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.views.ViewUtils; import com.linkedin.venice.writer.AbstractVeniceWriter; -import com.linkedin.venice.writer.CompositeVeniceWriter; import com.linkedin.venice.writer.DeleteMetadata; import com.linkedin.venice.writer.PutMetadata; import com.linkedin.venice.writer.VeniceWriter; @@ -57,6 +73,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -146,7 +164,7 @@ public int getValueSchemaId() { private Lazy veniceWriterFactory; private AbstractVeniceWriter veniceWriter = null; private VeniceWriter mainWriter = null; - private VeniceWriter[] childWriters = null; + private AbstractVeniceWriter[] childWriters = null; private int valueSchemaId = -1; private int derivedValueSchemaId = -1; private boolean enableWriteCompute = false; @@ -179,6 +197,11 @@ public int getValueSchemaId() { private boolean hasDuplicateKeyWithDistinctValue = false; private boolean hasRecordTooLargeFailure = false; private boolean isDuplicateKeyAllowed = DEFAULT_IS_DUPLICATED_KEY_ALLOWED; + private HDFSSchemaSource schemaSource; + private Map valueSchemaMap; + private Map> valueDeserializerCache; + private final Lazy compressorFactory = Lazy.of(CompressorFactory::new); + private VeniceCompressor compressor; /** * Compute engines will kill a task if it's inactive for a configured time. This time might be is too short for the @@ -390,7 +413,7 @@ private AbstractVeniceWriter createCompositeVeniceWriter boolean rmdChunkingEnabled) { try { Map viewConfigMap = ViewUtils.parseViewConfigMapString(flatViewConfigMapString); - childWriters = new VeniceWriter[viewConfigMap.size()]; + childWriters = new AbstractVeniceWriter[viewConfigMap.size()]; String storeName = Version.parseStoreFromKafkaTopicName(topicName); int versionNumber = Version.parseVersionFromKafkaTopicName(topicName); // TODO using a dummy Version to get venice writer options could be error prone. 
Alternatively we could change @@ -403,6 +426,31 @@ private AbstractVeniceWriter createCompositeVeniceWriter VeniceView view = ViewUtils .getVeniceView(viewConfig.getViewClassName(), new Properties(), storeName, viewConfig.getViewParameters()); String viewTopic = view.getTopicNamesAndConfigsForVersion(versionNumber).keySet().stream().findAny().get(); + if (view instanceof MaterializedView) { + MaterializedView materializedView = (MaterializedView) view; + if (materializedView.getViewPartitioner() instanceof ComplexVenicePartitioner) { + // We need to build a ComplexPartitionerWriterAdapter to handle writes with complex partitioner. + initializeSchemaSourceAndDeserCache(); + compressor = getCompressor(); + childWriters[index++] = new ComplexVeniceWriterAdapter( + viewTopic, + factory.createComplexVeniceWriter(view.getWriterOptionsBuilder(viewTopic, version).build()), + (valueBytes, valueSchemaId) -> valueDeserializerCache + .computeIfAbsent(valueSchemaId, this::getValueDeserializer) + .deserialize(valueBytes), + (valueBytes) -> { + if (compressor == null) { + return valueBytes; + } + try { + return ByteUtils.extractByteArray(compressor.decompress(valueBytes, 0, valueBytes.length)); + } catch (IOException e) { + throw new VeniceException("Unable to decompress value bytes", e); + } + }); + continue; + } + } childWriters[index++] = factory.createVeniceWriter(view.getWriterOptionsBuilder(viewTopic, version).build()); } return new CompositeVeniceWriter( @@ -417,6 +465,39 @@ private AbstractVeniceWriter createCompositeVeniceWriter } } + private VeniceCompressor getCompressor() { + if (props.containsKey(KAFKA_INPUT_TOPIC)) { + // Configure compressor using kafka input configs + String sourceVersion = props.getString(KAFKA_INPUT_TOPIC); + String kafkaInputBrokerUrl = props.getString(KAFKA_INPUT_BROKER_URL); + CompressionStrategy strategy = + CompressionStrategy.valueOf(props.getString(KAFKA_INPUT_SOURCE_COMPRESSION_STRATEGY)); + return KafkaInputUtils + .getCompressor(compressorFactory.get(), strategy, kafkaInputBrokerUrl, sourceVersion, props); + } else { + CompressionStrategy strategy = CompressionStrategy.valueOf(props.getString(COMPRESSION_STRATEGY)); + if (strategy == CompressionStrategy.ZSTD_WITH_DICT) { + String topicName = props.getString(TOPIC_PROP); + ByteBuffer dict = DictionaryUtils.readDictionaryFromKafka(topicName, props); + return compressorFactory.get() + .createVersionSpecificCompressorIfNotExist(strategy, topicName, ByteUtils.extractByteArray(dict)); + } else { + return compressorFactory.get().getCompressor(strategy); + } + } + } + + private void initializeSchemaSourceAndDeserCache() throws IOException { + schemaSource = new HDFSSchemaSource(props.getString(VALUE_SCHEMA_DIR), props.getString(RMD_SCHEMA_DIR)); + valueSchemaMap = schemaSource.fetchValueSchemas(); + valueDeserializerCache = new VeniceConcurrentHashMap<>(); + } + + private RecordDeserializer getValueDeserializer(int valueSchemaId) { + Schema schema = valueSchemaMap.get(valueSchemaId); + return FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(schema, schema); + } + private void telemetry() { if (messageSent % telemetryMessageInterval == 0) { double timeSinceLastMeasurementInSeconds = @@ -468,7 +549,7 @@ public void close() throws IOException { } if (veniceWriter instanceof CompositeVeniceWriter) { if (childWriters != null) { - for (VeniceWriter childWriter: childWriters) { + for (AbstractVeniceWriter childWriter: childWriters) { childWriter.close(shouldEndAllSegments); } } @@ -484,6 +565,12 @@ public 
void close() throws IOException { throw new VeniceException( "Message sent: " + messageSent + " doesn't match message completed: " + messageCompleted.get()); } + if (schemaSource != null) { + schemaSource.close(); + } + if (compressorFactory.isPresent()) { + compressorFactory.get().close(); + } } finally { Utils.closeQuietlyWithErrorLogged(duplicateKeyPrinter); taskProgressHeartbeatScheduler.shutdownNow(); diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/ComplexVeniceWriterAdapter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/ComplexVeniceWriterAdapter.java new file mode 100644 index 00000000000..caf24a4b96b --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/ComplexVeniceWriterAdapter.java @@ -0,0 +1,114 @@ +package com.linkedin.venice.hadoop.task.datawriter; + +import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import com.linkedin.venice.pubsub.api.PubSubProducerCallback; +import com.linkedin.venice.utils.lazy.Lazy; +import com.linkedin.venice.writer.AbstractVeniceWriter; +import com.linkedin.venice.writer.ComplexVeniceWriter; +import com.linkedin.venice.writer.DeleteMetadata; +import com.linkedin.venice.writer.PutMetadata; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.avro.generic.GenericRecord; + + +/** + * Adapter class for {@link ComplexVeniceWriter} to support public APIs defined in {@link AbstractVeniceWriter} in the + * context of being called in a {@link CompositeVeniceWriter} from VPJ. This class will + * provide capabilities to deserialize the value in order to provide {@link ComplexVeniceWriter} a value provider, and + * decompression capabilities in case of a re-push (Kafka input). + */ +public class ComplexVeniceWriterAdapter extends AbstractVeniceWriter { + private final ComplexVeniceWriter internalVeniceWriter; + private final BiFunction deserializeFunction; + private final Function decompressFunction; + + public ComplexVeniceWriterAdapter( + String topicName, + ComplexVeniceWriter veniceWriter, + BiFunction deserializeFunction, + Function decompressFunction) { + super(topicName); + this.internalVeniceWriter = veniceWriter; + this.deserializeFunction = deserializeFunction; + this.decompressFunction = decompressFunction; + } + + @Override + public void close(boolean gracefulClose) throws IOException { + // no op, internal writer is initialized outside the adapter and should be closed elsewhere. + } + + @Override + public CompletableFuture put( + K key, + V value, + int valueSchemaId, + PubSubProducerCallback callback) { + return put(key, value, valueSchemaId, callback, null); + } + + /** + * The {@link PubSubProduceResult} will always be null and should not be used. This is acceptable because: + * 1. {@link ComplexVeniceWriter#complexPut(Object, Object, int, Lazy)} returns a CompletableFuture with Void + * since it could potentially write to multiple partitions resulting in multiple PubSubProduceResult. + * 2. Only the PubSubProduceResult of the main writer in {@link CompositeVeniceWriter} is used for reporting + * purpose in VPJ. 
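For reference, a minimal, Venice-free sketch (hypothetical class and variable names, not part of this diff) of the completion pattern the two points above rely on: child futures are only joined for completion, and only the main writer's produce result is surfaced for reporting.

```java
import java.util.concurrent.CompletableFuture;

public class CompositeCompletionSketch {
  public static void main(String[] args) {
    // Main writer produces a real result; the view-writer adapter completes with null.
    CompletableFuture<String> mainWrite = CompletableFuture.completedFuture("main-produce-result");
    CompletableFuture<String> viewWrite = CompletableFuture.completedFuture(null);

    CompletableFuture<String> reported = new CompletableFuture<>();
    CompletableFuture.allOf(mainWrite, viewWrite).whenComplete((ignored, failure) -> {
      if (failure != null) {
        reported.completeExceptionally(failure);
      } else {
        // Only the main writer's result is surfaced; the child's null result is never read.
        reported.complete(mainWrite.join());
      }
    });
    System.out.println(reported.join()); // prints "main-produce-result"
  }
}
```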
+ */ + @Override + public CompletableFuture put( + K key, + V value, + int valueSchemaId, + PubSubProducerCallback callback, + PutMetadata putMetadata) { + CompletableFuture wrapper = new CompletableFuture<>(); + Lazy valueProvider = + Lazy.of(() -> deserializeFunction.apply(decompressFunction.apply(value), valueSchemaId)); + internalVeniceWriter.complexPut(key, value, valueSchemaId, valueProvider) + .whenCompleteAsync((ignored, writeException) -> { + if (writeException == null) { + wrapper.complete(null); + } else { + wrapper.completeExceptionally(writeException); + } + }); + return wrapper; + } + + /** + * In VPJ, only re-push can trigger this function. During re-push the deletions to view topics are useless and should + * be ignored. + */ + @Override + public CompletableFuture delete( + K key, + PubSubProducerCallback callback, + DeleteMetadata deleteMetadata) { + // No-op, delete by key is undefined for complex partitioner. + return CompletableFuture.completedFuture(null); + } + + @Override + public Future update( + K key, + U update, + int valueSchemaId, + int derivedSchemaId, + PubSubProducerCallback callback) { + throw new UnsupportedOperationException(this.getClass().getSimpleName() + " does not support update function"); + } + + @Override + public void flush() { + internalVeniceWriter.flush(); + } + + @Override + public void close() throws IOException { + close(true); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java similarity index 80% rename from internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java rename to clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java index ba859147651..ed772d55e3b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompositeVeniceWriter.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriter.java @@ -1,12 +1,13 @@ -package com.linkedin.venice.writer; - -import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS; -import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER; +package com.linkedin.venice.hadoop.task.datawriter; import com.linkedin.venice.annotation.NotThreadsafe; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; +import com.linkedin.venice.writer.AbstractVeniceWriter; +import com.linkedin.venice.writer.DeleteMetadata; +import com.linkedin.venice.writer.PutMetadata; +import com.linkedin.venice.writer.VeniceWriter; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -23,13 +24,13 @@ @NotThreadsafe public class CompositeVeniceWriter extends AbstractVeniceWriter { private final VeniceWriter mainWriter; - private final VeniceWriter[] childWriters; + private final AbstractVeniceWriter[] childWriters; private final PubSubProducerCallback childCallback; public CompositeVeniceWriter( String topicName, VeniceWriter mainWriter, - VeniceWriter[] childWriters, + AbstractVeniceWriter[] childWriters, PubSubProducerCallback childCallback) { super(topicName); this.mainWriter = mainWriter; @@ -55,21 +56,14 @@ public CompletableFuture put( } @Override - public Future put( + public 
CompletableFuture put( K key, V value, int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata) { return compositeOperation( - (writer, writeCallback) -> writer.put( - key, - value, - valueSchemaId, - writeCallback, - DEFAULT_LEADER_METADATA_WRAPPER, - APP_DEFAULT_LOGICAL_TS, - putMetadata), + (writer, writeCallback) -> writer.put(key, value, valueSchemaId, writeCallback, putMetadata), childCallback, callback); } @@ -86,8 +80,8 @@ public CompletableFuture delete( } /** - * The main use of the {@link CompositeVeniceWriter} for now is to write batch portion of a store version to VT and - * materialized view topic in the NR fabric. Updates should never go through the {@link CompositeVeniceWriter} because + * The main use of the {@link com.linkedin.venice.writer.CompositeVeniceWriter} for now is to write batch portion of a store version to VT and + * materialized view topic in the NR fabric. Updates should never go through the {@link com.linkedin.venice.writer.CompositeVeniceWriter} because * it should be written to RT (hybrid writes or incremental push) and handled by view writers in L/F or A/A SIT. */ @Override @@ -97,12 +91,12 @@ public Future update( int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback) { - throw new UnsupportedOperationException(this.getClass().getSimpleName() + "does not support update function"); + throw new UnsupportedOperationException(this.getClass().getSimpleName() + " does not support update function"); } @Override public void flush() { - for (VeniceWriter writer: childWriters) { + for (AbstractVeniceWriter writer: childWriters) { writer.flush(); } mainWriter.flush(); @@ -119,14 +113,14 @@ public void close() throws IOException { * completes the mainWriterOp. */ private CompletableFuture compositeOperation( - BiFunction, PubSubProducerCallback, CompletableFuture> writerOperation, + BiFunction, PubSubProducerCallback, CompletableFuture> writerOperation, PubSubProducerCallback childWriterCallback, PubSubProducerCallback mainWriterCallback) { CompletableFuture finalFuture = new CompletableFuture<>(); CompletableFuture[] writeFutures = new CompletableFuture[childWriters.length + 1]; int index = 0; writeFutures[index++] = writerOperation.apply(mainWriter, mainWriterCallback); - for (VeniceWriter writer: childWriters) { + for (AbstractVeniceWriter writer: childWriters) { writeFutures[index++] = writerOperation.apply(writer, childWriterCallback); } CompletableFuture.allOf(writeFutures).whenCompleteAsync((ignored, writeException) -> { diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java index 4595f746f20..0ececd37dc6 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java @@ -44,6 +44,7 @@ import static com.linkedin.venice.vpj.VenicePushJobConstants.STORAGE_QUOTA_PROP; import static com.linkedin.venice.vpj.VenicePushJobConstants.TELEMETRY_MESSAGE_INTERVAL; import static com.linkedin.venice.vpj.VenicePushJobConstants.TOPIC_PROP; +import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_DIR; import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_ID_PROP; import static 
com.linkedin.venice.vpj.VenicePushJobConstants.ZSTD_COMPRESSION_LEVEL; import static com.linkedin.venice.vpj.VenicePushJobConstants.ZSTD_DICTIONARY_CREATION_REQUIRED; @@ -129,6 +130,8 @@ private void setupSparkDataWriterJobFlow(PushJobSetting pushJobSetting) { sparkSession.conf().getAll().foreach(entry -> jobProps.setProperty(entry._1, entry._2)); if (pushJobSetting.materializedViewConfigFlatMap != null) { jobProps.put(PUSH_JOB_VIEW_CONFIGS, pushJobSetting.materializedViewConfigFlatMap); + jobProps.put(VALUE_SCHEMA_DIR, pushJobSetting.valueSchemaDir); + jobProps.put(RMD_SCHEMA_DIR, pushJobSetting.rmdSchemaDir); } JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext()); Broadcast broadcastProperties = sparkContext.broadcast(jobProps); diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index 0e33edb6a9a..83f2a81b745 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -61,6 +61,7 @@ import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse; import com.linkedin.venice.controllerapi.JobStatusQueryResponse; +import com.linkedin.venice.controllerapi.MultiSchemaResponse; import com.linkedin.venice.controllerapi.RepushInfo; import com.linkedin.venice.controllerapi.RepushInfoResponse; import com.linkedin.venice.controllerapi.SchemaResponse; @@ -993,6 +994,13 @@ public void testConfigureWithMaterializedViewConfigs() throws Exception { storeInfo.setVersions(Collections.singletonList(version)); }, true); doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), eq(null)); + MultiSchemaResponse valueSchemaResponse = getMultiSchemaResponse(); + MultiSchemaResponse.Schema[] schemas = new MultiSchemaResponse.Schema[1]; + schemas[0] = getBasicSchema(); + valueSchemaResponse.setSchemas(schemas); + doReturn(valueSchemaResponse).when(client).getAllValueSchema(TEST_STORE); + doReturn(getMultiSchemaResponse()).when(client).getAllReplicationMetadataSchemas(TEST_STORE); + doReturn(getKeySchemaResponse()).when(client).getKeySchema(TEST_STORE); try (final VenicePushJob vpj = getSpyVenicePushJob(properties, client)) { skipVPJValidation(vpj); vpj.run(); @@ -1007,6 +1015,32 @@ public void testConfigureWithMaterializedViewConfigs() throws Exception { } } + private SchemaResponse getKeySchemaResponse() { + SchemaResponse response = new SchemaResponse(); + response.setId(1); + response.setCluster(TEST_CLUSTER); + response.setName(TEST_STORE); + response.setSchemaStr(KEY_SCHEMA_STR); + return response; + } + + private MultiSchemaResponse.Schema getBasicSchema() { + MultiSchemaResponse.Schema schema = new MultiSchemaResponse.Schema(); + schema.setSchemaStr(VALUE_SCHEMA_STR); + schema.setId(1); + schema.setRmdValueSchemaId(1); + return schema; + } + + private MultiSchemaResponse getMultiSchemaResponse() { + MultiSchemaResponse multiSchemaResponse = new MultiSchemaResponse(); + multiSchemaResponse.setCluster(TEST_CLUSTER); + multiSchemaResponse.setName(TEST_STORE); + multiSchemaResponse.setSuperSetSchemaId(1); + multiSchemaResponse.setSchemas(new MultiSchemaResponse.Schema[0]); + return multiSchemaResponse; + } + private JobStatusQueryResponse mockJobStatusQuery() { JobStatusQueryResponse response = new 
JobStatusQueryResponse(); response.setStatus(ExecutionStatus.COMPLETED.toString()); diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/ComplexVeniceWriterAdapterTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/ComplexVeniceWriterAdapterTest.java new file mode 100644 index 00000000000..c9d8b164494 --- /dev/null +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/ComplexVeniceWriterAdapterTest.java @@ -0,0 +1,66 @@ +package com.linkedin.venice.hadoop.mapreduce.datawriter; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.linkedin.venice.hadoop.task.datawriter.ComplexVeniceWriterAdapter; +import com.linkedin.venice.utils.lazy.Lazy; +import com.linkedin.venice.writer.ComplexVeniceWriter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.avro.generic.GenericRecord; +import org.mockito.ArgumentCaptor; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class ComplexVeniceWriterAdapterTest { + private static final byte[] IGNORED_BYTES = new byte[0]; + + @Test + public void testUnsupportedOperations() throws ExecutionException, InterruptedException { + ComplexVeniceWriter mockVeniceWriter = mock(ComplexVeniceWriter.class); + GenericRecord mockRecord = mock(GenericRecord.class); + ComplexVeniceWriterAdapter complexVeniceWriterAdapter = new ComplexVeniceWriterAdapter<>( + "ignored", + mockVeniceWriter, + (ignoredId, ignoredBytes) -> mockRecord, + (inputBytes) -> inputBytes); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> complexVeniceWriterAdapter.update(IGNORED_BYTES, IGNORED_BYTES, 1, 1, null)); + complexVeniceWriterAdapter.delete(IGNORED_BYTES, null, null).get(); + verify(mockVeniceWriter, never()).complexDelete(any(), any()); + } + + @Test + public void testPut() throws ExecutionException, InterruptedException { + ComplexVeniceWriter mockVeniceWriter = mock(ComplexVeniceWriter.class); + GenericRecord mockRecord = mock(GenericRecord.class); + doReturn(CompletableFuture.completedFuture(null)).when(mockVeniceWriter).complexPut(any(), any(), anyInt(), any()); + BiFunction deserializeFunction = (inputBytes, schemaId) -> { + if (inputBytes.length >= 2) { + return mockRecord; + } else { + return null; + } + }; + Function decompressFunction = (inputBytes) -> new byte[2]; + ComplexVeniceWriterAdapter complexVeniceWriterAdapter = + new ComplexVeniceWriterAdapter<>("ignored", mockVeniceWriter, deserializeFunction, decompressFunction); + byte[] valueBytes = new byte[1]; + complexVeniceWriterAdapter.put(IGNORED_BYTES, valueBytes, 1, null, null).get(); + ArgumentCaptor> captor = ArgumentCaptor.forClass(Lazy.class); + verify(mockVeniceWriter, times(1)).complexPut(eq(IGNORED_BYTES), eq(valueBytes), eq(1), captor.capture()); + // Verify the deserialize and decompress functions are invoked properly + Assert.assertEquals(captor.getValue().get(), mockRecord); + } +} diff --git 
a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java index d50b237a228..37d530f7454 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/datawriter/reduce/TestVeniceReducer.java @@ -501,7 +501,7 @@ public CompletableFuture put( } @Override - public Future put( + public CompletableFuture put( Object key, Object value, int valueSchemaId, @@ -564,7 +564,7 @@ public void close() { public void testClosingReducerWithWriterException() throws IOException { AbstractVeniceWriter exceptionWriter = new AbstractVeniceWriter(TOPIC_NAME) { @Override - public Future put( + public CompletableFuture put( Object key, Object value, int valueSchemaId, diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriterTest.java similarity index 95% rename from internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java rename to clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriterTest.java index c152ee8c5b1..3bc6e84a5c5 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/CompositeVeniceWriterTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/task/datawriter/CompositeVeniceWriterTest.java @@ -1,4 +1,4 @@ -package com.linkedin.venice.writer; +package com.linkedin.venice.hadoop.task.datawriter; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -10,6 +10,9 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; +import com.linkedin.venice.writer.AbstractVeniceWriter; +import com.linkedin.venice.writer.DeleteMetadata; +import com.linkedin.venice.writer.VeniceWriter; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.mockito.InOrder; diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/partitioner/ComplexVenicePartitioner.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/partitioner/ComplexVenicePartitioner.java new file mode 100644 index 00000000000..e07696773ca --- /dev/null +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/partitioner/ComplexVenicePartitioner.java @@ -0,0 +1,33 @@ +package com.linkedin.venice.partitioner; + +import com.linkedin.venice.utils.VeniceProperties; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + + +/** + * Determines partitioning but offers more complex partitioning API that could partition not just based on "immutable" + * fields of the value. In addition, it provides the option to partition a record to multiple partitions. 
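As an illustration only (not part of this change), a concrete subclass could route each record by a value field and fan out to several partitions. The "regions" field below is hypothetical, and the overall shape mirrors the test partitioners added elsewhere in this PR.

```java
import com.linkedin.venice.partitioner.ComplexVenicePartitioner;
import com.linkedin.venice.utils.VeniceProperties;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

/** Hypothetical example: fan each record out to one partition per entry in a "regions" value field. */
public class RegionFanOutPartitioner extends ComplexVenicePartitioner {
  public RegionFanOutPartitioner() {
    super();
  }

  public RegionFanOutPartitioner(VeniceProperties props) {
    super(props);
  }

  public RegionFanOutPartitioner(VeniceProperties props, Schema schema) {
    super(props, schema);
  }

  @Override
  public int[] getPartitionId(byte[] keyBytes, GenericRecord value, int numPartitions) {
    Object regions = value.get("regions"); // assumed list-of-strings field, for illustration only
    if (!(regions instanceof List) || ((List<?>) regions).isEmpty()) {
      // Returning an empty array means the writer skips the record entirely.
      return new int[0];
    }
    return ((List<?>) regions).stream()
        .mapToInt(region -> Math.floorMod(region.toString().hashCode(), numPartitions))
        .distinct()
        .toArray();
  }

  @Override
  public int getPartitionId(byte[] keyBytes, int numPartitions) {
    // Key-only partitioning is not meaningful for this value-based view partitioner.
    return 0;
  }

  @Override
  public int getPartitionId(ByteBuffer keyByteBuffer, int numPartitions) {
    return 0;
  }
}
```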
+ */ +public abstract class ComplexVenicePartitioner extends VenicePartitioner { + public ComplexVenicePartitioner() { + super(); + } + + public ComplexVenicePartitioner(VeniceProperties props) { + super(props); + } + + public ComplexVenicePartitioner(VeniceProperties props, Schema schema) { + super(props, schema); + } + + /** + * A complex partitioner API that could be used in materialized views to partition based on value. The resulting + * partition could also be an array of partition ids instead of just a single partition (one-to-many). + * @param value that will be mapped to partition(s) + * @param numPartitions of total available partitions + * @return int array containing the partition id(s) + */ + public abstract int[] getPartitionId(byte[] keyBytes, GenericRecord value, int numPartitions); +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/AbstractVeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/AbstractVeniceWriter.java index 3418404064e..6736ccf0aa7 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/AbstractVeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/AbstractVeniceWriter.java @@ -38,7 +38,7 @@ public abstract CompletableFuture put( int valueSchemaId, PubSubProducerCallback callback); - public abstract Future put( + public abstract CompletableFuture put( K key, V value, int valueSchemaId, diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java new file mode 100644 index 00000000000..0133795ae30 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/ComplexVeniceWriter.java @@ -0,0 +1,248 @@ +package com.linkedin.venice.writer; + +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.partitioner.ComplexVenicePartitioner; +import com.linkedin.venice.pubsub.api.PubSubProduceResult; +import com.linkedin.venice.pubsub.api.PubSubProducerAdapter; +import com.linkedin.venice.pubsub.api.PubSubProducerCallback; +import com.linkedin.venice.storage.protocol.ChunkedValueManifest; +import com.linkedin.venice.utils.RedundantExceptionFilter; +import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.lazy.Lazy; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import org.apache.avro.generic.GenericRecord; + + +/** + * Provide more complex and sophisticated writer APIs for writing to {@link com.linkedin.venice.views.MaterializedView}. + * Specifically when a {@link ComplexVenicePartitioner} is involved. Otherwise, use the + * {@link VeniceWriter} APIs. 
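A rough usage sketch of the new writer (illustrative only, not part of this diff): the caller hands over already-serialized key/value bytes plus a lazily deserialized value, so deserialization cost is only paid when the view's partitioner actually needs the record. The type parameters on ComplexVeniceWriter are assumed to follow VeniceWriter's key/value/update convention, and the value bytes are assumed to be uncompressed Avro (see ComplexVeniceWriterAdapter for the compressed re-push case).

```java
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.ComplexVeniceWriter;
import java.util.concurrent.CompletableFuture;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

public final class ComplexPutUsageSketch {
  private ComplexPutUsageSketch() {
  }

  /**
   * Writes one already-serialized record to a materialized view topic. The value is only deserialized
   * (lazily) if the view's partitioner is a ComplexVenicePartitioner and needs to inspect it.
   */
  public static CompletableFuture<Void> writeToView(
      ComplexVeniceWriter<byte[], byte[], byte[]> viewWriter,
      Schema valueSchema,
      byte[] keyBytes,
      byte[] valueBytes,
      int valueSchemaId) {
    // For a sketch we build the deserializer inline; real callers would cache it per schema id.
    RecordDeserializer<GenericRecord> deserializer =
        FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(valueSchema, valueSchema);
    return viewWriter
        .complexPut(keyBytes, valueBytes, valueSchemaId, Lazy.of(() -> deserializer.deserialize(valueBytes)));
  }
}
```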
+ */ +public class ComplexVeniceWriter extends VeniceWriter { + private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = + RedundantExceptionFilter.getRedundantExceptionFilter(); + private static final String SKIP_LARGE_RECORD = "SkipLargeRecord"; + private final ComplexVenicePartitioner complexPartitioner; + private final AtomicLong skippedLargeRecords = new AtomicLong(0); + + public ComplexVeniceWriter( + VeniceWriterOptions params, + VeniceProperties props, + PubSubProducerAdapter producerAdapter) { + super(params, props, producerAdapter); + if (partitioner instanceof ComplexVenicePartitioner) { + complexPartitioner = (ComplexVenicePartitioner) partitioner; + } else { + complexPartitioner = null; + } + } + + /** + * {@link ComplexVenicePartitioner} offers a more sophisticated getPartitionId API. It also takes value as a + * parameter, and could return a single, multiple or no partition(s). + */ + public CompletableFuture complexPut(K key, V value, int valueSchemaId, Lazy valueProvider) { + CompletableFuture finalCompletableFuture = new CompletableFuture<>(); + if (value == null) { + // Null value is not allowed for put + throw new VeniceException("Put value should not be null"); + } else { + // Write updated/put record to materialized view topic partition(s) + if (complexPartitioner == null) { + // No VeniceComplexPartitioner involved, perform simple put. + byte[] serializedKey = keySerializer.serialize(topicName, key); + byte[] serializedValue = valueSerializer.serialize(topicName, value); + int partition = getPartition(serializedKey); + propagateVeniceWriterFuture( + put(serializedKey, serializedValue, valueSchemaId, partition), + finalCompletableFuture); + } else { + byte[] serializedKey = keySerializer.serialize(topicName, key); + int[] partitions = complexPartitioner.getPartitionId(serializedKey, valueProvider.get(), numberOfPartitions); + if (partitions.length == 0) { + finalCompletableFuture.complete(null); + } else { + byte[] serializedValue = valueSerializer.serialize(topicName, value); + performMultiPartitionAction( + partitions, + finalCompletableFuture, + (partition) -> this.put(serializedKey, serializedValue, valueSchemaId, partition)); + } + } + } + return finalCompletableFuture; + } + + /** + * Perform "delete" on the given key. If a {@link ComplexVenicePartitioner} is involved then it will be a best effort + * attempt to delete the record using the valueProvider. It's best effort because: + * 1. Nothing we can do if value is null or not provided via valueProvider. + * 2. Previous writes may have landed in partition(s) derived from older values, and those copies cannot be cleaned up + * based on the current value. + */ + public CompletableFuture complexDelete(K key, Lazy valueProvider) { + CompletableFuture finalCompletableFuture = new CompletableFuture<>(); + if (complexPartitioner == null) { + // No VeniceComplexPartitioner involved, perform simple delete. 
+ byte[] serializedKey = keySerializer.serialize(topicName, key); + int partition = getPartition(serializedKey); + propagateVeniceWriterFuture(delete(serializedKey, null, partition), finalCompletableFuture); + } else { + GenericRecord value = valueProvider.get(); + if (value == null) { + // Ignore the delete since we cannot perform delete with VeniceComplexPartitioner without the value + finalCompletableFuture.complete(null); + } else { + byte[] serializedKey = keySerializer.serialize(topicName, key); + int[] partitions = complexPartitioner.getPartitionId(serializedKey, value, numberOfPartitions); + if (partitions.length == 0) { + finalCompletableFuture.complete(null); + } else { + performMultiPartitionAction( + partitions, + finalCompletableFuture, + (partition) -> this.delete(serializedKey, null, partition)); + } + } + } + return finalCompletableFuture; + } + + /** + * Prevent the {@link ComplexVeniceWriter} from writing any actual chunks for large values. This is because we are + * only using ComplexVeniceWriter for writing to materialized view. The consumers of materialized view do not fully + * support assembling the chunks correctly yet. The behavior is the same for both large values from VPJ and leader + * replicas. + */ + @Override + protected CompletableFuture putLargeValue( + byte[] serializedKey, + byte[] serializedValue, + int valueSchemaId, + PubSubProducerCallback callback, + int partition, + LeaderMetadataWrapper leaderMetadataWrapper, + long logicalTs, + PutMetadata putMetadata, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest) { + skippedLargeRecords.incrementAndGet(); + if (!REDUNDANT_LOGGING_FILTER.isRedundantException(topicName, SKIP_LARGE_RECORD)) { + logger.warn("Skipped writing {} large record(s) to topic: {}", skippedLargeRecords.getAndSet(0), topicName); + } + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture put( + K key, + V value, + int valueSchemaId, + PubSubProducerCallback callback, + LeaderMetadataWrapper leaderMetadataWrapper, + long logicalTs, + PutMetadata putMetadata, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest) { + throw new UnsupportedOperationException("ComplexVeniceWriter should use complexPut instead of put"); + } + + @Override + public CompletableFuture delete( + K key, + PubSubProducerCallback callback, + LeaderMetadataWrapper leaderMetadataWrapper, + long logicalTs, + DeleteMetadata deleteMetadata, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest) { + throw new UnsupportedOperationException("ComplexVeniceWriter should use complexDelete instead of delete"); + } + + @Override + public Future update( + K key, + U update, + int valueSchemaId, + int derivedSchemaId, + PubSubProducerCallback callback, + long logicalTs) { + throw new UnsupportedOperationException("ComplexVeniceWriter does not support update"); + } + + /** + * Execute a "delete" on the key for a predetermined partition. + */ + private CompletableFuture delete( + byte[] serializedKey, + PubSubProducerCallback callback, + int partition) { + return delete( + serializedKey, + callback, + DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS, + null, + null, + null, + partition); + } + + /** + * Write records with new DIV to a predetermined partition. 
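The multi-partition branches above funnel their per-partition futures through performMultiPartitionAction. For readers unfamiliar with that pattern, here is a minimal, dependency-free sketch (hypothetical names, not part of this diff) of fanning one write out to several partitions and completing a single caller-facing future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

public class MultiPartitionFanOutSketch {
  /** Runs the action once per partition and completes the returned future when all of them finish. */
  static CompletableFuture<Void> fanOut(int[] partitions, IntFunction<CompletableFuture<?>> action) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    CompletableFuture<?>[] perPartition = new CompletableFuture<?>[partitions.length];
    for (int i = 0; i < partitions.length; i++) {
      perPartition[i] = action.apply(partitions[i]);
    }
    CompletableFuture.allOf(perPartition).whenComplete((ignored, failure) -> {
      if (failure == null) {
        result.complete(null);
      } else {
        result.completeExceptionally(failure);
      }
    });
    return result;
  }

  public static void main(String[] args) {
    int[] partitions = { 1, 2 };
    // Stand-in action; the real writer would produce the record to each target partition.
    fanOut(partitions, p -> CompletableFuture.completedFuture("sent to partition " + p)).join();
    System.out.println("all partition writes completed");
  }
}
```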
+ */ + private CompletableFuture put( + byte[] serializedKey, + byte[] serializedValue, + int valueSchemaId, + int partition) { + return put( + serializedKey, + serializedValue, + partition, + valueSchemaId, + null, + DEFAULT_LEADER_METADATA_WRAPPER, + APP_DEFAULT_LOGICAL_TS, + null, + null, + null); + } + + /** + * Helper function to perform multi-partition action and configure the finalCompletableFuture to complete when the + * action is completed on all partitions. Caller is expected to check for empty partitions case to minimize work + * needed to provide the action function. + */ + private void performMultiPartitionAction( + int[] partitions, + CompletableFuture finalCompletableFuture, + Function> action) { + CompletableFuture[] partitionFutures = new CompletableFuture[partitions.length]; + int index = 0; + for (int p: partitions) { + partitionFutures[index++] = action.apply(p); + } + CompletableFuture.allOf(partitionFutures).whenCompleteAsync((ignored, writeException) -> { + if (writeException == null) { + finalCompletableFuture.complete(null); + } else { + finalCompletableFuture.completeExceptionally(writeException); + } + }); + } + + private void propagateVeniceWriterFuture( + CompletableFuture veniceWriterFuture, + CompletableFuture finalCompletableFuture) { + veniceWriterFuture.whenCompleteAsync((ignored, writeException) -> { + if (writeException == null) { + finalCompletableFuture.complete(null); + } else { + finalCompletableFuture.completeExceptionally(writeException); + } + }); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index 8032ebcf0ce..d67930b01ec 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -96,7 +96,7 @@ public class VeniceWriter extends AbstractVeniceWriter { private final ThreadPoolExecutor threadPoolExecutor; // log4j logger - private final Logger logger; + protected final Logger logger; // Config names public static final String VENICE_WRITER_CONFIG_PREFIX = "venice.writer."; @@ -151,7 +151,7 @@ public class VeniceWriter extends AbstractVeniceWriter { /** * The default for {@link #maxRecordSizeBytes} is unlimited / unset (-1) just to be safe. A more specific default value - * should be set using {@link com.linkedin.venice.ConfigKeys#CONTROLLER_DEFAULT_MAX_RECORD_SIZE_BYTES} the controller + * should be set using {@link com.linkedin.venice.ConfigKeys#DEFAULT_MAX_RECORD_SIZE_BYTES} the controller * config on the cluster level. 
*/ public static final int UNLIMITED_MAX_RECORD_SIZE = -1; @@ -229,14 +229,14 @@ public class VeniceWriter extends AbstractVeniceWriter { // Immutable state private final PubSubMessageHeaders protocolSchemaHeaders; - private final VeniceKafkaSerializer keySerializer; - private final VeniceKafkaSerializer valueSerializer; + protected final VeniceKafkaSerializer keySerializer; + protected final VeniceKafkaSerializer valueSerializer; private final VeniceKafkaSerializer writeComputeSerializer; private final PubSubProducerAdapter producerAdapter; private final GUID producerGUID; private final Time time; - private final VenicePartitioner partitioner; - private final int numberOfPartitions; + protected final VenicePartitioner partitioner; + protected final int numberOfPartitions; private final int closeTimeOutInMs; private final CheckSumType checkSumType; private final int maxSizeForUserPayloadPerMessageInBytes; @@ -716,6 +716,26 @@ public CompletableFuture delete( ChunkedValueManifest oldRmdManifest) { byte[] serializedKey = keySerializer.serialize(topicName, key); int partition = getPartition(serializedKey); + return delete( + serializedKey, + callback, + leaderMetadataWrapper, + logicalTs, + deleteMetadata, + oldValueManifest, + oldRmdManifest, + partition); + } + + protected CompletableFuture delete( + byte[] serializedKey, + PubSubProducerCallback callback, + LeaderMetadataWrapper leaderMetadataWrapper, + long logicalTs, + DeleteMetadata deleteMetadata, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest, + int partition) { isChunkingFlagInvoked = true; @@ -795,7 +815,7 @@ public CompletableFuture put( } @Override - public Future put( + public CompletableFuture put( K key, V value, int valueSchemaId, @@ -914,6 +934,62 @@ public CompletableFuture put( byte[] serializedKey = keySerializer.serialize(topicName, key); byte[] serializedValue = valueSerializer.serialize(topicName, value); int partition = getPartition(serializedKey); + return put( + serializedKey, + serializedValue, + partition, + valueSchemaId, + callback, + leaderMetadataWrapper, + logicalTs, + putMetadata, + oldValueManifest, + oldRmdManifest); + } + + /** + * Write a message with the kafka message envelope (KME) passed in. This allows users re-using existing KME to + * speed up the performance. If this is called, VeniceWriter will also reuse the existing DIV data (producer + * metadata). It's the "pass-through" mode. + * + * TODO: move pass-through supports into a server-specific extension of VeniceWriter + */ + @Deprecated + public Future put( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, + PubSubProducerCallback callback, + int upstreamPartition, + LeaderMetadataWrapper leaderMetadataWrapper) { + // Self-adjust the chunking setting in pass-through mode + verifyChunkingSetting(kafkaMessageEnvelope); + + byte[] serializedKey = kafkaKey.getKey(); + + KafkaMessageEnvelopeProvider kafkaMessageEnvelopeProvider = + getKafkaMessageEnvelopeProvider(kafkaMessageEnvelope, leaderMetadataWrapper); + + if (callback instanceof ChunkAwareCallback) { + ((ChunkAwareCallback) callback).setChunkingInfo(serializedKey, null, null, null, null, null, null); + } + + return sendMessage(producerMetadata -> kafkaKey, kafkaMessageEnvelopeProvider, upstreamPartition, callback, false); + } + + /** + * Write a record with new DIV to a predetermined partition. 
+ */ + protected CompletableFuture put( + byte[] serializedKey, + byte[] serializedValue, + int partition, + int valueSchemaId, + PubSubProducerCallback callback, + LeaderMetadataWrapper leaderMetadataWrapper, + long logicalTs, + PutMetadata putMetadata, + ChunkedValueManifest oldValueManifest, + ChunkedValueManifest oldRmdManifest) { int replicationMetadataPayloadSize = putMetadata == null ? 0 : putMetadata.getSerializedSize(); isChunkingFlagInvoked = true; @@ -988,35 +1064,6 @@ public CompletableFuture put( return produceResultFuture; } - /** - * Write a message with the kafka message envelope (KME) passed in. This allows users re-using existing KME to - * speed up the performance. If this is called, VeniceWriter will also reuse the existing DIV data (producer - * metadata). It's the "pass-through" mode. - * - * TODO: move pass-through supports into a server-specific extension of VeniceWriter - */ - @Deprecated - public Future put( - KafkaKey kafkaKey, - KafkaMessageEnvelope kafkaMessageEnvelope, - PubSubProducerCallback callback, - int upstreamPartition, - LeaderMetadataWrapper leaderMetadataWrapper) { - // Self-adjust the chunking setting in pass-through mode - verifyChunkingSetting(kafkaMessageEnvelope); - - byte[] serializedKey = kafkaKey.getKey(); - - KafkaMessageEnvelopeProvider kafkaMessageEnvelopeProvider = - getKafkaMessageEnvelopeProvider(kafkaMessageEnvelope, leaderMetadataWrapper); - - if (callback instanceof ChunkAwareCallback) { - ((ChunkAwareCallback) callback).setChunkingInfo(serializedKey, null, null, null, null, null, null); - } - - return sendMessage(producerMetadata -> kafkaKey, kafkaMessageEnvelopeProvider, upstreamPartition, callback, false); - } - private KafkaMessageEnvelopeProvider getKafkaMessageEnvelopeProvider( KafkaMessageEnvelope kafkaMessageEnvelope, LeaderMetadataWrapper leaderMetadataWrapper) { @@ -1484,7 +1531,7 @@ private interface KafkaMessageEnvelopeProvider { /** * This function implements chunking of a large value into many small values. */ - private CompletableFuture putLargeValue( + protected CompletableFuture putLargeValue( byte[] serializedKey, byte[] serializedValue, int valueSchemaId, @@ -2002,7 +2049,7 @@ protected KafkaMessageEnvelope getKafkaMessageEnvelope( * @param key the {@link KafkaKey} for which we want to get the partition. * @return the partition number that the provided key belongs to. */ - private int getPartition(byte[] key) { + protected int getPartition(byte[] key) { return partitioner.getPartitionId(key, numberOfPartitions); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriterFactory.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriterFactory.java index 114184a3c22..38d166ba81f 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriterFactory.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriterFactory.java @@ -61,35 +61,47 @@ public VeniceWriterFactory( } public VeniceWriter createVeniceWriter(VeniceWriterOptions options) { - VeniceProperties props = options.isProducerCompressionEnabled() - ? 
venicePropertiesLazy.get() - : venicePropertiesWithCompressionDisabledLazy.get(); + VeniceProperties props = getVeniceProperties(options); + Supplier producerAdapterSupplier = + () -> producerAdapterFactory.create(props, options.getTopicName(), options.getBrokerAddress()); + PubSubProducerAdapter producerAdapter = buildPubSubProducerAdapter(options, producerAdapterSupplier); + return new VeniceWriter<>(options, props, producerAdapter); + } + public ComplexVeniceWriter createComplexVeniceWriter(VeniceWriterOptions options) { + VeniceProperties props = getVeniceProperties(options); Supplier producerAdapterSupplier = () -> producerAdapterFactory.create(props, options.getTopicName(), options.getBrokerAddress()); + PubSubProducerAdapter producerAdapter = buildPubSubProducerAdapter(options, producerAdapterSupplier); + return new ComplexVeniceWriter<>(options, props, producerAdapter); + } + private VeniceProperties getVeniceProperties(VeniceWriterOptions options) { + return options.isProducerCompressionEnabled() + ? venicePropertiesLazy.get() + : venicePropertiesWithCompressionDisabledLazy.get(); + } + + private PubSubProducerAdapter buildPubSubProducerAdapter( + VeniceWriterOptions options, + Supplier producerAdapterSupplier) { int producerThreadCnt = options.getProducerThreadCount(); if (producerThreadCnt > 1) { - return new VeniceWriter<>( - options, - props, - new PubSubProducerAdapterConcurrentDelegator( - options.getTopicName(), - producerThreadCnt, - options.getProducerQueueSize(), - producerAdapterSupplier)); + return new PubSubProducerAdapterConcurrentDelegator( + options.getTopicName(), + producerThreadCnt, + options.getProducerQueueSize(), + producerAdapterSupplier); } - int producerCnt = options.getProducerCount(); if (producerCnt > 1) { List producers = new ArrayList<>(producerCnt); for (int i = 0; i < producerCnt; ++i) { producers.add(producerAdapterSupplier.get()); } - return new VeniceWriter<>(options, props, new PubSubProducerAdapterDelegator(producers)); + return new PubSubProducerAdapterDelegator(producers); } - - return new VeniceWriter<>(options, props, producerAdapterSupplier.get()); + return producerAdapterSupplier.get(); } // visible for testing diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/ComplexVeniceWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/ComplexVeniceWriterTest.java new file mode 100644 index 00000000000..a7e46a3ccfe --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/ComplexVeniceWriterTest.java @@ -0,0 +1,179 @@ +package com.linkedin.venice.writer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.partitioner.ComplexVenicePartitioner; +import com.linkedin.venice.partitioner.DefaultVenicePartitioner; +import com.linkedin.venice.partitioner.VenicePartitioner; +import com.linkedin.venice.pubsub.api.PubSubProducerAdapter; +import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.lazy.Lazy; +import java.nio.ByteBuffer; +import java.util.Properties; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.avro.generic.GenericRecord; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class ComplexVeniceWriterTest { + private static final String PARTITION_FIELD_NAME = "partition"; + private static final byte[] IGNORED_BYTES = new byte[0]; + + private static class ValuePartitionerForUT extends ComplexVenicePartitioner { + public ValuePartitionerForUT() { + super(); + } + + /** + * Return partition array based on provided PARTITION_FIELD_NAME value: + * - null : [] + * - A : [1] + * - B : [1, 2] + * - anything else : [] + */ + @Override + public int[] getPartitionId(byte[] keyBytes, GenericRecord value, int numPartitions) { + Object partitionField = value.get(PARTITION_FIELD_NAME); + if (partitionField == null) { + return new int[0]; + } + String fieldValueString = (String) partitionField; + int[] partitions; + if ("A".equals(fieldValueString)) { + partitions = new int[1]; + partitions[0] = 1; + } else if ("B".equals(fieldValueString)) { + partitions = new int[2]; + partitions[0] = 1; + partitions[1] = 2; + } else { + partitions = new int[0]; + } + return partitions; + } + + @Override + public int getPartitionId(byte[] keyBytes, int numPartitions) { + return 0; + } + + @Override + public int getPartitionId(ByteBuffer keyByteBuffer, int numPartitions) { + return 0; + } + } + + @Test + public void testUnsupportedPublicAPIs() { + PubSubProducerAdapter mockProducerAdapter = mock(PubSubProducerAdapter.class); + VeniceProperties veniceProperties = new VeniceProperties(new Properties()); + ComplexVeniceWriter complexVeniceWriter = new ComplexVeniceWriter<>( + getVeniceWriterOptions(new DefaultVenicePartitioner(), 1), + veniceProperties, + mockProducerAdapter); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> complexVeniceWriter.put(IGNORED_BYTES, IGNORED_BYTES, 1, null)); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> complexVeniceWriter.update(IGNORED_BYTES, IGNORED_BYTES, 1, 1, null)); + Assert.assertThrows(UnsupportedOperationException.class, () -> complexVeniceWriter.delete(IGNORED_BYTES, null)); + } + + @Test + public void testComplexPut() throws ExecutionException, InterruptedException { + PubSubProducerAdapter mockProducerAdapter = mock(PubSubProducerAdapter.class); + doReturn(CompletableFuture.completedFuture(null)).when(mockProducerAdapter) + .sendMessage(anyString(), anyInt(), any(), any(), any(), any()); + VeniceProperties veniceProperties = new VeniceProperties(new Properties()); + // Simple partitioner + ComplexVeniceWriter simplePartitionerWriter = new ComplexVeniceWriter<>( + getVeniceWriterOptions(new DefaultVenicePartitioner(), 1), + veniceProperties, + mockProducerAdapter); + Assert.assertThrows( + VeniceException.class, + () -> simplePartitionerWriter.complexPut(IGNORED_BYTES, null, 1, Lazy.of(() -> null))); + simplePartitionerWriter.complexPut(IGNORED_BYTES, IGNORED_BYTES, 1, Lazy.of(() -> null)).get(); + verify(mockProducerAdapter, atLeastOnce()).sendMessage(anyString(), eq(0), any(), any(), any(), any()); + // Complex partitioner + PubSubProducerAdapter mockProducerAdapterForComplexWrites = mock(PubSubProducerAdapter.class); + doReturn(CompletableFuture.completedFuture(null)).when(mockProducerAdapterForComplexWrites) + .sendMessage(anyString(), anyInt(), any(), any(), any(), any()); + ComplexVeniceWriter complexPartitionerWriter = new ComplexVeniceWriter<>( + 
getVeniceWriterOptions(new ValuePartitionerForUT(), 3), + veniceProperties, + mockProducerAdapterForComplexWrites); + GenericRecord mockRecord = mock(GenericRecord.class); + complexPartitionerWriter.complexPut(IGNORED_BYTES, IGNORED_BYTES, 1, Lazy.of(() -> mockRecord)).get(); + doReturn("Foo").when(mockRecord).get(PARTITION_FIELD_NAME); + complexPartitionerWriter.complexPut(IGNORED_BYTES, IGNORED_BYTES, 1, Lazy.of(() -> mockRecord)).get(); + verify(mockProducerAdapterForComplexWrites, never()).sendMessage(anyString(), anyInt(), any(), any(), any(), any()); + doReturn("A").when(mockRecord).get(PARTITION_FIELD_NAME); + complexPartitionerWriter.complexPut(IGNORED_BYTES, IGNORED_BYTES, 1, Lazy.of(() -> mockRecord)).get(); + verify(mockProducerAdapterForComplexWrites, atLeastOnce()) + .sendMessage(anyString(), eq(1), any(), any(), any(), any()); + verify(mockProducerAdapterForComplexWrites, never()).sendMessage(anyString(), eq(0), any(), any(), any(), any()); + verify(mockProducerAdapterForComplexWrites, never()).sendMessage(anyString(), eq(2), any(), any(), any(), any()); + Mockito.clearInvocations(mockProducerAdapterForComplexWrites); + doReturn("B").when(mockRecord).get(PARTITION_FIELD_NAME); + complexPartitionerWriter.complexPut(IGNORED_BYTES, IGNORED_BYTES, 1, Lazy.of(() -> mockRecord)).get(); + verify(mockProducerAdapterForComplexWrites, atLeastOnce()) + .sendMessage(anyString(), eq(1), any(), any(), any(), any()); + verify(mockProducerAdapterForComplexWrites, atLeastOnce()) + .sendMessage(anyString(), eq(2), any(), any(), any(), any()); + verify(mockProducerAdapterForComplexWrites, never()).sendMessage(anyString(), eq(0), any(), any(), any(), any()); + } + + @Test + public void testComplexDelete() throws ExecutionException, InterruptedException { + PubSubProducerAdapter mockProducerAdapter = mock(PubSubProducerAdapter.class); + doReturn(CompletableFuture.completedFuture(null)).when(mockProducerAdapter) + .sendMessage(anyString(), anyInt(), any(), any(), any(), any()); + VeniceProperties veniceProperties = new VeniceProperties(new Properties()); + // Simple partitioner + ComplexVeniceWriter simplePartitionerWriter = new ComplexVeniceWriter<>( + getVeniceWriterOptions(new DefaultVenicePartitioner(), 1), + veniceProperties, + mockProducerAdapter); + simplePartitionerWriter.complexDelete(IGNORED_BYTES, Lazy.of(() -> null)).get(); + verify(mockProducerAdapter, atLeastOnce()).sendMessage(anyString(), eq(0), any(), any(), any(), any()); + // Complex partitioner + PubSubProducerAdapter mockProducerAdapterForComplexWrites = mock(PubSubProducerAdapter.class); + doReturn(CompletableFuture.completedFuture(null)).when(mockProducerAdapterForComplexWrites) + .sendMessage(anyString(), anyInt(), any(), any(), any(), any()); + ComplexVeniceWriter complexPartitionerWriter = new ComplexVeniceWriter<>( + getVeniceWriterOptions(new ValuePartitionerForUT(), 3), + veniceProperties, + mockProducerAdapterForComplexWrites); + // Null value should be ignored + complexPartitionerWriter.complexDelete(IGNORED_BYTES, Lazy.of(() -> null)).get(); + verify(mockProducerAdapterForComplexWrites, never()).sendMessage(anyString(), anyInt(), any(), any(), any(), any()); + GenericRecord mockRecord = mock(GenericRecord.class); + doReturn("B").when(mockRecord).get(PARTITION_FIELD_NAME); + complexPartitionerWriter.complexDelete(IGNORED_BYTES, Lazy.of(() -> mockRecord)).get(); + verify(mockProducerAdapterForComplexWrites, atLeastOnce()) + .sendMessage(anyString(), eq(1), any(), any(), any(), any()); + 
verify(mockProducerAdapterForComplexWrites, atLeastOnce()) + .sendMessage(anyString(), eq(2), any(), any(), any(), any()); + verify(mockProducerAdapterForComplexWrites, never()).sendMessage(anyString(), eq(0), any(), any(), any(), any()); + } + + private VeniceWriterOptions getVeniceWriterOptions(VenicePartitioner partitioner, int partitionCount) { + VeniceWriterOptions.Builder configBuilder = new VeniceWriterOptions.Builder("ignored-topic"); + configBuilder.setPartitionCount(partitionCount).setPartitioner(partitioner); + return configBuilder.build(); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java index 0d379ae64bb..cddb6f84203 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestMaterializedViewEndToEnd.java @@ -24,11 +24,13 @@ import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory; import com.linkedin.venice.ConfigKeys; import com.linkedin.venice.D2.D2ClientUtils; +import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.integration.utils.D2TestUtils; import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.integration.utils.VeniceControllerWrapper; import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions; @@ -39,6 +41,7 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.TestUtils; @@ -46,8 +49,11 @@ import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.view.TestValueBasedVenicePartitioner; import com.linkedin.venice.views.MaterializedView; import com.linkedin.venice.views.VeniceView; +import com.linkedin.venice.writer.update.UpdateBuilder; +import com.linkedin.venice.writer.update.UpdateBuilderImpl; import io.tehuti.Metric; import io.tehuti.metrics.MetricsRepository; import it.unimi.dsi.fastutil.ints.Int2LongMap; @@ -61,6 +67,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.avro.Schema; +import org.apache.samza.system.SystemProducer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -248,12 +255,15 @@ public void testBatchOnlyMaterializedViewDVCConsumer() throws IOException, Execu Properties newPushProps = TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), newPushInputDirPath, storeName); TestWriteUtils.runPushJob("Run another push job", newPushProps); - Assert.assertEquals( - getMetric( - dvcMetricsRepo, - "current_version_number.Gauge", - 
VeniceView.getViewStoreName(storeName, testViewName)), - (double) 2); + TestUtils.waitForNonDeterministicAssertion( + 10, + TimeUnit.SECONDS, + () -> Assert.assertEquals( + getMetric( + dvcMetricsRepo, + "current_version_number.Gauge", + VeniceView.getViewStoreName(storeName, testViewName)), + (double) 2)); // The materialized view DVC client should be able to read all the keys from the new push for (int i = 1; i <= 200; i++) { Assert.assertEquals(viewClient.get(Integer.toString(i)).get().toString(), DEFAULT_USER_DATA_VALUE_PREFIX + i); @@ -287,6 +297,99 @@ public void testBatchOnlyMaterializedViewDVCConsumer() throws IOException, Execu } } + /** + * Verification of the produced records is difficult because we don't really support complex partitioner in the + * read path. Once CC with views is supported we should use CC to verify. Perform re-push to ensure we can deserialize + * value properly during re-push. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testMaterializedViewWithComplexPartitioner() throws IOException { + File inputDir = getTempDataDirectory(); + Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToNameRecordV2Schema(inputDir); + String inputDirPath = "file:" + inputDir.getAbsolutePath(); + String storeName = Utils.getUniqueString("complexPartitionStore"); + Properties props = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString(); + String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString(); + // Use an A/A W/C enabled store to verify correct partitioning after partial update is applied. + UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(false) + .setChunkingEnabled(true) + .setCompressionStrategy(CompressionStrategy.GZIP) + .setRmdChunkingEnabled(true) + .setNativeReplicationEnabled(true) + .setNativeReplicationSourceFabric(childDatacenters.get(0).getRegionName()) + .setPartitionCount(3) + .setActiveActiveReplicationEnabled(true) + .setWriteComputationEnabled(true) + .setHybridRewindSeconds(10L) + .setHybridOffsetLagThreshold(2L); + String testViewName = "complexPartitionerView"; + try (ControllerClient controllerClient = + IntegrationTestPushUtils.createStoreForJob(clusterName, keySchemaStr, valueSchemaStr, props, storeParms)) { + MaterializedViewParameters.Builder viewParamBuilder = + new MaterializedViewParameters.Builder(testViewName).setPartitionCount(2) + .setPartitioner(TestValueBasedVenicePartitioner.class.getCanonicalName()); + UpdateStoreQueryParams updateViewParam = new UpdateStoreQueryParams().setViewName(testViewName) + .setViewClassName(MaterializedView.class.getCanonicalName()) + .setViewClassParams(viewParamBuilder.build()); + controllerClient + .retryableRequest(5, controllerClient1 -> controllerClient.updateStore(storeName, updateViewParam)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, false, () -> { + Map viewConfigMap = controllerClient.getStore(storeName).getStore().getViewConfigs(); + Assert.assertEquals(viewConfigMap.size(), 1); + Assert.assertEquals( + viewConfigMap.get(testViewName).getViewClassName(), + MaterializedView.class.getCanonicalName()); + }); + } + TestWriteUtils.runPushJob("Run push job", props); + String viewTopicName = + Version.composeKafkaTopic(storeName, 1) + VIEW_NAME_SEPARATOR + testViewName + MATERIALIZED_VIEW_TOPIC_SUFFIX; + // View topic partitions should be mostly 
empty based on the TestValueBasedPartitioner logic. + int expectedMaxEndOffset = 6; // This may change when we introduce more CMs e.g. heartbeats + for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) { + VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin(); + PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName); + Int2LongMap viewTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(viewPubSubTopic); + for (long endOffset: viewTopicOffsetMap.values()) { + Assert.assertTrue(endOffset <= expectedMaxEndOffset); + } + } + // Perform some partial updates in the non-NR source fabric + VeniceClusterWrapper veniceCluster = childDatacenters.get(1).getClusters().get(clusterName); + SystemProducer producer = + IntegrationTestPushUtils.getSamzaProducer(veniceCluster, storeName, Version.PushType.STREAM); + Schema partialUpdateSchema = WriteComputeSchemaConverter.getInstance() + .convertFromValueRecordSchema(recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema()); + long newTimestamp = 100000L; + for (int i = 1; i < 20; i++) { + String key = Integer.toString(i); + UpdateBuilder updateBuilder = new UpdateBuilderImpl(partialUpdateSchema); + updateBuilder.setNewFieldValue("age", i); + IntegrationTestPushUtils.sendStreamingRecord(producer, storeName, key, updateBuilder.build(), newTimestamp); + } + // age 1-9 will be written to all partitions so +9 in p0 and p1 + // age 10-19 will be written to % numPartitions which will alternate so +5 in p0 and p1 + int newMinEndOffset = expectedMaxEndOffset + 9 + 5; + for (VeniceMultiClusterWrapper veniceClusterWrapper: childDatacenters) { + VeniceHelixAdmin admin = veniceClusterWrapper.getRandomController().getVeniceHelixAdmin(); + PubSubTopic viewPubSubTopic = admin.getPubSubTopicRepository().getTopic(viewTopicName); + TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, () -> { + Int2LongMap viewTopicOffsetMap = admin.getTopicManager().getTopicLatestOffsets(viewPubSubTopic); + for (long endOffset: viewTopicOffsetMap.values()) { + Assert.assertTrue(endOffset >= newMinEndOffset); + } + }); + } + // A re-push should succeed + Properties rePushProps = + TestWriteUtils.defaultVPJProps(parentControllers.get(0).getControllerUrl(), inputDirPath, storeName); + rePushProps.setProperty(SOURCE_KAFKA, "true"); + rePushProps.setProperty(KAFKA_INPUT_BROKER_URL, childDatacenters.get(0).getPubSubBrokerWrapper().getAddress()); + TestWriteUtils.runPushJob("Run push job", rePushProps); + } + private double getMetric(MetricsRepository metricsRepository, String metricName, String storeName) { Metric metric = metricsRepository.getMetric("." 
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestValueBasedVenicePartitioner.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestValueBasedVenicePartitioner.java
new file mode 100644
index 00000000000..1ae1b67b567
--- /dev/null
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestValueBasedVenicePartitioner.java
@@ -0,0 +1,50 @@
+package com.linkedin.venice.view;
+
+import com.linkedin.venice.partitioner.ComplexVenicePartitioner;
+import com.linkedin.venice.utils.VeniceProperties;
+import java.nio.ByteBuffer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+
+public class TestValueBasedVenicePartitioner extends ComplexVenicePartitioner {
+  public TestValueBasedVenicePartitioner() {
+    super();
+  }
+
+  public TestValueBasedVenicePartitioner(VeniceProperties props) {
+    super(props);
+  }
+
+  public TestValueBasedVenicePartitioner(VeniceProperties props, Schema schema) {
+    super(props, schema);
+  }
+
+  @Override
+  public int[] getPartitionId(byte[] keyBytes, GenericRecord value, int numPartitions) {
+    int age = (Integer) value.get("age");
+    if (age < 0) {
+      return new int[0];
+    } else if (age < 10) {
+      int[] partitions = new int[numPartitions];
+      for (int i = 0; i < numPartitions; i++) {
+        partitions[i] = i;
+      }
+      return partitions;
+    } else {
+      int[] partition = new int[1];
+      partition[0] = age % numPartitions;
+      return partition;
+    }
+  }
+
+  @Override
+  public int getPartitionId(byte[] keyBytes, int numPartitions) {
+    return 0;
+  }
+
+  @Override
+  public int getPartitionId(ByteBuffer keyByteBuffer, int numPartitions) {
+    return 0;
+  }
+}
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java
index 5d9f5dbcb8b..68be236ad16 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/view/TestViewWriter.java
@@ -9,8 +9,8 @@
 import com.linkedin.venice.kafka.protocol.VersionSwap;
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.meta.Version;
-import com.linkedin.venice.pubsub.api.PubSubProduceResult;
 import com.linkedin.venice.utils.VeniceProperties;
+import com.linkedin.venice.utils.lazy.Lazy;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -32,24 +32,26 @@ public TestViewWriter(
   }
 
   @Override
-  public CompletableFuture<PubSubProduceResult> processRecord(
+  public CompletableFuture<Void> processRecord(
       ByteBuffer newValue,
       ByteBuffer oldValue,
       byte[] key,
       int newValueSchemaId,
       int oldValueSchemaId,
-      GenericRecord replicationMetadataRecord) {
+      GenericRecord replicationMetadataRecord,
+      Lazy<GenericRecord> valueProvider) {
    internalView.incrementRecordCount(storeName);
    return CompletableFuture.completedFuture(null);
  }

  @Override
-  public CompletableFuture<PubSubProduceResult> processRecord(
+  public CompletableFuture<Void> processRecord(
      ByteBuffer newValue,
      byte[] key,
      int newValueSchemaId,
-      boolean isChunkedKey) {
+      boolean isChunkedKey,
+      Lazy<GenericRecord> newValueProvider) {
    internalView.incrementRecordCount(storeName);
    return CompletableFuture.completedFuture(null);
  }

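The Lazy valueProvider threaded through processRecord above is what allows a materialized view configured with a ComplexVenicePartitioner to route records by value while deferring deserialization until it is actually needed. A minimal sketch of how a view writer might consume it; the routeByValue helper below is hypothetical and is not the actual MaterializedViewWriter logic, but it only relies on APIs visible in this change (Lazy.get() and ComplexVenicePartitioner.getPartitionId(byte[], GenericRecord, int)):

```java
import com.linkedin.venice.partitioner.ComplexVenicePartitioner;
import com.linkedin.venice.utils.lazy.Lazy;
import org.apache.avro.generic.GenericRecord;

public final class ValueRoutingSketch {
  // Hypothetical helper: returns the view partitions a record should be produced to.
  public static int[] routeByValue(
      ComplexVenicePartitioner partitioner,
      byte[] keyBytes,
      Lazy<GenericRecord> valueProvider,
      int viewPartitionCount) {
    GenericRecord value = valueProvider.get(); // deserialization only happens here, on demand
    if (value == null) {
      // e.g. a DELETE where no value is available; nothing to route by value
      return new int[0];
    }
    return partitioner.getPartitionId(keyBytes, value, viewPartitionCount);
  }
}
```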
diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java
index 109234cebc6..e554c2d3601 100644
--- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java
+++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestWriteUtils.java
@@ -450,6 +450,22 @@ public static Schema writeSimpleAvroFile(
     });
   }
 
+  public static Schema writeSimpleAvroFileWithStringToNameRecordV2Schema(File parentDir) throws IOException {
+    String firstName = "first_name_";
+    String lastName = "last_name_";
+
+    return writeSimpleAvroFile(parentDir, STRING_TO_NAME_RECORD_V2_SCHEMA, i -> {
+      GenericRecord keyValueRecord = new GenericData.Record(STRING_TO_NAME_RECORD_V2_SCHEMA);
+      keyValueRecord.put(DEFAULT_KEY_FIELD_PROP, String.valueOf(i)); // Key
+      GenericRecord valueRecord = new GenericData.Record(NAME_RECORD_V2_SCHEMA);
+      valueRecord.put("firstName", firstName + i);
+      valueRecord.put("lastName", lastName + i);
+      valueRecord.put("age", -1);
+      keyValueRecord.put(DEFAULT_VALUE_FIELD_PROP, valueRecord); // Value
+      return keyValueRecord;
+    });
+  }
+
   public static Schema writeSimpleAvroFile(
       File parentDir,
       Schema schema,
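Note that the new helper above writes age = -1 for every batch record, and TestValueBasedVenicePartitioner returns an empty partition array for negative ages, which is why testMaterializedViewWithComplexPartitioner expects the view-topic partitions to stay nearly empty after the initial push. A small illustrative check of that interaction; the value schema below is a simplified stand-in for NAME_RECORD_V2_SCHEMA and this class is not part of the change:

```java
import com.linkedin.venice.view.TestValueBasedVenicePartitioner;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public final class BatchRecordRoutingSketch {
  public static void main(String[] args) {
    // Simplified stand-in value schema: only the "age" field matters for routing.
    Schema valueSchema = SchemaBuilder.record("NameRecordSketch").fields().requiredInt("age").endRecord();
    GenericRecord value = new GenericData.Record(valueSchema);
    value.put("age", -1); // what writeSimpleAvroFileWithStringToNameRecordV2Schema writes for every row

    int[] targets = new TestValueBasedVenicePartitioner().getPartitionId(new byte[0], value, 2);
    System.out.println(targets.length); // 0 -> batch records are skipped and never produced to the view topic
  }
}
```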