Skip to content

Commit

Permalink
[server] Add pool type information to Kafka consumer client ID (#1541)
Browse files Browse the repository at this point in the history
A simple fix for Kafka consumer ID generation. Added the pool type so that  
different pool's consumer won't share same client ID. This is helpful when  
debugging consumption with JMX.
  • Loading branch information
sixpluszero authored Feb 19, 2025
1 parent 658d775 commit d056cac
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ protected KafkaConsumerService(
/**
* We need to assign a unique client id across all the storage nodes, otherwise, they will fail into the same throttling bucket.
*/
consumerProperties.setProperty(KAFKA_CLIENT_ID_CONFIG, getUniqueClientId(kafkaUrl, i));
consumerProperties.setProperty(KAFKA_CLIENT_ID_CONFIG, getUniqueClientId(kafkaUrl, i, poolType));
SharedKafkaConsumer pubSubConsumer = new SharedKafkaConsumer(
pubSubConsumerAdapterFactory.create(
new VeniceProperties(consumerProperties),
Expand Down Expand Up @@ -190,8 +190,8 @@ void handleUnsubscription(
PubSubTopicPartition topicPartition) {
}

private String getUniqueClientId(String kafkaUrl, int suffix) {
return Utils.getHostName() + "_" + kafkaUrl + "_" + suffix;
String getUniqueClientId(String kafkaUrl, int suffix, ConsumerPoolType poolType) {
return Utils.getHostName() + "_" + kafkaUrl + "_" + suffix + poolType.getStatSuffix();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testTopicWiseGetConsumer() throws Exception {
ConsumerPoolType.REGULAR_POOL,
factory,
properties,
1000l,
1000L,
2,
mock(IngestionThrottler.class),
mock(KafkaClusterBasedRecordThrottler.class),
Expand Down Expand Up @@ -421,6 +421,18 @@ public void testPartitionWiseGetConsumer() {
Assert.assertEquals(consumerForT1P3, consumerForT2P1);
}

@Test
public void testGenerateConsumerId() {
String hostName = Utils.getHostName();
String kafkaUrl = "abc:1234";
int suffix = 3;
ConsumerPoolType poolType = ConsumerPoolType.SEP_RT_LEADER_POOL;
KafkaConsumerService consumerService = mock(KafkaConsumerService.class);
doCallRealMethod().when(consumerService).getUniqueClientId(anyString(), anyInt(), any());
String expectedResult = hostName + "_" + kafkaUrl + "_" + suffix + poolType.getStatSuffix();
Assert.assertEquals(consumerService.getUniqueClientId(kafkaUrl, suffix, poolType), expectedResult);
}

@Test
public void testStoreAwarePartitionWiseGetConsumer() {
String storeName1 = Utils.getUniqueString("test_consumer_service1");
Expand Down

0 comments on commit d056cac

Please sign in to comment.