[server][controller][vpj][dvc] Cleanup and refactor producer adapter configurations (#1538)
- Removed redundant VeniceWriterFactory for meta store writes in KafkaStoreIngestionTask  
- Moved serialization out of Kafka producer to Venice layer, added API for raw byte messages  
- Deprecated the old create API in PubSubProducerAdapterFactory and added a new single-argument API (see the usage sketch after this list)
- Removed unused serialization configs from Admin tool message dumper  
- Added PubSubProduceResult::getPubSubPosition to return PubSubPosition  
- Deleted SharedKafkaProducerConfig and other dead code from MetaStoreWriter  
- Introduced PubSubMessageSerializer to pass down serializers for KafkaKey and KME  
- Added PubSubProducerAdapterContext to centralize common configs for producer adapters  
- Refactored producer compression config handling in VeniceWriterFactory  
- Moved broker address resolution up in VeniceWriterFactory
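To make the factory change concrete, here is a minimal usage sketch (factory and veniceProperties are assumed to be in scope). The deprecated three-argument create and the default create(context) delegation appear verbatim in the PubSubProducerAdapterFactory diff below; the Builder on PubSubProducerAdapterContext is a hypothetical illustration, since that file is not loaded in this view.

    // Old entry point (now @Deprecated): three loose arguments.
    PubSubProducerAdapter adapter =
        factory.create(veniceProperties, "test-producer", "localhost:9092");

    // New entry point: a single context object centralizing common producer configs.
    // Only the getters used by the default create(context) method are confirmed by
    // this diff (getVeniceProperties, getProducerName, getBrokerAddress); the
    // Builder below is a hypothetical illustration.
    PubSubProducerAdapterContext context = new PubSubProducerAdapterContext.Builder()
        .setVeniceProperties(veniceProperties)
        .setProducerName("test-producer")
        .setBrokerAddress("localhost:9092")
        .build();
    PubSubProducerAdapter adapterFromContext = factory.create(context);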
sushantmane authored Feb 28, 2025
1 parent b36fd44 commit 8b6951b
Showing 65 changed files with 1,046 additions and 515 deletions.
==== File: (name not shown in this view) ====
@@ -341,6 +341,9 @@ private void asyncStart() {
     VeniceProperties veniceProperties = veniceServerConfig.getClusterProperties();
     PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
         veniceServerConfig.getPubSubClientsFactory().getProducerAdapterFactory();
+    /**
+     * TODO: Remove this VW factory creation from here and replace it with the one created in {@link KafkaStoreIngestionService}
+     */
     VeniceWriterFactory writerFactory =
         new VeniceWriterFactory(veniceProperties.toProperties(), pubSubProducerAdapterFactory, null);
     SchemaEntry valueSchemaEntry;
==== File: KafkaStoreIngestionService.java ====
@@ -245,8 +245,6 @@ public KafkaStoreIngestionService(
     producerAdapterFactory = pubSubClientsFactory.getProducerAdapterFactory();
     VeniceWriterFactory veniceWriterFactory =
         new VeniceWriterFactory(veniceWriterProperties, producerAdapterFactory, metricsRepository);
-    VeniceWriterFactory veniceWriterFactoryForMetaStoreWriter =
-        new VeniceWriterFactory(veniceWriterProperties, producerAdapterFactory, null);
     this.adaptiveThrottlerSignalService = adaptiveThrottlerSignalService;
     this.ingestionThrottler = new IngestionThrottler(
         isDaVinciClient,
@@ -301,7 +299,7 @@ public KafkaStoreIngestionService(
     if (zkSharedSchemaRepository.isPresent()) {
       this.metaStoreWriter = new MetaStoreWriter(
           topicManagerRepository.getLocalTopicManager(),
-          veniceWriterFactoryForMetaStoreWriter,
+          veniceWriterFactory,
           zkSharedSchemaRepository.get(),
           pubSubTopicRepository,
           serverConfig.getMetaStoreWriterCloseTimeoutInMS(),
==== File: (name not shown in this view) ====
@@ -588,9 +588,9 @@ public void methodSetUp() throws Exception {

   private VeniceWriter getVeniceWriter(String topic, PubSubProducerAdapter producerAdapter) {
     VeniceWriterOptions veniceWriterOptions =
-        new VeniceWriterOptions.Builder(topic).setKeySerializer(new DefaultSerializer())
-            .setValueSerializer(new DefaultSerializer())
-            .setWriteComputeSerializer(new DefaultSerializer())
+        new VeniceWriterOptions.Builder(topic).setKeyPayloadSerializer(new DefaultSerializer())
+            .setValuePayloadSerializer(new DefaultSerializer())
+            .setWriteComputePayloadSerializer(new DefaultSerializer())
             .setPartitioner(getVenicePartitioner())
             .setTime(SystemTime.INSTANCE)
             .build();
@@ -603,9 +603,9 @@ private VenicePartitioner getVenicePartitioner() {

   private VeniceWriter getVeniceWriter(PubSubProducerAdapter producerAdapter) {
     VeniceWriterOptions veniceWriterOptions =
-        new VeniceWriterOptions.Builder(topic).setKeySerializer(new DefaultSerializer())
-            .setValueSerializer(new DefaultSerializer())
-            .setWriteComputeSerializer(new DefaultSerializer())
+        new VeniceWriterOptions.Builder(topic).setKeyPayloadSerializer(new DefaultSerializer())
+            .setValuePayloadSerializer(new DefaultSerializer())
+            .setWriteComputePayloadSerializer(new DefaultSerializer())
             .setPartitioner(new SimplePartitioner())
             .setTime(SystemTime.INSTANCE)
             .build();
==== File: (name not shown in this view) ====
@@ -28,8 +28,6 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.TimeZone;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;


 /**
@@ -133,9 +131,6 @@ public static Properties getPubSubConsumerProperties(String kafkaUrl, Properties
     pubSubConsumerProperties.setProperty(KAFKA_BOOTSTRAP_SERVERS, kafkaUrl);
     pubSubConsumerProperties.setProperty(KAFKA_AUTO_OFFSET_RESET_CONFIG, "earliest");
     pubSubConsumerProperties.setProperty(KAFKA_ENABLE_AUTO_COMMIT_CONFIG, "false");
-    pubSubConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-    pubSubConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-
     return pubSubConsumerProperties;
   }
 }
==== File: (name not shown in this view) ====
@@ -3,6 +3,7 @@
 import static com.linkedin.venice.ConfigKeys.CLIENT_PRODUCER_THREAD_NUM;
 import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
 import static com.linkedin.venice.ConfigKeys.KAFKA_OVER_SSL;
+import static com.linkedin.venice.ConfigKeys.PUBSUB_BROKER_ADDRESS;
 import static com.linkedin.venice.ConfigKeys.SSL_KAFKA_BOOTSTRAP_SERVERS;
 import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;

@@ -114,8 +115,10 @@ private VeniceWriter<byte[], byte[], byte[]> getVeniceWriter(VersionCreationResp
     if (versionCreationResponse.isEnableSSL()) {
       writerProps.put(KAFKA_OVER_SSL, "true");
       writerProps.put(SSL_KAFKA_BOOTSTRAP_SERVERS, versionCreationResponse.getKafkaBootstrapServers());
+      writerProps.put(PUBSUB_BROKER_ADDRESS, versionCreationResponse.getKafkaBootstrapServers());
     } else {
       writerProps.put(KAFKA_BOOTSTRAP_SERVERS, versionCreationResponse.getKafkaBootstrapServers());
+      writerProps.put(PUBSUB_BROKER_ADDRESS, versionCreationResponse.getKafkaBootstrapServers());
     }

     return getVeniceWriter(versionCreationResponse, writerProps);
==== File: (name not shown in this view) ====
@@ -2,6 +2,7 @@

 import static com.linkedin.venice.CommonConfigKeys.SSL_FACTORY_CLASS_NAME;
 import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
+import static com.linkedin.venice.ConfigKeys.PUBSUB_BROKER_ADDRESS;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.SSL_CONFIGURATOR_CLASS_CONFIG;
 import static com.linkedin.venice.vpj.VenicePushJobConstants.SYSTEM_SCHEMA_CLUSTER_D2_SERVICE_NAME;
@@ -66,6 +67,7 @@ public static VeniceProperties getConsumerProperties(JobConf config) {
      */
     consumerFactoryProperties.setProperty(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, Long.toString(4 * 1024 * 1024));
     consumerFactoryProperties.setProperty(KAFKA_BOOTSTRAP_SERVERS, config.get(KAFKA_INPUT_BROKER_URL));
+    consumerFactoryProperties.setProperty(PUBSUB_BROKER_ADDRESS, config.get(KAFKA_INPUT_BROKER_URL));

     ApacheKafkaProducerConfig.copyKafkaSASLProperties(HadoopUtils.getProps(config), consumerFactoryProperties, true);
==== File: (name not shown in this view) ====
@@ -379,16 +379,17 @@ protected AbstractVeniceWriter<byte[], byte[], byte[]> createBasicVeniceWriter()
     VenicePartitioner partitioner = PartitionUtils.getVenicePartitioner(props);

     String topicName = props.getString(TOPIC_PROP);
-    VeniceWriterOptions options = new VeniceWriterOptions.Builder(topicName).setKeySerializer(new DefaultSerializer())
-        .setValueSerializer(new DefaultSerializer())
-        .setWriteComputeSerializer(new DefaultSerializer())
-        .setChunkingEnabled(chunkingEnabled)
-        .setRmdChunkingEnabled(rmdChunkingEnabled)
-        .setTime(SystemTime.INSTANCE)
-        .setPartitionCount(getPartitionCount())
-        .setPartitioner(partitioner)
-        .setMaxRecordSizeBytes(Integer.parseInt(maxRecordSizeBytesStr))
-        .build();
+    VeniceWriterOptions options =
+        new VeniceWriterOptions.Builder(topicName).setKeyPayloadSerializer(new DefaultSerializer())
+            .setValuePayloadSerializer(new DefaultSerializer())
+            .setWriteComputePayloadSerializer(new DefaultSerializer())
+            .setChunkingEnabled(chunkingEnabled)
+            .setRmdChunkingEnabled(rmdChunkingEnabled)
+            .setTime(SystemTime.INSTANCE)
+            .setPartitionCount(getPartitionCount())
+            .setPartitioner(partitioner)
+            .setMaxRecordSizeBytes(Integer.parseInt(maxRecordSizeBytesStr))
+            .build();
     String flatViewConfigMapString = props.getString(PUSH_JOB_VIEW_CONFIGS, "");
     if (!flatViewConfigMapString.isEmpty()) {
       mainWriter = veniceWriterFactoryFactory.createVeniceWriter(options);
==== File: (name not shown in this view) ====
@@ -200,7 +200,7 @@ private void pushSyntheticData() throws ExecutionException, InterruptedException
     VeniceWriterFactory vwFactory = IntegrationTestPushUtils
         .getVeniceWriterFactory(veniceCluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory);
     try (VeniceWriter<Object, byte[], byte[]> veniceWriter = vwFactory.createVeniceWriter(
-        new VeniceWriterOptions.Builder(pushVersionTopic).setKeySerializer(keySerializer).build())) {
+        new VeniceWriterOptions.Builder(pushVersionTopic).setKeyPayloadSerializer(keySerializer).build())) {
       veniceWriter.broadcastStartOfPush(Collections.emptyMap());
       Future[] writerFutures = new Future[ENTRY_COUNT];
       for (int i = 0; i < ENTRY_COUNT; i++) {
==== File: ConfigKeys.java ====
@@ -56,6 +56,8 @@ private ConfigKeys() {
    */
   public static final String PUBSUB_CLIENT_CONFIG_PREFIX = PubSubConstants.PUBSUB_CLIENT_CONFIG_PREFIX;

+  public static final String PUBSUB_BROKER_ADDRESS = PubSubConstants.PUBSUB_BROKER_ADDRESS;
+
   public static final String KAFKA_CONFIG_PREFIX = ApacheKafkaProducerConfig.KAFKA_CONFIG_PREFIX;
   public static final String KAFKA_BOOTSTRAP_SERVERS = ApacheKafkaProducerConfig.KAFKA_BOOTSTRAP_SERVERS;
   public static final String SSL_KAFKA_BOOTSTRAP_SERVERS = ApacheKafkaProducerConfig.SSL_KAFKA_BOOTSTRAP_SERVERS;
==== File: PubSubConstants.java ====
@@ -15,41 +15,49 @@
  */
 public class PubSubConstants {
   public static final String PUBSUB_CLIENT_CONFIG_PREFIX = "pubsub.";
+  public static final String PUBSUB_BROKER_ADDRESS = PUBSUB_CLIENT_CONFIG_PREFIX + "broker.address";

   // If true, the producer will use default configuration values for optimized high throughput
   // producing if they are not explicitly set.
   public static final String PUBSUB_PRODUCER_USE_HIGH_THROUGHPUT_DEFAULTS =
-      "pubsub.producer.use.high.throughput.defaults";
+      PUBSUB_CLIENT_CONFIG_PREFIX + "producer.use.high.throughput.defaults";

   // Timeout for consumer APIs which do not have a timeout parameter
-  public static final String PUBSUB_CONSUMER_API_DEFAULT_TIMEOUT_MS = "pubsub.consumer.api.default.timeout.ms";
+  public static final String PUBSUB_CONSUMER_API_DEFAULT_TIMEOUT_MS =
+      PUBSUB_CLIENT_CONFIG_PREFIX + "consumer.api.default.timeout.ms";
   public static final int PUBSUB_CONSUMER_API_DEFAULT_TIMEOUT_MS_DEFAULT_VALUE = 60_000; // 1 minute

   // Number of times to retry poll() upon failure
-  public static final String PUBSUB_CONSUMER_POLL_RETRY_TIMES = "pubsub.consumer.poll.retry.times";
+  public static final String PUBSUB_CONSUMER_POLL_RETRY_TIMES =
+      PUBSUB_CLIENT_CONFIG_PREFIX + "consumer.poll.retry.times";
   public static final int PUBSUB_CONSUMER_POLL_RETRY_TIMES_DEFAULT_VALUE = 3;
   // Backoff time in milliseconds between poll() retries
-  public static final String PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS = "pubsub.consumer.poll.retry.backoff.ms";
+  public static final String PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS =
+      PUBSUB_CLIENT_CONFIG_PREFIX + "consumer.poll.retry.backoff.ms";
   public static final int PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS_DEFAULT_VALUE = 0;

-  public static final String PUBSUB_CONSUMER_POSITION_RESET_STRATEGY = "pubsub.consumer.position.reset.strategy";
+  public static final String PUBSUB_CONSUMER_POSITION_RESET_STRATEGY =
+      PUBSUB_CLIENT_CONFIG_PREFIX + "consumer.position.reset.strategy";
   public static final String PUBSUB_CONSUMER_POSITION_RESET_STRATEGY_DEFAULT_VALUE = "earliest";

   public static final long PUBSUB_ADMIN_GET_TOPIC_CONFIG_RETRY_IN_SECONDS_DEFAULT_VALUE = 300;
   public static final long PUBSUB_TOPIC_UNKNOWN_RETENTION = Long.MIN_VALUE;

-  public static final String PUBSUB_CONSUMER_TOPIC_QUERY_RETRY_TIMES = "pubsub.consumer.topic.query.retry.times";
+  public static final String PUBSUB_CONSUMER_TOPIC_QUERY_RETRY_TIMES =
+      PUBSUB_CLIENT_CONFIG_PREFIX + "consumer.topic.query.retry.times";
   public static final int PUBSUB_CONSUMER_TOPIC_QUERY_RETRY_TIMES_DEFAULT_VALUE = 5;

   public static final String PUBSUB_CONSUMER_TOPIC_QUERY_RETRY_INTERVAL_MS =
-      "pubsub.consumer.topic.query.retry.interval.ms";
+      PUBSUB_CLIENT_CONFIG_PREFIX + "consumer.topic.query.retry.interval.ms";
   public static final int PUBSUB_CONSUMER_TOPIC_QUERY_RETRY_INTERVAL_MS_DEFAULT_VALUE = 1000;

   // PubSub admin APIs default timeout
-  public static final String PUBSUB_ADMIN_API_DEFAULT_TIMEOUT_MS = "pubsub.admin.api.default.timeout.ms";
+  public static final String PUBSUB_ADMIN_API_DEFAULT_TIMEOUT_MS =
+      PUBSUB_CLIENT_CONFIG_PREFIX + "admin.api.default.timeout.ms";
   public static final int PUBSUB_ADMIN_API_DEFAULT_TIMEOUT_MS_DEFAULT_VALUE = 120_000; // 2 minutes

-  public static final String PUBSUB_CONSUMER_CHECK_TOPIC_EXISTENCE = "pubsub.consumer.check.topic.existence";
+  public static final String PUBSUB_CONSUMER_CHECK_TOPIC_EXISTENCE =
+      PUBSUB_CLIENT_CONFIG_PREFIX + "consumer.check.topic.existence";
   public static final boolean PUBSUB_CONSUMER_CHECK_TOPIC_EXISTENCE_DEFAULT_VALUE = false;

   /**
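Worth noting: the prefix refactor in this file is value-preserving. Every pre-existing key still resolves to exactly the same string; only its construction now reuses PUBSUB_CLIENT_CONFIG_PREFIX. A quick illustrative check:

    // Resolved strings are unchanged by the refactor; only their derivation moved
    // to the shared "pubsub." prefix constant.
    assert PubSubConstants.PUBSUB_CONSUMER_POLL_RETRY_TIMES.equals("pubsub.consumer.poll.retry.times");
    assert PubSubConstants.PUBSUB_ADMIN_API_DEFAULT_TIMEOUT_MS.equals("pubsub.admin.api.default.timeout.ms");
    // The one genuinely new key introduced here:
    assert PubSubConstants.PUBSUB_BROKER_ADDRESS.equals("pubsub.broker.address");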
==== File: PubSubProducerAdapterFactory.java ====
@@ -1,6 +1,7 @@
 package com.linkedin.venice.pubsub;

 import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
+import com.linkedin.venice.pubsub.api.PubSubProducerAdapterContext;
 import com.linkedin.venice.utils.VeniceProperties;
 import java.io.Closeable;

@@ -23,7 +24,12 @@ public interface PubSubProducerAdapterFactory<ADAPTER extends PubSubProducerAdap
    *                            If this value is null, local broker address present in veniceProperties will be used.
    * @return                    Returns an instance of a producer adapter
    */
+  @Deprecated
   ADAPTER create(VeniceProperties veniceProperties, String producerName, String targetBrokerAddress);

+  default ADAPTER create(PubSubProducerAdapterContext context) {
+    return create(context.getVeniceProperties(), context.getProducerName(), context.getBrokerAddress());
+  }
+
   String getName();
 }
==== File: SimplePubSubProduceResultImpl.java ====
@@ -1,5 +1,6 @@
 package com.linkedin.venice.pubsub.adapter;

+import com.linkedin.venice.pubsub.api.PubSubPosition;
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;


@@ -10,12 +11,23 @@ public class SimplePubSubProduceResultImpl implements PubSubProduceResult {
   private final String topic;
   private final int partition;
   private final long offset;
+  private final PubSubPosition pubSubPosition;
   private final int serializedSize;

   public SimplePubSubProduceResultImpl(String topic, int partition, long offset, int serializedSize) {
+    this(topic, partition, offset, null, serializedSize);
+  }
+
+  public SimplePubSubProduceResultImpl(
+      String topic,
+      int partition,
+      long offset,
+      PubSubPosition pubSubPosition,
+      int serializedSize) {
     this.topic = topic;
     this.partition = partition;
     this.offset = offset;
+    this.pubSubPosition = pubSubPosition;
     this.serializedSize = serializedSize;
   }

@@ -24,6 +36,11 @@ public long getOffset() {
     return offset;
   }

+  @Override
+  public PubSubPosition getPubSubPosition() {
+    return pubSubPosition;
+  }
+
   @Override
   public int getSerializedSize() {
     return serializedSize;
@@ -41,7 +58,7 @@ public int getPartition() {

   @Override
   public String toString() {
-    return "[Topic: " + topic + "," + "Partition: " + partition + "," + "Offset: " + offset + "," + "SerializedSize: "
-        + serializedSize + "]";
+    return "[Topic: " + topic + ", " + "Partition: " + partition + ", " + "Offset: " + offset + ", "
+        + "PubSubPosition: " + pubSubPosition + ", " + "SerializedSize: " + serializedSize + "]";
   }
 }
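A small usage sketch of the new accessor, using only constructors visible in this diff (the topic name and numeric values are made up for illustration; ApacheKafkaOffsetPosition appears in the next file):

    // The five-argument constructor carries a position; the old four-argument
    // constructor delegates with a null position.
    PubSubPosition position = new ApacheKafkaOffsetPosition(42L);
    PubSubProduceResult result =
        new SimplePubSubProduceResultImpl("store_v1", 0, 42L, position, 128);
    PubSubProduceResult legacy =
        new SimplePubSubProduceResultImpl("store_v1", 0, 42L, 128);

    assert result.getPubSubPosition() == position;
    assert legacy.getPubSubPosition() == null; // callers must handle the null case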
==== File: ApacheKafkaProduceResult.java ====
@@ -1,6 +1,7 @@
 package com.linkedin.venice.pubsub.adapter.kafka.producer;

 import com.linkedin.venice.pubsub.adapter.SimplePubSubProduceResultImpl;
+import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition;
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;
 import org.apache.kafka.clients.producer.RecordMetadata;

@@ -14,6 +15,7 @@ public ApacheKafkaProduceResult(RecordMetadata recordMetadata) {
         recordMetadata.topic(),
         recordMetadata.partition(),
         recordMetadata.offset(),
+        new ApacheKafkaOffsetPosition(recordMetadata.offset()),
         recordMetadata.serializedKeySize() + recordMetadata.serializedValueSize());
   }
 }
==== File: ApacheKafkaProducerAdapter.java ====
@@ -1,12 +1,15 @@
 package com.linkedin.venice.pubsub.adapter.kafka.producer;

+import com.linkedin.venice.annotation.VisibleForTesting;
 import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
 import com.linkedin.venice.message.KafkaKey;
 import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils;
 import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
+import com.linkedin.venice.pubsub.api.PubSubMessageSerializer;
 import com.linkedin.venice.pubsub.api.PubSubProduceResult;
 import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
 import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
+import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.pubsub.api.exceptions.PubSubClientException;
 import com.linkedin.venice.pubsub.api.exceptions.PubSubClientRetriableException;
 import com.linkedin.venice.pubsub.api.exceptions.PubSubOpTimeoutException;
@@ -40,8 +43,9 @@
 public class ApacheKafkaProducerAdapter implements PubSubProducerAdapter {
   private static final Logger LOGGER = LogManager.getLogger(ApacheKafkaProducerAdapter.class);

-  private KafkaProducer<KafkaKey, KafkaMessageEnvelope> producer;
+  private KafkaProducer<byte[], byte[]> producer;
   private final ApacheKafkaProducerConfig producerConfig;
+  private final PubSubMessageSerializer messageSerializer;

   /**
    * @param producerConfig contains producer configs
@@ -50,9 +54,11 @@ public ApacheKafkaProducerAdapter(ApacheKafkaProducerConfig producerConfig) {
     this(producerConfig, new KafkaProducer<>(producerConfig.getProducerProperties()));
   }

-  ApacheKafkaProducerAdapter(ApacheKafkaProducerConfig cfg, KafkaProducer<KafkaKey, KafkaMessageEnvelope> producer) {
+  @VisibleForTesting
+  ApacheKafkaProducerAdapter(ApacheKafkaProducerConfig cfg, KafkaProducer<byte[], byte[]> producer) {
     this.producerConfig = cfg;
     this.producer = producer;
+    this.messageSerializer = cfg.getPubSubMessageSerializer();
   }

   /**
@@ -91,11 +97,40 @@ public CompletableFuture<PubSubProduceResult> sendMessage(
       PubSubMessageHeaders pubsubMessageHeaders,
       PubSubProducerCallback pubsubProducerCallback) {
     ensureProducerIsNotClosed();
-    ProducerRecord<KafkaKey, KafkaMessageEnvelope> record = new ProducerRecord<>(
+    byte[] keyBytes = messageSerializer.serializeKey(topic, key);
+    byte[] valueBytes = messageSerializer.serializeValue(topic, value);
+    return sendMessage(topic, partition, keyBytes, valueBytes, pubsubMessageHeaders, pubsubProducerCallback);
+  }
+
+  @Override
+  public CompletableFuture<PubSubProduceResult> sendMessage(
+      PubSubTopicPartition pubSubTopicPartition,
+      byte[] keyBytes,
+      byte[] valueBytes,
+      PubSubMessageHeaders pubsubMessageHeaders,
+      PubSubProducerCallback pubsubProducerCallback) {
+    return sendMessage(
+        pubSubTopicPartition.getTopicName(),
+        pubSubTopicPartition.getPartitionNumber(),
+        keyBytes,
+        valueBytes,
+        pubsubMessageHeaders,
+        pubsubProducerCallback);
+  }
+
+  private CompletableFuture<PubSubProduceResult> sendMessage(
+      String topic,
+      Integer partition,
+      byte[] keyBytes,
+      byte[] valueBytes,
+      PubSubMessageHeaders pubsubMessageHeaders,
+      PubSubProducerCallback pubsubProducerCallback) {
+    ensureProducerIsNotClosed();
+    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
         topic,
         partition,
-        key,
-        value,
+        keyBytes,
+        valueBytes,
         ApacheKafkaUtils.convertToKafkaSpecificHeaders(pubsubMessageHeaders));
     ApacheKafkaProducerCallback kafkaCallback = new ApacheKafkaProducerCallback(pubsubProducerCallback);
     try {
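The serializer seam above is what lets the Kafka producer operate on raw bytes. PubSubMessageSerializer itself is not among the files loaded in this view, so the sketch below infers its shape purely from the two call sites in this hunk; treat it as an illustration, not the actual interface.

    // Inferred from the call sites above:
    //   messageSerializer.serializeKey(topic, key)     -> byte[]
    //   messageSerializer.serializeValue(topic, value) -> byte[]
    // The real PubSubMessageSerializer may differ; this is a hypothetical sketch.
    interface PubSubMessageSerializerSketch {
      byte[] serializeKey(String topic, KafkaKey key);
      byte[] serializeValue(String topic, KafkaMessageEnvelope value);
    }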
==== Remaining changed files were not loaded in this view ====