diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
index d26e7ad8a2..788152acf7 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
@@ -88,6 +88,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -2258,19 +2259,12 @@ protected void recordHeartbeatReceived(
       return;
     }
 
-    // Separate incremental push pubsub entries has the same pubsub url but different cluster id, which creates
-    // confusion for heartbeat tracking. We need to resolve the kafka url to the actual kafka cluster url.
-    String resolvedKafkaUrl = kafkaClusterUrlResolver != null ? kafkaClusterUrlResolver.apply(kafkaUrl) : kafkaUrl;
-    // This is just sanity check, as there is no leader producing HB to sep topic by design.
-    if (!Objects.equals(resolvedKafkaUrl, kafkaUrl)) {
-      return;
-    }
     if (partitionConsumptionState.getLeaderFollowerState().equals(LEADER)) {
       heartbeatMonitoringService.recordLeaderHeartbeat(
           storeName,
           versionNumber,
           partitionConsumptionState.getPartition(),
-          serverConfig.getKafkaClusterUrlToAliasMap().get(resolvedKafkaUrl),
+          serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
           consumerRecord.getValue().producerMetadata.messageTimestamp,
           partitionConsumptionState.isComplete());
     } else {
@@ -2278,7 +2272,7 @@ protected void recordHeartbeatReceived(
           storeName,
           versionNumber,
           partitionConsumptionState.getPartition(),
-          serverConfig.getKafkaClusterUrlToAliasMap().get(resolvedKafkaUrl),
+          serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
           consumerRecord.getValue().producerMetadata.messageTimestamp,
           partitionConsumptionState.isComplete());
     }
@@ -3831,10 +3825,16 @@ protected Set<String> maybeSendIngestionHeartbeat() {
       return null;
     }
 
-    AtomicInteger numHeartBeatSuccess = new AtomicInteger(0);
-    Map<Integer, CompletableFuture<PubSubProduceResult>> heartBeatFutures = new VeniceConcurrentHashMap<>();
-    Set<String> failedPartitions = VeniceConcurrentHashMap.newKeySet();
+    List<CompletableFuture<PubSubProduceResult>> heartBeatFutures =
+        new ArrayList<>(partitionConsumptionStateMap.size());
+    List<CompletableFuture<PubSubProduceResult>> heartBeatFuturesForSepRT =
+        isSeparatedRealtimeTopicEnabled() ? new ArrayList<>() : null;
+    Set<String> failedPartitions = VeniceConcurrentHashMap.newKeySet(partitionConsumptionStateMap.size());
+    Set<String> failedPartitionsForSepRT =
+        isSeparatedRealtimeTopicEnabled() ? VeniceConcurrentHashMap.newKeySet() : null;
     AtomicReference<CompletionException> completionException = new AtomicReference<>(null);
+    AtomicReference<CompletionException> completionExceptionForSepRT =
+        isSeparatedRealtimeTopicEnabled() ? new AtomicReference<>(null) : null;
     for (PartitionConsumptionState pcs: partitionConsumptionStateMap.values()) {
       PubSubTopic leaderTopic = pcs.getOffsetRecord().getLeaderTopic(pubSubTopicRepository);
       if (isLeader(pcs) && leaderTopic != null && leaderTopic.isRealTime()) {
@@ -3851,24 +3851,47 @@ protected Set<String> maybeSendIngestionHeartbeat() {
           if (throwable != null) {
             completionException.set(new CompletionException(throwable));
             failedPartitions.add(String.valueOf(partition));
-          } else {
-            numHeartBeatSuccess.getAndIncrement();
           }
         });
-        heartBeatFutures.put(partition, heartBeatFuture);
+        heartBeatFutures.add(heartBeatFuture);
+        // Also send to separate RT topic if it is enabled for the version.
+        if (isSeparatedRealtimeTopicEnabled()) {
+          CompletableFuture<PubSubProduceResult> heartBeatFutureForSepRT =
+              sendIngestionHeartbeatToRT(new PubSubTopicPartitionImpl(separateRealTimeTopic, partition));
+          heartBeatFutureForSepRT.whenComplete((ignore, throwable) -> {
+            if (throwable != null) {
+              completionExceptionForSepRT.set(new CompletionException(throwable));
+              failedPartitionsForSepRT.add(String.valueOf(partition));
+            }
+          });
+          heartBeatFuturesForSepRT.add(heartBeatFutureForSepRT);
+        }
       }
     }
+    sendHeartbeatProduceLog(heartBeatFutures, failedPartitions, completionException, false);
+    if (isSeparatedRealtimeTopicEnabled()) {
+      sendHeartbeatProduceLog(heartBeatFuturesForSepRT, failedPartitionsForSepRT, completionExceptionForSepRT, true);
+      failedPartitions.addAll(failedPartitionsForSepRT);
+    }
+    lastSendIngestionHeartbeatTimestamp.set(currentTimestamp);
+    return failedPartitions;
+  }
 
+  void sendHeartbeatProduceLog(
+      List<CompletableFuture<PubSubProduceResult>> heartBeatFutures,
+      Set<String> failedPartitions,
+      AtomicReference<CompletionException> completionException,
+      boolean isSeparateTopic) {
     if (!heartBeatFutures.isEmpty()) {
-      CompletableFuture.allOf(heartBeatFutures.values().toArray(new CompletableFuture[0]))
+      CompletableFuture.allOf(heartBeatFutures.toArray(new CompletableFuture[0]))
           .whenCompleteAsync((ignore, throwable) -> {
             if (!failedPartitions.isEmpty()) {
               int numFailedPartitions = failedPartitions.size();
               String logMessage = String.format(
                   "Send ingestion heartbeat for %d partitions of topic %s: %d succeeded, %d failed for partitions: %s",
                   heartBeatFutures.size(),
-                  realTimeTopic,
-                  numHeartBeatSuccess.get(),
+                  isSeparateTopic ? separateRealTimeTopic : realTimeTopic,
+                  heartBeatFutures.size() - numFailedPartitions,
                   numFailedPartitions,
                   String.join(",", failedPartitions));
               if (!REDUNDANT_LOGGING_FILTER.isRedundantException(logMessage)) {
@@ -3878,16 +3901,13 @@ protected Set<String> maybeSendIngestionHeartbeat() {
               String logMessage = String.format(
                   "Send ingestion heartbeat for %d partitions of topic %s: all succeeded",
                   heartBeatFutures.size(),
-                  realTimeTopic);
+                  isSeparateTopic ? separateRealTimeTopic : realTimeTopic);
               if (!REDUNDANT_LOGGING_FILTER.isRedundantException(logMessage)) {
                 LOGGER.debug(logMessage);
               }
             }
           });
     }
-
-    lastSendIngestionHeartbeatTimestamp.set(currentTimestamp);
-    return failedPartitions;
   }
 
   /**
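Note on the change above: outside diff notation, the heartbeat fan-out is easier to follow. The sketch below is a minimal, self-contained rendering of the same pattern, not the Venice API itself; send, sendAll, and the String topic names are hypothetical stand-ins for sendIngestionHeartbeatToRT and the PubSub types. Each partition's heartbeat goes to the real-time topic and, when a separate RT topic exists, is duplicated there; failures are collected asynchronously via whenComplete, and a single allOf callback reports the aggregate once every send settles.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    class HeartbeatFanOutSketch {
      // Hypothetical stand-in for sendIngestionHeartbeatToRT(..); completes exceptionally on failure.
      static CompletableFuture<Void> send(String topic, int partition) {
        return CompletableFuture.completedFuture(null);
      }

      static Set<String> sendAll(String rtTopic, String sepRtTopic, List<Integer> partitions) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        Set<String> failedPartitions = ConcurrentHashMap.newKeySet();
        for (int partition: partitions) {
          CompletableFuture<Void> future = send(rtTopic, partition);
          future.whenComplete((ignore, t) -> {
            if (t != null) {
              failedPartitions.add(String.valueOf(partition));
            }
          });
          futures.add(future);
          if (sepRtTopic != null) { // separate RT enabled: duplicate the heartbeat
            CompletableFuture<Void> sepFuture = send(sepRtTopic, partition);
            sepFuture.whenComplete((ignore, t) -> {
              if (t != null) {
                failedPartitions.add(String.valueOf(partition));
              }
            });
            futures.add(sepFuture);
          }
        }
        // Report once after every send settles; the set may still be filling in
        // when this method returns, exactly like failedPartitions above.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .whenComplete((ignore, t) -> System.out.println("Failed partitions: " + failedPartitions));
        return failedPartitions;
      }
    }

Because the returned set fills in asynchronously, callers cannot assert on it immediately; this is why the unit test further down polls with waitForNonDeterministicAssertion instead of checking the result right after the call.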
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
index 491c732188..092df6cb17 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
@@ -206,6 +206,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
   protected final String kafkaVersionTopic;
   protected final PubSubTopic versionTopic;
   protected final PubSubTopic realTimeTopic;
+  protected final PubSubTopic separateRealTimeTopic;
   protected final String storeName;
   private final boolean isUserSystemStore;
   protected final int versionNumber;
@@ -290,8 +291,6 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
 
   protected final boolean isWriteComputationEnabled;
 
-  protected final boolean isSeparatedRealtimeTopicEnabled;
-
   /**
    * Freeze ingestion if ready to serve or local data exists
    */
@@ -395,6 +394,9 @@ public StoreIngestionTask(
     this.storeName = versionTopic.getStoreName();
     this.isUserSystemStore = VeniceSystemStoreUtils.isUserSystemStore(storeName);
     this.realTimeTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(version));
+    this.separateRealTimeTopic = version.isSeparateRealTimeTopicEnabled()
+        ? pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(storeName))
+        : null;
     this.versionNumber = Version.parseVersionFromKafkaTopicName(kafkaVersionTopic);
     this.consumerActionsQueue = new PriorityBlockingQueue<>(CONSUMER_ACTION_QUEUE_INIT_CAPACITY);
     this.partitionToPendingConsumerActionCountMap = new VeniceConcurrentHashMap<>();
@@ -448,8 +450,6 @@ public StoreIngestionTask(
 
     this.isWriteComputationEnabled = store.isWriteComputationEnabled();
 
-    this.isSeparatedRealtimeTopicEnabled = version.isSeparateRealTimeTopicEnabled();
-
     this.partitionStateSerializer = builder.getPartitionStateSerializer();
 
     this.suppressLiveUpdates = serverConfig.freezeIngestionIfReadyToServeOrLocalDataExists();
@@ -540,7 +540,7 @@ public StoreIngestionTask(
         Collections.unmodifiableMap(partitionConsumptionStateMap),
         serverConfig.isHybridQuotaEnabled(),
         serverConfig.isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled(),
-        isSeparatedRealtimeTopicEnabled,
+        version.isSeparateRealTimeTopicEnabled(),
         ingestionNotificationDispatcher,
         this::pauseConsumption,
         this::resumeConsumption);
@@ -4648,7 +4648,7 @@ public boolean hasAllPartitionReportedCompleted() {
   }
 
   public boolean isSeparatedRealtimeTopicEnabled() {
-    return isSeparatedRealtimeTopicEnabled;
+    return separateRealTimeTopic != null;
   }
 
   /**
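The StoreIngestionTask change above swaps a standalone boolean (isSeparatedRealtimeTopicEnabled) for a nullable topic handle and derives the flag from it. Below is a small hedged illustration of that idiom, with invented names and an assumed "_rt"/"_rt_sep" naming convention (the "_sep" suffix is only hinted at by the test filter x.endsWith("_sep") near the end of this patch, not confirmed here):

    final class IngestionTopicsSketch {
      private final String realTimeTopic;
      private final String separateRealTimeTopic; // null when the feature is off

      IngestionTopicsSketch(String storeName, boolean separateRtEnabled) {
        this.realTimeTopic = storeName + "_rt";
        // Assumed naming convention, for illustration only.
        this.separateRealTimeTopic = separateRtEnabled ? storeName + "_rt_sep" : null;
      }

      boolean isSeparatedRealtimeTopicEnabled() {
        // Single source of truth: a true flag always comes with a usable handle.
        return separateRealTimeTopic != null;
      }
    }

Keeping one nullable field makes the flag and the handle impossible to desynchronize: whenever isSeparatedRealtimeTopicEnabled() returns true, the PubSubTopicPartitionImpl(separateRealTimeTopic, partition) construction in the earlier hunk has a non-null topic to work with.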
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
index 35fe5f4082..d4ec8bec0b 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java
@@ -4691,7 +4691,7 @@ public void testMaybeSendIngestionHeartbeat(
   }
 
   @Test
-  public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws InterruptedException {
+  public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() {
     String storeName = Utils.getUniqueString("store");
     StorageService storageService = mock(StorageService.class);
     Store mockStore = mock(Store.class);
@@ -4712,6 +4712,7 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter
     doReturn(false).when(mockStore).isHybridStoreDiskQuotaEnabled();
     doReturn(mockVersion).when(mockStore).getVersion(1);
     doReturn(true).when(mockVersion).isActiveActiveReplicationEnabled();
+    doReturn(true).when(mockVersion).isSeparateRealTimeTopicEnabled();
     VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class);
     VeniceProperties mockVeniceProperties = mock(VeniceProperties.class);
     doReturn(true).when(mockVeniceProperties).isEmpty();
@@ -4727,10 +4728,14 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter
     doReturn(offsetRecord).when(pcs1).getOffsetRecord();
     doReturn(LeaderFollowerStateType.LEADER).when(pcs1).getLeaderFollowerState();
     doReturn(1).when(pcs1).getPartition();
+
     PubSubTopic pubsubTopic = mock(PubSubTopic.class);
     doReturn(pubsubTopic).when(offsetRecord).getLeaderTopic(any());
     doReturn(true).when(pubsubTopic).isRealTime();
+
+    PubSubTopic pubsubTopicSepRT = mock(PubSubTopic.class);
+    doReturn(true).when(pubsubTopicSepRT).isRealTime();
+
     VeniceWriter veniceWriter = mock(VeniceWriter.class);
     VeniceWriterFactory veniceWriterFactory = mock(VeniceWriterFactory.class);
     doReturn(veniceWriter).when(veniceWriterFactory).createVeniceWriter(any());
@@ -4767,6 +4772,8 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter
     heartBeatFuture.complete(null);
     PubSubTopicPartition pubSubTopicPartition0 = new PubSubTopicPartitionImpl(pubsubTopic, 0);
     PubSubTopicPartition pubSubTopicPartition1 = new PubSubTopicPartitionImpl(pubsubTopic, 1);
+    PubSubTopic sepRTtopic = pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(storeName));
+    PubSubTopicPartition pubSubTopicPartition1sep = new PubSubTopicPartitionImpl(sepRTtopic, 1);
 
     // all succeeded
     doReturn(heartBeatFuture).when(veniceWriter).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong());
@@ -4792,6 +4799,24 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter
       assertTrue(failedPartitions.get().contains("1"));
     });
 
+    // 1 partition throws exception
+    doReturn(heartBeatFuture).when(veniceWriter)
+        .sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong());
+    doAnswer(invocation -> {
+      throw new Exception("mock exception");
+    }).when(veniceWriter).sendHeartbeat(eq(pubSubTopicPartition1sep), any(), any(), anyBoolean(), any(), anyLong());
+    // wait for SERVER_INGESTION_HEARTBEAT_INTERVAL_MS
+    TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
+      failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat());
+      assertNotNull(failedPartitions.get());
+    });
+    // wait for the futures to complete so that failedPartitions gets populated
+    TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
+      assertEquals(failedPartitions.get().size(), 1);
+      assertFalse(failedPartitions.get().contains("0"));
+      assertTrue(failedPartitions.get().contains("1"));
+    });
+
     // both partition throws exception
     doAnswer(invocation -> {
       throw new Exception("mock exception");
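The new assertions above hinge on stubbing one topic-partition to fail while another keeps succeeding. That Mockito pattern (doReturn for the healthy argument, doAnswer that throws for the failing one, combined with the eq and any matchers) is general; here is a compact, runnable sketch against a hypothetical Producer interface rather than the real VeniceWriter:

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.ArgumentMatchers.eq;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;

    import java.util.concurrent.CompletableFuture;

    interface Producer {
      CompletableFuture<Void> send(String topicPartition, byte[] payload);
    }

    class PerPartitionFailureStubSketch {
      static Producer stubbedProducer() {
        Producer producer = mock(Producer.class);
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        // Partition 0 of the hypothetical topic succeeds...
        doReturn(ok).when(producer).send(eq("store_rt_sep-0"), any());
        // ...while partition 1 throws at call time, like the doAnswer stub above.
        doAnswer(invocation -> {
          throw new Exception("mock exception");
        }).when(producer).send(eq("store_rt_sep-1"), any());
        return producer;
      }
    }

Only the invocation matching the eq(...) argument throws; every other argument combination falls through to its own stub, which mirrors how the test fails partition 1 of the separate RT topic while partition 0 keeps succeeding.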
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java
index fc53acfe2c..01085bbb86 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestActiveActiveReplicationForIncPush.java
@@ -428,7 +428,7 @@ private void verifyForSeparateIncrementalPushTopic(
           realTimeTopicOffset < TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT / 10,
           "Records # is more than expected: " + realTimeTopicOffset);
     } else {
-      assertEquals(separateTopicOffset, 0, "Records # is not enough: " + separateTopicOffset);
+      Assert.assertTrue(separateTopicOffset > 0, "Records # is not enough: " + separateTopicOffset);
       Assert.assertTrue(
           realTimeTopicOffset < TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT / 10,
           "Records # is more than expected: " + realTimeTopicOffset);
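The flipped assertion above records a behavior change, not a test correction: with leaders now emitting heartbeats into the separate incremental-push topic, its end offset is expected to advance even when no user records are written, so the old assertEquals(separateTopicOffset, 0) expectation no longer holds. In miniature (getEndOffset is a hypothetical stand-in for the test's offset lookup):

    import org.testng.Assert;

    class SeparateTopicOffsetSketch {
      // Hypothetical stand-in for reading a topic-partition's end offset.
      static long getEndOffset(String topicPartition) {
        return 42L; // pretend heartbeats have advanced the offset
      }

      public static void main(String[] args) {
        long separateTopicOffset = getEndOffset("store_rt_sep-0");
        // Old expectation: assertEquals(separateTopicOffset, 0), since nothing was written.
        // New expectation: heartbeats alone advance the offset.
        Assert.assertTrue(separateTopicOffset > 0, "Records # is not enough: " + separateTopicOffset);
      }
    }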
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java
index 7baed5aef9..09c53f8cdd 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java
@@ -19,9 +19,11 @@
 import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import com.linkedin.davinci.kafka.consumer.ConsumerPoolType;
 import com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator;
+import com.linkedin.davinci.kafka.consumer.ReplicaHeartbeatInfo;
 import com.linkedin.davinci.replication.RmdWithValueSchemaId;
 import com.linkedin.davinci.replication.merge.RmdSerDe;
 import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache;
+import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
 import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter;
 import com.linkedin.davinci.store.AbstractStorageEngine;
 import com.linkedin.davinci.store.record.ValueRecord;
@@ -64,6 +66,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -132,7 +135,7 @@ public void cleanUp() {
 
   @Test(timeOut = TEST_TIMEOUT_MS * 2)
   public void testIncrementalPushPartialUpdate() throws IOException {
-    final String storeName = Utils.getUniqueString("inc_push_update_classic_format");
+    final String storeName = Utils.getUniqueString("sepRT_ingestion");
     String parentControllerUrl = parentController.getControllerUrl();
     File inputDir = getTempDataDirectory();
     Schema recordSchema = writeSimpleAvroFileWithStringToPartialUpdateOpRecordSchema(inputDir);
@@ -192,6 +195,7 @@ public void testIncrementalPushPartialUpdate() throws IOException {
       VeniceClusterWrapper veniceClusterWrapper = childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
       validateData(storeName, veniceClusterWrapper);
 
+      // Empty push with large rewind time to make sure data is all copied to the new version for verification.
       parentControllerClient.emptyPush(storeName, "test_push_id_v2", 1000);
       TestUtils.waitForNonDeterministicPushCompletion(
           Version.composeKafkaTopic(storeName, 2),
@@ -276,6 +280,22 @@ public void testIncrementalPushPartialUpdate() throws IOException {
             NUMBER_OF_CHILD_DATACENTERS,
             1);
       });
+
+      // Add a new push with minimal rewind time to test that the separate RT topic has heartbeats populated.
+      UpdateStoreQueryParams updateStoreParams2 =
+          new UpdateStoreQueryParams().setHybridRewindSeconds(1L).setHybridOffsetLagThreshold(10L);
+      updateStoreResponse =
+          parentControllerClient.retryableRequest(5, c -> c.updateStore(storeName, updateStoreParams2));
+      assertFalse(updateStoreResponse.isError(), "Update store got error: " + updateStoreResponse.getError());
+      parentControllerClient.emptyPush(storeName, "test_push_id_v3", 1000);
+      TestUtils.waitForNonDeterministicPushCompletion(
+          Version.composeKafkaTopic(storeName, 3),
+          parentControllerClient,
+          30,
+          TimeUnit.SECONDS);
+
+      // Make sure separate RT heartbeat is tracked properly.
+      validateSeparateRealtimeTopicHeartbeat(Version.composeKafkaTopic(storeName, 3), 0);
     }
   }
 
@@ -323,6 +343,23 @@ private void validateRmdData(
     }
   }
 
+  private void validateSeparateRealtimeTopicHeartbeat(String topicName, int partition) {
+    long leaderSepRTTopicCount = 0;
+    for (VeniceServerWrapper serverWrapper: multiRegionMultiClusterWrapper.getChildRegions()
+        .get(0)
+        .getClusters()
+        .get("venice-cluster0")
+        .getVeniceServers()) {
+      HeartbeatMonitoringService heartbeatMonitoringService =
+          serverWrapper.getVeniceServer().getHeartbeatMonitoringService();
+      assertNotNull(heartbeatMonitoringService);
+      Map<String, ReplicaHeartbeatInfo> heartbeatInfoMap =
+          heartbeatMonitoringService.getHeartbeatInfo(topicName, partition, false);
+      leaderSepRTTopicCount += heartbeatInfoMap.keySet().stream().filter(x -> x.endsWith("_sep")).count();
+    }
+    Assert.assertEquals(leaderSepRTTopicCount, NUMBER_OF_CHILD_DATACENTERS);
+  }
+
   private byte[] serializeStringKeyToByteArray(String key) {
     Utf8 utf8Key = new Utf8(key);
     DatumWriter<Utf8> writer = new GenericDatumWriter<>(Schema.create(Schema.Type.STRING));
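A closing note on the helper above: getHeartbeatInfo returns a map keyed by replica identifiers, and replicas fed through the separate RT topic carry a "_sep" suffix, which is what the stream filter counts, one per child datacenter. A toy version of that counting with invented key shapes (only the "_sep" suffix convention is taken from the test itself):

    import java.util.Map;

    class SepReplicaCountSketch {
      public static void main(String[] args) {
        // Invented example keys; real ReplicaHeartbeatInfo keys differ.
        Map<String, Long> heartbeatTimestamps = Map.of(
            "dc-0.store_v3-0", 1700000000000L,
            "dc-0.store_v3-0_sep", 1700000000123L);
        long sepCount = heartbeatTimestamps.keySet().stream().filter(k -> k.endsWith("_sep")).count();
        // The integration test expects one such entry per child datacenter.
        System.out.println("Separate-RT heartbeat replicas: " + sepCount);
      }
    }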