[server] Produce heartbeat to separate RT topic (#1537)
Produce the heartbeat to the separate RT topic at the same time as producing it to RT.
This covers the edge case where the rewind time is shorter than the time since the
previous write to the separate RT topic: there is then no data to replay from the
separate RT topic, and the ready-to-serve check never passes because no offset has
been persisted for the separate RT region.
sixpluszero authored Feb 14, 2025
1 parent 9284547 commit e05e625
Showing 5 changed files with 113 additions and 31 deletions.
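
For context, the gist of the change below is that a leader now produces its ingestion heartbeat to the separate RT topic in the same pass in which it produces the heartbeat to the regular RT topic, so an offset is always persisted for the separate RT region. The following is a minimal, self-contained sketch of that pattern; sendHeartbeat(), the topic names, and the partition count are illustrative stand-ins rather than the Venice API (the real code goes through VeniceWriter and collects a CompletableFuture<PubSubProduceResult> per partition, as shown in the diff).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the dual heartbeat production; sendHeartbeat() is a hypothetical
// stand-in for the VeniceWriter-based produce call used in the actual change.
public class HeartbeatSketch {
  static CompletableFuture<Void> sendHeartbeat(String topic, int partition) {
    return CompletableFuture.runAsync(
        () -> System.out.printf("heartbeat -> %s-%d%n", topic, partition));
  }

  public static void main(String[] args) {
    String rtTopic = "store_rt";          // hypothetical RT topic name
    String sepRtTopic = "store_rt_sep";   // hypothetical separate RT topic name
    boolean separateRtEnabled = true;     // version-level flag in the real code
    List<CompletableFuture<Void>> futures = new ArrayList<>();

    for (int partition = 0; partition < 3; partition++) {
      // Pre-existing behavior: heartbeat to the regular RT topic.
      futures.add(sendHeartbeat(rtTopic, partition));
      // New behavior: also heartbeat to the separate RT topic, so an offset is
      // persisted there even when the rewind window contains no separate-RT data.
      if (separateRtEnabled) {
        futures.add(sendHeartbeat(sepRtTopic, partition));
      }
    }
    // Wait for all heartbeats before recording the send timestamp / logging.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
  }
}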
@@ -88,6 +88,7 @@
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -2258,27 +2259,20 @@ protected void recordHeartbeatReceived(
return;
}

// Separate incremental push pubsub entries has the same pubsub url but different cluster id, which creates
// confusion for heartbeat tracking. We need to resolve the kafka url to the actual kafka cluster url.
String resolvedKafkaUrl = kafkaClusterUrlResolver != null ? kafkaClusterUrlResolver.apply(kafkaUrl) : kafkaUrl;
// This is just sanity check, as there is no leader producing HB to sep topic by design.
if (!Objects.equals(resolvedKafkaUrl, kafkaUrl)) {
return;
}
if (partitionConsumptionState.getLeaderFollowerState().equals(LEADER)) {
heartbeatMonitoringService.recordLeaderHeartbeat(
storeName,
versionNumber,
partitionConsumptionState.getPartition(),
serverConfig.getKafkaClusterUrlToAliasMap().get(resolvedKafkaUrl),
serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
consumerRecord.getValue().producerMetadata.messageTimestamp,
partitionConsumptionState.isComplete());
} else {
heartbeatMonitoringService.recordFollowerHeartbeat(
storeName,
versionNumber,
partitionConsumptionState.getPartition(),
serverConfig.getKafkaClusterUrlToAliasMap().get(resolvedKafkaUrl),
serverConfig.getKafkaClusterUrlToAliasMap().get(kafkaUrl),
consumerRecord.getValue().producerMetadata.messageTimestamp,
partitionConsumptionState.isComplete());
}
@@ -3831,10 +3825,16 @@ protected Set<String> maybeSendIngestionHeartbeat() {
return null;
}

AtomicInteger numHeartBeatSuccess = new AtomicInteger(0);
Map<Integer, CompletableFuture<PubSubProduceResult>> heartBeatFutures = new VeniceConcurrentHashMap<>();
Set<String> failedPartitions = VeniceConcurrentHashMap.newKeySet();
List<CompletableFuture<PubSubProduceResult>> heartBeatFutures =
new ArrayList<>(partitionConsumptionStateMap.size());
List<CompletableFuture<PubSubProduceResult>> heartBeatFuturesForSepRT =
isSeparatedRealtimeTopicEnabled() ? new ArrayList<>() : null;
Set<String> failedPartitions = VeniceConcurrentHashMap.newKeySet(partitionConsumptionStateMap.size());
Set<String> failedPartitionsForSepRT =
isSeparatedRealtimeTopicEnabled() ? VeniceConcurrentHashMap.newKeySet() : null;
AtomicReference<CompletionException> completionException = new AtomicReference<>(null);
AtomicReference<CompletionException> completionExceptionForSepRT =
isSeparatedRealtimeTopicEnabled() ? new AtomicReference<>(null) : null;
for (PartitionConsumptionState pcs: partitionConsumptionStateMap.values()) {
PubSubTopic leaderTopic = pcs.getOffsetRecord().getLeaderTopic(pubSubTopicRepository);
if (isLeader(pcs) && leaderTopic != null && leaderTopic.isRealTime()) {
@@ -3851,24 +3851,47 @@ protected Set<String> maybeSendIngestionHeartbeat() {
if (throwable != null) {
completionException.set(new CompletionException(throwable));
failedPartitions.add(String.valueOf(partition));
} else {
numHeartBeatSuccess.getAndIncrement();
}
});
heartBeatFutures.put(partition, heartBeatFuture);
heartBeatFutures.add(heartBeatFuture);
// Also send to separate RT topic if it is enabled for the version.
if (isSeparatedRealtimeTopicEnabled()) {
CompletableFuture<PubSubProduceResult> heartBeatFutureForSepRT =
sendIngestionHeartbeatToRT(new PubSubTopicPartitionImpl(separateRealTimeTopic, partition));
heartBeatFutureForSepRT.whenComplete((ignore, throwable) -> {
if (throwable != null) {
completionExceptionForSepRT.set(new CompletionException(throwable));
failedPartitionsForSepRT.add(String.valueOf(partition));
}
});
heartBeatFuturesForSepRT.add(heartBeatFutureForSepRT);
}
}
}
sendHeartbeatProduceLog(heartBeatFutures, failedPartitions, completionException, false);
if (isSeparatedRealtimeTopicEnabled()) {
sendHeartbeatProduceLog(heartBeatFuturesForSepRT, failedPartitionsForSepRT, completionExceptionForSepRT, true);
failedPartitions.addAll(failedPartitionsForSepRT);
}
lastSendIngestionHeartbeatTimestamp.set(currentTimestamp);
return failedPartitions;
}

void sendHeartbeatProduceLog(
List<CompletableFuture<PubSubProduceResult>> heartBeatFutures,
Set<String> failedPartitions,
AtomicReference<CompletionException> completionException,
boolean isSeparateTopic) {
if (!heartBeatFutures.isEmpty()) {
CompletableFuture.allOf(heartBeatFutures.values().toArray(new CompletableFuture[0]))
CompletableFuture.allOf(heartBeatFutures.toArray(new CompletableFuture[0]))
.whenCompleteAsync((ignore, throwable) -> {
if (!failedPartitions.isEmpty()) {
int numFailedPartitions = failedPartitions.size();
String logMessage = String.format(
"Send ingestion heartbeat for %d partitions of topic %s: %d succeeded, %d failed for partitions: %s",
heartBeatFutures.size(),
realTimeTopic,
numHeartBeatSuccess.get(),
isSeparateTopic ? separateRealTimeTopic : realTimeTopic,
heartBeatFutures.size() - numFailedPartitions,
numFailedPartitions,
String.join(",", failedPartitions));
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(logMessage)) {
@@ -3878,16 +3901,13 @@ protected Set<String> maybeSendIngestionHeartbeat() {
String logMessage = String.format(
"Send ingestion heartbeat for %d partitions of topic %s: all succeeded",
heartBeatFutures.size(),
realTimeTopic);
isSeparateTopic ? separateRealTimeTopic : realTimeTopic);
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(logMessage)) {
LOGGER.debug(logMessage);
}
}
});
}

lastSendIngestionHeartbeatTimestamp.set(currentTimestamp);
return failedPartitions;
}

/**
@@ -206,6 +206,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final String kafkaVersionTopic;
protected final PubSubTopic versionTopic;
protected final PubSubTopic realTimeTopic;
protected final PubSubTopic separateRealTimeTopic;
protected final String storeName;
private final boolean isUserSystemStore;
protected final int versionNumber;
@@ -290,8 +291,6 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {

protected final boolean isWriteComputationEnabled;

protected final boolean isSeparatedRealtimeTopicEnabled;

/**
* Freeze ingestion if ready to serve or local data exists
*/
@@ -395,6 +394,9 @@ public StoreIngestionTask(
this.storeName = versionTopic.getStoreName();
this.isUserSystemStore = VeniceSystemStoreUtils.isUserSystemStore(storeName);
this.realTimeTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(version));
this.separateRealTimeTopic = version.isSeparateRealTimeTopicEnabled()
? pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(storeName))
: null;
this.versionNumber = Version.parseVersionFromKafkaTopicName(kafkaVersionTopic);
this.consumerActionsQueue = new PriorityBlockingQueue<>(CONSUMER_ACTION_QUEUE_INIT_CAPACITY);
this.partitionToPendingConsumerActionCountMap = new VeniceConcurrentHashMap<>();
@@ -448,8 +450,6 @@ public StoreIngestionTask(

this.isWriteComputationEnabled = store.isWriteComputationEnabled();

this.isSeparatedRealtimeTopicEnabled = version.isSeparateRealTimeTopicEnabled();

this.partitionStateSerializer = builder.getPartitionStateSerializer();

this.suppressLiveUpdates = serverConfig.freezeIngestionIfReadyToServeOrLocalDataExists();
@@ -540,7 +540,7 @@ public StoreIngestionTask(
Collections.unmodifiableMap(partitionConsumptionStateMap),
serverConfig.isHybridQuotaEnabled(),
serverConfig.isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled(),
isSeparatedRealtimeTopicEnabled,
version.isSeparateRealTimeTopicEnabled(),
ingestionNotificationDispatcher,
this::pauseConsumption,
this::resumeConsumption);
@@ -4648,7 +4648,7 @@ public boolean hasAllPartitionReportedCompleted() {
}

public boolean isSeparatedRealtimeTopicEnabled() {
return isSeparatedRealtimeTopicEnabled;
return separateRealTimeTopic != null;
}

/**
@@ -4691,7 +4691,7 @@ public void testMaybeSendIngestionHeartbeat(
}

@Test
public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws InterruptedException {
public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() {
String storeName = Utils.getUniqueString("store");
StorageService storageService = mock(StorageService.class);
Store mockStore = mock(Store.class);
@@ -4712,6 +4712,7 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter
doReturn(false).when(mockStore).isHybridStoreDiskQuotaEnabled();
doReturn(mockVersion).when(mockStore).getVersion(1);
doReturn(true).when(mockVersion).isActiveActiveReplicationEnabled();
doReturn(true).when(mockVersion).isSeparateRealTimeTopicEnabled();
VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class);
VeniceProperties mockVeniceProperties = mock(VeniceProperties.class);
doReturn(true).when(mockVeniceProperties).isEmpty();
@@ -4727,10 +4728,14 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter
doReturn(offsetRecord).when(pcs1).getOffsetRecord();
doReturn(LeaderFollowerStateType.LEADER).when(pcs1).getLeaderFollowerState();
doReturn(1).when(pcs1).getPartition();

PubSubTopic pubsubTopic = mock(PubSubTopic.class);
doReturn(pubsubTopic).when(offsetRecord).getLeaderTopic(any());
doReturn(true).when(pubsubTopic).isRealTime();

PubSubTopic pubsubTopicSepRT = mock(PubSubTopic.class);
doReturn(true).when(pubsubTopicSepRT).isRealTime();

VeniceWriter veniceWriter = mock(VeniceWriter.class);
VeniceWriterFactory veniceWriterFactory = mock(VeniceWriterFactory.class);
doReturn(veniceWriter).when(veniceWriterFactory).createVeniceWriter(any());
@@ -4767,6 +4772,8 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter
heartBeatFuture.complete(null);
PubSubTopicPartition pubSubTopicPartition0 = new PubSubTopicPartitionImpl(pubsubTopic, 0);
PubSubTopicPartition pubSubTopicPartition1 = new PubSubTopicPartitionImpl(pubsubTopic, 1);
PubSubTopic sepRTtopic = pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(storeName));
PubSubTopicPartition pubSubTopicPartition1sep = new PubSubTopicPartitionImpl(sepRTtopic, 1);

// all succeeded
doReturn(heartBeatFuture).when(veniceWriter).sendHeartbeat(any(), any(), any(), anyBoolean(), any(), anyLong());
@@ -4792,6 +4799,24 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter
assertTrue(failedPartitions.get().contains("1"));
});

// 1 partition throws exception
doReturn(heartBeatFuture).when(veniceWriter)
.sendHeartbeat(eq(pubSubTopicPartition0), any(), any(), anyBoolean(), any(), anyLong());
doAnswer(invocation -> {
throw new Exception("mock exception");
}).when(veniceWriter).sendHeartbeat(eq(pubSubTopicPartition1sep), any(), any(), anyBoolean(), any(), anyLong());
// wait for SERVER_INGESTION_HEARTBEAT_INTERVAL_MS
TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
failedPartitions.set(ingestionTask.maybeSendIngestionHeartbeat());
assertNotNull(failedPartitions.get());
});
// wait for the futures to complete that populates failedPartitions
TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
assertEquals(failedPartitions.get().size(), 1);
assertFalse(failedPartitions.get().contains("0"));
assertTrue(failedPartitions.get().contains("1"));
});

// both partition throws exception
doAnswer(invocation -> {
throw new Exception("mock exception");
@@ -428,7 +428,7 @@ private void verifyForSeparateIncrementalPushTopic(
realTimeTopicOffset < TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT / 10,
"Records # is more than expected: " + realTimeTopicOffset);
} else {
assertEquals(separateTopicOffset, 0, "Records # is not enough: " + separateTopicOffset);
Assert.assertTrue(separateTopicOffset > 0, "Records # is not enough: " + separateTopicOffset);
Assert.assertTrue(
realTimeTopicOffset < TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT / 10,
"Records # is more than expected: " + realTimeTopicOffset);
@@ -19,9 +19,11 @@
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.kafka.consumer.ConsumerPoolType;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator;
import com.linkedin.davinci.kafka.consumer.ReplicaHeartbeatInfo;
import com.linkedin.davinci.replication.RmdWithValueSchemaId;
import com.linkedin.davinci.replication.merge.RmdSerDe;
import com.linkedin.davinci.replication.merge.StringAnnotatedStoreSchemaCache;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.record.ValueRecord;
@@ -64,6 +66,7 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -132,7 +135,7 @@ public void cleanUp() {

@Test(timeOut = TEST_TIMEOUT_MS * 2)
public void testIncrementalPushPartialUpdate() throws IOException {
final String storeName = Utils.getUniqueString("inc_push_update_classic_format");
final String storeName = Utils.getUniqueString("sepRT_ingestion");
String parentControllerUrl = parentController.getControllerUrl();
File inputDir = getTempDataDirectory();
Schema recordSchema = writeSimpleAvroFileWithStringToPartialUpdateOpRecordSchema(inputDir);
@@ -192,6 +195,7 @@ public void testIncrementalPushPartialUpdate() throws IOException {
VeniceClusterWrapper veniceClusterWrapper = childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
validateData(storeName, veniceClusterWrapper);

// Empty push with large rewind time to make sure data is all copied to the new version for verification.
parentControllerClient.emptyPush(storeName, "test_push_id_v2", 1000);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 2),
@@ -276,6 +280,22 @@ public void testIncrementalPushPartialUpdate() throws IOException {
NUMBER_OF_CHILD_DATACENTERS,
1);
});

// Add a new push with minimal rewind time to test that separate RT has heartbeat populated.
UpdateStoreQueryParams updateStoreParams2 =
new UpdateStoreQueryParams().setHybridRewindSeconds(1L).setHybridOffsetLagThreshold(10L);
updateStoreResponse =
parentControllerClient.retryableRequest(5, c -> c.updateStore(storeName, updateStoreParams2));
assertFalse(updateStoreResponse.isError(), "Update store got error: " + updateStoreResponse.getError());
parentControllerClient.emptyPush(storeName, "test_push_id_v3", 1000);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 3),
parentControllerClient,
30,
TimeUnit.SECONDS);

// Make sure separate RT heartbeat is tracked properly.
validateSeparateRealtimeTopicHeartbeat(Version.composeKafkaTopic(storeName, 3), 0);
}
}

@@ -323,6 +343,23 @@ private void validateRmdData(
}
}

private void validateSeparateRealtimeTopicHeartbeat(String topicName, int partition) {
long leaderSepRTTopicCount = 0;
for (VeniceServerWrapper serverWrapper: multiRegionMultiClusterWrapper.getChildRegions()
.get(0)
.getClusters()
.get("venice-cluster0")
.getVeniceServers()) {
HeartbeatMonitoringService heartbeatMonitoringService =
serverWrapper.getVeniceServer().getHeartbeatMonitoringService();
assertNotNull(heartbeatMonitoringService);
Map<String, ReplicaHeartbeatInfo> heartbeatInfoMap =
heartbeatMonitoringService.getHeartbeatInfo(topicName, partition, false);
leaderSepRTTopicCount += heartbeatInfoMap.keySet().stream().filter(x -> x.endsWith("_sep")).count();
}
Assert.assertEquals(leaderSepRTTopicCount, NUMBER_OF_CHILD_DATACENTERS);
}

private byte[] serializeStringKeyToByteArray(String key) {
Utf8 utf8Key = new Utf8(key);
DatumWriter<Utf8> writer = new GenericDatumWriter<>(Schema.create(Schema.Type.STRING));
