Skip to content

Commit

Permalink
[changelog] Fix metadata refresh deadlock scenario (#1536)
Browse files Browse the repository at this point in the history
* [changelog] Fix metadata refresh deadlock scenario

A race condition that can lead to a deadlock exists when a version swap happens. The version swap data change listener locks itself
and then proceeds with the version swap on the consumer.  However, other function calls also call refresh() on the
metadata repository, which in turn triggers more callbacks on the data change listener.  In some scenarios, this
can cause a deadlock.

This change makes sure that we no longer rely on manual refreshes and instead lets the whole process be driven by the
scheduled metadata refresh.  It also makes the refresh interval configurable, and seems to speed up tests (yay!):
previously there were frequent retries when consuming events, because we had to wait for the version swap to happen
internally before we could consume the events we expected.
  • Loading branch information
ZacAttack authored Feb 14, 2025
1 parent a504538 commit 8bc7133
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ChangelogClientConfig<T extends SpecificRecord> {
private int controllerRequestRetryCount;

private String bootstrapFileSystemPath;
private long versionSwapDetectionIntervalTimeInMs = 600000L;
private long versionSwapDetectionIntervalTimeInSeconds = 60L;

/**
* This will be used in BootstrappingVeniceChangelogConsumer to determine when to sync updates with the underlying
Expand Down Expand Up @@ -166,12 +166,12 @@ public String getBootstrapFileSystemPath() {
return this.bootstrapFileSystemPath;
}

public long getVersionSwapDetectionIntervalTimeInMs() {
return versionSwapDetectionIntervalTimeInMs;
public long getVersionSwapDetectionIntervalTimeInSeconds() {
return versionSwapDetectionIntervalTimeInSeconds;
}

public ChangelogClientConfig setVersionSwapDetectionIntervalTimeInMs(long intervalTimeInMs) {
this.versionSwapDetectionIntervalTimeInMs = intervalTimeInMs;
public ChangelogClientConfig setVersionSwapDetectionIntervalTimeInSeconds(long intervalTimeInSeconds) {
this.versionSwapDetectionIntervalTimeInSeconds = intervalTimeInSeconds;
return this;
}

Expand Down Expand Up @@ -216,7 +216,7 @@ public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(Ch
.setD2Client(config.getD2Client())
.setControllerRequestRetryCount(config.getControllerRequestRetryCount())
.setBootstrapFileSystemPath(config.getBootstrapFileSystemPath())
.setVersionSwapDetectionIntervalTimeInMs(config.getVersionSwapDetectionIntervalTimeInMs())
.setVersionSwapDetectionIntervalTimeInSeconds(config.getVersionSwapDetectionIntervalTimeInSeconds())
.setRocksDBBlockCacheSizeInBytes(config.getRocksDBBlockCacheSizeInBytes())
.setConsumerName(config.consumerName)
.setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.linkedin.davinci.consumer;

import com.linkedin.alpini.base.concurrency.Executors;
import com.linkedin.alpini.base.concurrency.ScheduledExecutorService;
import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
Expand All @@ -22,7 +20,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.logging.log4j.LogManager;
Expand All @@ -31,14 +28,11 @@

public class VeniceAfterImageConsumerImpl<K, V> extends VeniceChangelogConsumerImpl<K, V> {
private static final Logger LOGGER = LogManager.getLogger(VeniceAfterImageConsumerImpl.class);
// 10 Minute default
protected long versionSwapDetectionIntervalTimeInMs;
// This consumer is used to find EOP messages without impacting consumption by other subscriptions. It's only used
// in the context of seeking to EOP in the event of the user calling that seek or a version push.
// TODO: We shouldn't use this in the long run. Once the EOP position is queryable from venice and version
// swap is produced to VT, then we should remove this as it's no longer needed.
final private Lazy<VeniceChangelogConsumerImpl<K, V>> internalSeekConsumer;
private final ScheduledExecutorService versionSwapExecutorService = Executors.newSingleThreadScheduledExecutor();
AtomicBoolean versionSwapThreadScheduled = new AtomicBoolean(false);
private final VersionSwapDataChangeListener<K, V> versionSwapListener;

Expand All @@ -60,7 +54,6 @@ protected VeniceAfterImageConsumerImpl(
Lazy<VeniceChangelogConsumerImpl<K, V>> seekConsumer) {
super(changelogClientConfig, consumer);
internalSeekConsumer = seekConsumer;
versionSwapDetectionIntervalTimeInMs = changelogClientConfig.getVersionSwapDetectionIntervalTimeInMs();
versionSwapListener = new VersionSwapDataChangeListener<K, V>(
this,
storeRepository,
Expand All @@ -74,8 +67,6 @@ public Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll
return internalPoll(timeoutInMs, "");
} catch (UnknownTopicOrPartitionException ex) {
LOGGER.error("Caught unknown Topic exception, will attempt repair and retry: ", ex);
storeRepository.refresh();
versionSwapListener.handleStoreChanged(null);
return internalPoll(timeoutInMs, "");
}
}
Expand All @@ -101,11 +92,6 @@ public CompletableFuture<Void> subscribe(Set<Integer> partitions) {
if (!versionSwapThreadScheduled.get()) {
// schedule the version swap thread and set up the callback listener
this.storeRepository.registerStoreDataChangedListener(versionSwapListener);
versionSwapExecutorService.scheduleAtFixedRate(
new VersionSwapDetectionThread(),
versionSwapDetectionIntervalTimeInMs,
versionSwapDetectionIntervalTimeInMs,
TimeUnit.MILLISECONDS);
versionSwapThreadScheduled.set(true);
}
return super.subscribe(partitions);
Expand Down Expand Up @@ -206,15 +192,6 @@ protected CompletableFuture<Void> internalSeek(
return super.internalSeek(partitions, targetTopic, seekAction);
}

/**
 * Periodic task that re-runs the version swap check on {@code versionSwapListener}.
 *
 * <p>{@code subscribe} schedules an instance of this class on {@code versionSwapExecutorService} at a fixed rate,
 * using {@code versionSwapDetectionIntervalTimeInMs} as both the initial delay and the period.
 */
private class VersionSwapDetectionThread implements Runnable {
/**
 * Invokes {@code versionSwapListener.handleStoreChanged(null)}.
 */
@Override
public void run() {
// This task is a safety net: it keeps polling in case something goes wrong at the time of the store
// repository change.
// NOTE(review): null is passed instead of a store; presumably the listener resolves the store itself —
// confirm against VersionSwapDataChangeListener.
versionSwapListener.handleStoreChanged(null);
}
}

@Override
public void setStoreRepository(NativeMetadataRepositoryViewAdapter repository) {
super.setStoreRepository(repository);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.davinci.consumer;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.schema.rmd.RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_POS;

Expand Down Expand Up @@ -66,6 +67,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -173,9 +175,13 @@ public VeniceChangelogConsumerImpl(
this.startTimestamp = System.currentTimeMillis();
LOGGER.info("VeniceChangelogConsumer created at timestamp: {}", startTimestamp);

Properties properties = new Properties();
properties.put(
CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS,
String.valueOf(changelogClientConfig.getVersionSwapDetectionIntervalTimeInSeconds()));
ThinClientMetaStoreBasedRepository repository = new ThinClientMetaStoreBasedRepository(
changelogClientConfig.getInnerClientConfig(),
VeniceProperties.empty(),
new VeniceProperties(properties),
null);
repository.start();
this.storeRepository = new NativeMetadataRepositoryViewAdapter(repository);
Expand Down Expand Up @@ -229,7 +235,6 @@ protected CompletableFuture<Void> internalSubscribe(Set<Integer> partitions, Pub
}
}
}
storeRepository.refresh();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -317,7 +322,6 @@ public CompletableFuture<Void> seekToBeginningOfPush() {
@Override
public CompletableFuture<Void> seekToEndOfPush(Set<Integer> partitions) {
// Get the latest change capture topic
storeRepository.refresh();
Store store = getStore();
int currentVersion = store.getCurrentVersion();
PubSubTopic topic = pubSubTopicRepository
Expand Down Expand Up @@ -430,10 +434,9 @@ public CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> chec
}

void checkLiveVersion(String topicName) {
storeRepository.refresh();
Store store = storeRepository.getStore(storeName);
try {
store.getVersionOrThrow(Version.parseVersionFromVersionTopicName(topicName));
store.getVersionOrThrow(Version.parseVersionFromKafkaTopicName(topicName));
} catch (StoreVersionNotFoundException ex) {
throw new VeniceCoordinateOutOfRangeException("Checkpoint is off retention! Version has been deprecated...", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE
changelogClientConfig,
mockPubSubConsumer,
Lazy.of(() -> mockInternalSeekConsumer));
veniceChangelogConsumer.versionSwapDetectionIntervalTimeInMs = 1;
NativeMetadataRepositoryViewAdapter mockRepository = mock(NativeMetadataRepositoryViewAdapter.class);
Store store = mock(Store.class);
Version mockVersion = new VersionImpl(storeName, 1, "foo");
Expand Down
Loading

0 comments on commit 8bc7133

Please sign in to comment.