From e05e625d766ee54ff12750fcf59cd8a4dfc09118 Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Fri, 14 Feb 2025 12:16:55 -0800 Subject: [PATCH] [server] Produce heartbeat to separate RT topic (#1537) Produce heartbeat to separate RT topic at the same time as producing to RT. This is needed in the edge case where if the rewind time is shorter than the previous separate RT write, there will be no data to replay in separate RT, and the ready-to-serve check will never pass because no offset has been persisted for the separate RT region. --- .../LeaderFollowerStoreIngestionTask.java | 64 ++++++++++++------- .../kafka/consumer/StoreIngestionTask.java | 12 ++-- .../consumer/StoreIngestionTaskTest.java | 27 +++++++- ...TestActiveActiveReplicationForIncPush.java | 2 +- .../TestSeparateRealtimeTopicIngestion.java | 39 ++++++++++- 5 files changed, 113 insertions(+), 31 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index d26e7ad8a2..788152acf7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -88,6 +88,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -2258,19 +2259,12 @@ protected void recordHeartbeatReceived( return; } - // Separate incremental push pubsub entries has the same pubsub url but different cluster id, which creates - // confusion for heartbeat tracking. We need to resolve the kafka url to the actual kafka cluster url. - String resolvedKafkaUrl = kafkaClusterUrlResolver != null ? kafkaClusterUrlResolver.apply(kafkaUrl) : kafkaUrl; - // This is just sanity check, as there is no leader producing HB to sep topic by design. - if (!Objects.equals(resolvedKafkaUrl, kafkaUrl)) { - return; - } if (partitionConsumptionState.getLeaderFollowerState().equals(LEADER)) { heartbeatMonitoringService.recordLeaderHeartbeat( storeName, versionNumber, partitionConsumptionState.getPartition(), - serverConfig.getKafkaClusterUrlToAliasMap().get(resolvedKafkaUrl), + serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl), consumerRecord.getValue().producerMetadata.messageTimestamp, partitionConsumptionState.isComplete()); } else { @@ -2278,7 +2272,7 @@ protected void recordHeartbeatReceived( storeName, versionNumber, partitionConsumptionState.getPartition(), - serverConfig.getKafkaClusterUrlToAliasMap().get(resolvedKafkaUrl), + serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl), consumerRecord.getValue().producerMetadata.messageTimestamp, partitionConsumptionState.isComplete()); } @@ -3831,10 +3825,16 @@ protected Set maybeSendIngestionHeartbeat() { return null; } - AtomicInteger numHeartBeatSuccess = new AtomicInteger(0); - Map> heartBeatFutures = new VeniceConcurrentHashMap<>(); - Set failedPartitions = VeniceConcurrentHashMap.newKeySet(); + List> heartBeatFutures = + new ArrayList<>(partitionConsumptionStateMap.size()); + List> heartBeatFuturesForSepRT = + isSeparatedRealtimeTopicEnabled() ? new ArrayList<>() : null; + Set failedPartitions = VeniceConcurrentHashMap.newKeySet(partitionConsumptionStateMap.size()); + Set failedPartitionsForSepRT = + isSeparatedRealtimeTopicEnabled() ? VeniceConcurrentHashMap.newKeySet() : null; AtomicReference completionException = new AtomicReference<>(null); + AtomicReference completionExceptionForSepRT = + isSeparatedRealtimeTopicEnabled() ? new AtomicReference<>(null) : null; for (PartitionConsumptionState pcs: partitionConsumptionStateMap.values()) { PubSubTopic leaderTopic = pcs.getOffsetRecord().getLeaderTopic(pubSubTopicRepository); if (isLeader(pcs) && leaderTopic != null && leaderTopic.isRealTime()) { @@ -3851,24 +3851,47 @@ protected Set maybeSendIngestionHeartbeat() { if (throwable != null) { completionException.set(new CompletionException(throwable)); failedPartitions.add(String.valueOf(partition)); - } else { - numHeartBeatSuccess.getAndIncrement(); } }); - heartBeatFutures.put(partition, heartBeatFuture); + heartBeatFutures.add(heartBeatFuture); + // Also send to separate RT topic if it is enabled for the version. + if (isSeparatedRealtimeTopicEnabled()) { + CompletableFuture heartBeatFutureForSepRT = + sendIngestionHeartbeatToRT(new PubSubTopicPartitionImpl(separateRealTimeTopic, partition)); + heartBeatFutureForSepRT.whenComplete((ignore, throwable) -> { + if (throwable != null) { + completionExceptionForSepRT.set(new CompletionException(throwable)); + failedPartitionsForSepRT.add(String.valueOf(partition)); + } + }); + heartBeatFuturesForSepRT.add(heartBeatFutureForSepRT); + } } } + sendHeartbeatProduceLog(heartBeatFutures, failedPartitions, completionException, false); + if (isSeparatedRealtimeTopicEnabled()) { + sendHeartbeatProduceLog(heartBeatFuturesForSepRT, failedPartitionsForSepRT, completionExceptionForSepRT, true); + failedPartitions.addAll(failedPartitionsForSepRT); + } + lastSendIngestionHeartbeatTimestamp.set(currentTimestamp); + return failedPartitions; + } + void sendHeartbeatProduceLog( + List> heartBeatFutures, + Set failedPartitions, + AtomicReference completionException, + boolean isSeparateTopic) { if (!heartBeatFutures.isEmpty()) { - CompletableFuture.allOf(heartBeatFutures.values().toArray(new CompletableFuture[0])) + CompletableFuture.allOf(heartBeatFutures.toArray(new CompletableFuture[0])) .whenCompleteAsync((ignore, throwable) -> { if (!failedPartitions.isEmpty()) { int numFailedPartitions = failedPartitions.size(); String logMessage = String.format( "Send ingestion heartbeat for %d partitions of topic %s: %d succeeded, %d failed for partitions: %s", heartBeatFutures.size(), - realTimeTopic, - numHeartBeatSuccess.get(), + isSeparateTopic ? realTimeTopic : separateRealTimeTopic, + heartBeatFutures.size() - numFailedPartitions, numFailedPartitions, String.join(",", failedPartitions)); if (!REDUNDANT_LOGGING_FILTER.isRedundantException(logMessage)) { @@ -3878,16 +3901,13 @@ protected Set maybeSendIngestionHeartbeat() { String logMessage = String.format( "Send ingestion heartbeat for %d partitions of topic %s: all succeeded", heartBeatFutures.size(), - realTimeTopic); + isSeparateTopic ? realTimeTopic : separateRealTimeTopic); if (!REDUNDANT_LOGGING_FILTER.isRedundantException(logMessage)) { LOGGER.debug(logMessage); } } }); } - - lastSendIngestionHeartbeatTimestamp.set(currentTimestamp); - return failedPartitions; } /** diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 491c732188..092df6cb17 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -206,6 +206,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final String kafkaVersionTopic; protected final PubSubTopic versionTopic; protected final PubSubTopic realTimeTopic; + protected final PubSubTopic separateRealTimeTopic; protected final String storeName; private final boolean isUserSystemStore; protected final int versionNumber; @@ -290,8 +291,6 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final boolean isWriteComputationEnabled; - protected final boolean isSeparatedRealtimeTopicEnabled; - /** * Freeze ingestion if ready to serve or local data exists */ @@ -395,6 +394,9 @@ public StoreIngestionTask( this.storeName = versionTopic.getStoreName(); this.isUserSystemStore = VeniceSystemStoreUtils.isUserSystemStore(storeName); this.realTimeTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(version)); + this.separateRealTimeTopic = version.isSeparateRealTimeTopicEnabled() + ? pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(storeName)) + : null; this.versionNumber = Version.parseVersionFromKafkaTopicName(kafkaVersionTopic); this.consumerActionsQueue = new PriorityBlockingQueue<>(CONSUMER_ACTION_QUEUE_INIT_CAPACITY); this.partitionToPendingConsumerActionCountMap = new VeniceConcurrentHashMap<>(); @@ -448,8 +450,6 @@ public StoreIngestionTask( this.isWriteComputationEnabled = store.isWriteComputationEnabled(); - this.isSeparatedRealtimeTopicEnabled = version.isSeparateRealTimeTopicEnabled(); - this.partitionStateSerializer = builder.getPartitionStateSerializer(); this.suppressLiveUpdates = serverConfig.freezeIngestionIfReadyToServeOrLocalDataExists(); @@ -540,7 +540,7 @@ public StoreIngestionTask( Collections.unmodifiableMap(partitionConsumptionStateMap), serverConfig.isHybridQuotaEnabled(), serverConfig.isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled(), - isSeparatedRealtimeTopicEnabled, + version.isSeparateRealTimeTopicEnabled(), ingestionNotificationDispatcher, this::pauseConsumption, this::resumeConsumption); @@ -4648,7 +4648,7 @@ public boolean hasAllPartitionReportedCompleted() { } public boolean isSeparatedRealtimeTopicEnabled() { - return isSeparatedRealtimeTopicEnabled; + return separateRealTimeTopic != null; } /** diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 35fe5f4082..d4ec8bec0b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -4691,7 +4691,7 @@ public void testMaybeSendIngestionHeartbeat( } @Test - public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws InterruptedException { + public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() { String storeName = Utils.getUniqueString("store"); StorageService storageService = mock(StorageService.class); Store mockStore = mock(Store.class); @@ -4712,6 +4712,7 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter doReturn(false).when(mockStore).isHybridStoreDiskQuotaEnabled(); doReturn(mockVersion).when(mockStore).getVersion(1); doReturn(true).when(mockVersion).isActiveActiveReplicationEnabled(); + doReturn(true).when(mockVersion).isSeparateRealTimeTopicEnabled(); VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class); VeniceProperties mockVeniceProperties = mock(VeniceProperties.class); doReturn(true).when(mockVeniceProperties).isEmpty(); @@ -4727,10 +4728,14 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter doReturn(offsetRecord).when(pcs1).getOffsetRecord(); doReturn(LeaderFollowerStateType.LEADER).when(pcs1).getLeaderFollowerState(); doReturn(1).when(pcs1).getPartition(); + PubSubTopic pubsubTopic = mock(PubSubTopic.class); doReturn(pubsubTopic).when(offsetRecord).getLeaderTopic(any()); doReturn(true).when(pubsubTopic).isRealTime(); + PubSubTopic pubsubTopicSepRT = mock(PubSubTopic.class); + doReturn(true).when(pubsubTopicSepRT).isRealTime(); + VeniceWriter veniceWriter = mock(VeniceWriter.class); VeniceWriterFactory veniceWriterFactory = mock(VeniceWriterFactory.class); doReturn(veniceWriter).when(veniceWriterFactory).createVeniceWriter(any()); @@ -4767,6 +4772,8 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter heartBeatFuture.complete(null); PubSubTopicPartition pubSubTopicPartition0 = new PubSubTopicPartitionImpl(pubsubTopic, 0); PubSubTopicPartition pubSubTopicPartition1 = new PubSubTopicPartitionImpl(pubsubTopic, 1); + PubSubTopic sepRTtopic = pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(storeName)); + PubSubTopicPartition pubSubTopicPartition1sep = new PubSubTopicPartitionImpl(sepRTtopic, 1); // all succeeded doReturn(heartBeatFuture).when(veniceWriter).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong()); @@ -4792,6 +4799,24 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter assertTrue(failedPartitions.get().contains("1")); }); + // 1 partition throws exception + doReturn(heartBeatFuture).when(veniceWriter) + .sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong()); + doAnswer(invocation -> { + throw new Exception("mock exception"); + }).when(veniceWriter).sendHeartbeat(eq(pubSubTopicPartition1sep), any(), any(), anyBoolean(), any(), anyLong()); + // wait for SERVER_INGESTION_HEARTBEAT_INTERVAL_MS + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat()); + assertNotNull(failedPartitions.get()); + }); + // wait for the futures to complete that populates failedPartitions + TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { + assertEquals(failedPartitions.get().size(), 1); + assertFalse(failedPartitions.get().contains("0")); + assertTrue(failedPartitions.get().contains("1")); + }); + // both partition throws exception doAnswer(invocation -> { throw new Exception("mock exception"); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java index fc53acfe2c..01085bbb86 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java @@ -428,7 +428,7 @@ private void verifyForSeparateIncrementalPushTopic( realTimeTopicOffset < TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT / 10, "Records # is more than expected: " + realTimeTopicOffset); } else { - assertEquals(separateTopicOffset, 0, "Records # is not enough: " + separateTopicOffset); + Assert.assertTrue(separateTopicOffset > 0, "Records # is not enough: " + separateTopicOffset); Assert.assertTrue( realTimeTopicOffset < TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT / 10, "Records # is more than expected: " + realTimeTopicOffset); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java index 7baed5aef9..09c53f8cdd 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java @@ -19,9 +19,11 @@ import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.davinci.kafka.consumer.ConsumerPoolType; import com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator; +import com.linkedin.davinci.kafka.consumer.ReplicaHeartbeatInfo; import com.linkedin.davinci.replication.RmdWithValueSchemaId; import com.linkedin.davinci.replication.merge.RmdSerDe; import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache; +import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService; import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter; import com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.record.ValueRecord; @@ -64,6 +66,7 @@ import java.io.File; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -132,7 +135,7 @@ public void cleanUp() { @Test(timeOut = TEST_TIMEOUT_MS * 2) public void testIncrementalPushPartialUpdate() throws IOException { - final String storeName = Utils.getUniqueString("inc_push_update_classic_format"); + final String storeName = Utils.getUniqueString("sepRT_ingestion"); String parentControllerUrl = parentController.getControllerUrl(); File inputDir = getTempDataDirectory(); Schema recordSchema = writeSimpleAvroFileWithStringToPartialUpdateOpRecordSchema(inputDir); @@ -192,6 +195,7 @@ public void testIncrementalPushPartialUpdate() throws IOException { VeniceClusterWrapper veniceClusterWrapper = childDatacenters.get(0).getClusters().get(CLUSTER_NAME); validateData(storeName, veniceClusterWrapper); + // Empty push will large rewind time to make sure data is all copied to the new version for verification. parentControllerClient.emptyPush(storeName, "test_push_id_v2", 1000); TestUtils.waitForNonDeterministicPushCompletion( Version.composeKafkaTopic(storeName, 2), @@ -276,6 +280,22 @@ public void testIncrementalPushPartialUpdate() throws IOException { NUMBER_OF_CHILD_DATACENTERS, 1); }); + + // Add a new push with minimal rewind time to test that separate RT has heartbeat populated. + UpdateStoreQueryParams updateStoreParams2 = + new UpdateStoreQueryParams().setHybridRewindSeconds(1L).setHybridOffsetLagThreshold(10L); + updateStoreResponse = + parentControllerClient.retryableRequest(5, c -> c.updateStore(storeName, updateStoreParams2)); + assertFalse(updateStoreResponse.isError(), "Update store got error: " + updateStoreResponse.getError()); + parentControllerClient.emptyPush(storeName, "test_push_id_v3", 1000); + TestUtils.waitForNonDeterministicPushCompletion( + Version.composeKafkaTopic(storeName, 3), + parentControllerClient, + 30, + TimeUnit.SECONDS); + + // Make sure separate RT heartbeat is tracked properly. + validateSeparateRealtimeTopicHeartbeat(Version.composeKafkaTopic(storeName, 3), 0); } } @@ -323,6 +343,23 @@ private void validateRmdData( } } + private void validateSeparateRealtimeTopicHeartbeat(String topicName, int partition) { + long leaderSepRTTopicCount = 0; + for (VeniceServerWrapper serverWrapper: multiRegionMultiClusterWrapper.getChildRegions() + .get(0) + .getClusters() + .get("venice-cluster0") + .getVeniceServers()) { + HeartbeatMonitoringService heartbeatMonitoringService = + serverWrapper.getVeniceServer().getHeartbeatMonitoringService(); + assertNotNull(heartbeatMonitoringService); + Map heartbeatInfoMap = + heartbeatMonitoringService.getHeartbeatInfo(topicName, partition, false); + leaderSepRTTopicCount += heartbeatInfoMap.keySet().stream().filter(x -> x.endsWith("_sep")).count(); + } + Assert.assertEquals(leaderSepRTTopicCount, NUMBER_OF_CHILD_DATACENTERS); + } + private byte[] serializeStringKeyToByteArray(String key) { Utf8 utf8Key = new Utf8(key); DatumWriter writer = new GenericDatumWriter<>(Schema.create(Schema.Type.STRING));