Skip to content

Commit

Permalink
[dvc] Fix metrics reporting issue for DVRT and add more metrics (#1548)
Browse files Browse the repository at this point in the history
1. Fixed an issue where the metrics for DVRT were being registered, but they weren't being emitted.
2. Added more metrics for DVRT to cover more operations for better visibility.
3. Removed unnecessary integration test steps for DVRT to speed up CI.
  • Loading branch information
kvargha authored Feb 28, 2025
1 parent 8b6951b commit 6cb75f9
Show file tree
Hide file tree
Showing 13 changed files with 595 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggHostLevelIngestionStats;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedDaVinciRecordTransformerStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.ParticipantStoreConsumptionStats;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
Expand Down Expand Up @@ -463,6 +464,12 @@ public void handleStoreDeleted(Store store) {
"Enabled a thread pool for AA/WC ingestion lookup with {} threads.",
serverConfig.getAaWCIngestionStorageLookupThreadPoolSize());

AggVersionedDaVinciRecordTransformerStats recordTransformerStats = null;
if (recordTransformerConfig != null) {
recordTransformerStats =
new AggVersionedDaVinciRecordTransformerStats(metricsRepository, metadataRepo, serverConfig);
}

ingestionTaskFactory = StoreIngestionTaskFactory.builder()
.setVeniceWriterFactory(veniceWriterFactory)
.setStorageEngineRepository(storageService.getStorageEngineRepository())
Expand All @@ -473,6 +480,7 @@ public void handleStoreDeleted(Store store) {
.setTopicManagerRepository(topicManagerRepository)
.setHostLevelIngestionStats(hostLevelIngestionStats)
.setVersionedDIVStats(versionedDIVStats)
.setDaVinciRecordTransformerStats(recordTransformerStats)
.setVersionedIngestionStats(versionedIngestionStats)
.setStoreBufferService(storeBufferService)
.setServerConfig(serverConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.linkedin.davinci.listener.response.AdminResponse;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedDaVinciRecordTransformerStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
Expand Down Expand Up @@ -252,6 +253,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
protected final HostLevelIngestionStats hostLevelIngestionStats;
protected final AggVersionedDIVStats versionedDIVStats;
protected final AggVersionedIngestionStats versionedIngestionStats;
protected AggVersionedDaVinciRecordTransformerStats daVinciRecordTransformerStats;
protected final BooleanSupplier isCurrentVersion;
protected final Optional<HybridStoreConfig> hybridStoreConfig;
protected final Consumer<DataValidationException> divErrorMetricCallback;
Expand Down Expand Up @@ -504,21 +506,17 @@ public StoreIngestionTask(
recordTransformerConfig);
this.recordTransformerDeserializersByPutSchemaId = new SparseConcurrentList<>();

versionedIngestionStats.registerTransformerLatencySensor(storeName, versionNumber);
versionedIngestionStats.registerTransformerLifecycleStartLatency(storeName, versionNumber);
versionedIngestionStats.registerTransformerLifecycleEndLatency(storeName, versionNumber);
versionedIngestionStats.registerTransformerErrorSensor(storeName, versionNumber);
daVinciRecordTransformerStats = builder.getDaVinciRecordTransformerStats();

// onStartVersionIngestion called here instead of run() because this needs to finish running
// before bootstrapping starts
long startTime = System.currentTimeMillis();
long startTime = System.nanoTime();
recordTransformer.onStartVersionIngestion(isCurrentVersion.getAsBoolean());
long endTime = System.currentTimeMillis();
versionedIngestionStats.recordTransformerLifecycleStartLatency(
daVinciRecordTransformerStats.recordOnStartVersionIngestionLatency(
storeName,
versionNumber,
LatencyUtils.getElapsedTimeFromMsToMs(startTime),
endTime);
LatencyUtils.getElapsedTimeFromNSToMS(startTime),
System.currentTimeMillis());
} else {
this.recordTransformerKeyDeserializer = null;
this.recordTransformerInputValueSchema = null;
Expand Down Expand Up @@ -661,7 +659,13 @@ public synchronized void subscribePartition(PubSubTopicPartition topicPartition,
int partitionNumber = topicPartition.getPartitionNumber();

if (recordTransformer != null) {
long startTime = System.nanoTime();
recordTransformer.internalOnRecovery(storageEngine, partitionNumber, partitionStateSerializer, compressor);
daVinciRecordTransformerStats.recordOnRecoveryLatency(
storeName,
versionNumber,
LatencyUtils.getElapsedTimeFromNSToMS(startTime),
System.currentTimeMillis());
}

partitionToPendingConsumerActionCountMap.computeIfAbsent(partitionNumber, x -> new AtomicInteger(0))
Expand Down Expand Up @@ -3767,7 +3771,7 @@ private int processKafkaDataMessage(
int putSchemaId = put.getSchemaId() > 0 ? put.getSchemaId() : 1;

if (recordTransformer != null) {
long recordTransformerStartTime = System.currentTimeMillis();
long recordTransformerStartTime = System.nanoTime();
ByteBuffer valueBytes = put.getPutValue();
ByteBuffer assembledObject = chunkAssembler.bufferAndAssembleRecord(
consumerRecord.getTopicPartition(),
Expand Down Expand Up @@ -3808,8 +3812,9 @@ private int processKafkaDataMessage(
try {
transformerResult = recordTransformer.transformAndProcessPut(lazyKey, lazyValue);
} catch (Exception e) {
versionedIngestionStats.recordTransformerError(storeName, versionNumber, 1, currentTimeMs);
String errorMessage = "Record transformer experienced an error when transforming value=" + assembledObject;
daVinciRecordTransformerStats.recordPutError(storeName, versionNumber, currentTimeMs);
String errorMessage =
"DaVinciRecordTransformer experienced an error when processing value: " + assembledObject;

throw new VeniceMessageException(errorMessage, e);
}
Expand All @@ -3831,10 +3836,10 @@ private int processKafkaDataMessage(
}

put.putValue = transformedBytes;
versionedIngestionStats.recordTransformerLatency(
daVinciRecordTransformerStats.recordPutLatency(
storeName,
versionNumber,
LatencyUtils.getElapsedTimeFromMsToMs(recordTransformerStartTime),
LatencyUtils.getElapsedTimeFromNSToMS(recordTransformerStartTime),
currentTimeMs);
writeToStorageEngine(producedPartition, keyBytes, put);
} else {
Expand Down Expand Up @@ -3869,7 +3874,21 @@ private int processKafkaDataMessage(

if (recordTransformer != null) {
Lazy<Object> lazyKey = Lazy.of(() -> this.recordTransformerKeyDeserializer.deserialize(keyBytes));
recordTransformer.processDelete(lazyKey);

long startTime = System.nanoTime();
try {
recordTransformer.processDelete(lazyKey);
} catch (Exception e) {
daVinciRecordTransformerStats.recordDeleteError(storeName, versionNumber, currentTimeMs);
String errorMessage = "DaVinciRecordTransformer experienced an error when deleting key: " + lazyKey.get();

throw new VeniceMessageException(errorMessage, e);
}
daVinciRecordTransformerStats.recordDeleteLatency(
storeName,
versionNumber,
LatencyUtils.getElapsedTimeFromNSToMS(startTime),
System.currentTimeMillis());

// This is called here after processDelete because if the user stores their data somewhere other than
// Da Vinci, this function needs to execute to allow them to delete the data from the appropriate store
Expand Down Expand Up @@ -4149,15 +4168,14 @@ public synchronized void close() {
// resources before exiting.

if (recordTransformer != null) {
long startTime = System.currentTimeMillis();
long startTime = System.nanoTime();
Store store = storeRepository.getStoreOrThrow(storeName);
recordTransformer.onEndVersionIngestion(store.getCurrentVersion());
long endTime = System.currentTimeMillis();
versionedIngestionStats.recordTransformerLifecycleEndLatency(
daVinciRecordTransformerStats.recordOnEndVersionIngestionLatency(
storeName,
versionNumber,
LatencyUtils.getElapsedTimeFromMsToMs(startTime),
endTime);
LatencyUtils.getElapsedTimeFromNSToMS(startTime),
System.currentTimeMillis());
Utils.closeQuietlyWithErrorLogged(this.recordTransformer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggHostLevelIngestionStats;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedDaVinciRecordTransformerStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
import com.linkedin.davinci.storage.StorageEngineRepository;
Expand Down Expand Up @@ -111,6 +112,7 @@ public static class Builder {
private ReadOnlySchemaRepository schemaRepo;
private ReadOnlyStoreRepository metadataRepo;
private TopicManagerRepository topicManagerRepository;
private AggVersionedDaVinciRecordTransformerStats daVinciRecordTransformerStats;
private AggHostLevelIngestionStats ingestionStats;
private AggVersionedDIVStats versionedDIVStats;
private AggVersionedIngestionStats versionedStorageIngestionStats;
Expand Down Expand Up @@ -233,6 +235,15 @@ public Builder setTopicManagerRepository(TopicManagerRepository topicManagerRepo
return set(() -> this.topicManagerRepository = topicManagerRepository);
}

/**
 * Exposes the record transformer stats aggregator currently held by this builder.
 *
 * @return the value last stored in the {@code daVinciRecordTransformerStats} field; nothing in
 *         this accessor guards against the field being unset
 */
public AggVersionedDaVinciRecordTransformerStats getDaVinciRecordTransformerStats() {
  return this.daVinciRecordTransformerStats;
}

/**
 * Stores the record transformer stats aggregator on this builder.
 *
 * <p>The assignment is wrapped in a lambda and handed to the builder's {@code set(...)} helper,
 * the same way the neighbouring setters do; whatever {@code set(...)} returns is returned here.
 *
 * @param daVinciRecordTransformerStats the aggregator to store
 * @return the result of {@code set(...)} (declared as {@code Builder})
 */
public Builder setDaVinciRecordTransformerStats(
AggVersionedDaVinciRecordTransformerStats daVinciRecordTransformerStats) {
return set(() -> this.daVinciRecordTransformerStats = daVinciRecordTransformerStats);
}

public AggHostLevelIngestionStats getIngestionStats() {
return ingestionStats;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.linkedin.davinci.stats;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import io.tehuti.metrics.MetricsRepository;


/**
 * Store-level aggregation of the versioned stats for
 * {@link com.linkedin.davinci.client.DaVinciRecordTransformer}.
 *
 * <p>Each {@code record*} method below forwards to the inherited
 * {@code recordVersionedAndTotalStat} helper, supplying a callback that invokes the same-named
 * method on {@link DaVinciRecordTransformerStats}.
 */
public class AggVersionedDaVinciRecordTransformerStats
    extends AbstractVeniceAggVersionedStats<DaVinciRecordTransformerStats, DaVinciRecordTransformerStatsReporter> {
  /**
   * Builds the aggregator by passing the stats and reporter constructors to the superclass.
   *
   * @param metricsRepository repository handed to the superclass
   * @param metadataRepository store repository handed to the superclass
   * @param serverConfig consulted only for {@code isUnregisterMetricForDeletedStoreEnabled()}
   */
  public AggVersionedDaVinciRecordTransformerStats(
      MetricsRepository metricsRepository,
      ReadOnlyStoreRepository metadataRepository,
      VeniceServerConfig serverConfig) {
    super(
        metricsRepository,
        metadataRepository,
        DaVinciRecordTransformerStats::new,
        DaVinciRecordTransformerStatsReporter::new,
        serverConfig.isUnregisterMetricForDeletedStoreEnabled());
  }

  /**
   * Records a put latency sample for the given store version.
   *
   * @param storeName store the sample belongs to
   * @param version store version the sample belongs to
   * @param value latency sample
   * @param timestamp time associated with the sample
   */
  public void recordPutLatency(String storeName, int version, double value, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordPutLatency(value, timestamp));
  }

  /**
   * Records a delete latency sample for the given store version.
   *
   * @param storeName store the sample belongs to
   * @param version store version the sample belongs to
   * @param value latency sample
   * @param timestamp time associated with the sample
   */
  public void recordDeleteLatency(String storeName, int version, double value, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordDeleteLatency(value, timestamp));
  }

  /**
   * Records an on-recovery latency sample for the given store version.
   *
   * @param storeName store the sample belongs to
   * @param version store version the sample belongs to
   * @param value latency sample
   * @param timestamp time associated with the sample
   */
  public void recordOnRecoveryLatency(String storeName, int version, double value, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordOnRecoveryLatency(value, timestamp));
  }

  /**
   * Records an on-start-version-ingestion latency sample for the given store version.
   *
   * @param storeName store the sample belongs to
   * @param version store version the sample belongs to
   * @param value latency sample
   * @param timestamp time associated with the sample
   */
  public void recordOnStartVersionIngestionLatency(String storeName, int version, double value, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordOnStartVersionIngestionLatency(value, timestamp));
  }

  /**
   * Records an on-end-version-ingestion latency sample for the given store version.
   *
   * @param storeName store the sample belongs to
   * @param version store version the sample belongs to
   * @param value latency sample
   * @param timestamp time associated with the sample
   */
  public void recordOnEndVersionIngestionLatency(String storeName, int version, double value, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordOnEndVersionIngestionLatency(value, timestamp));
  }

  /**
   * Records one put error for the given store version.
   *
   * @param storeName store the error belongs to
   * @param version store version the error belongs to
   * @param timestamp time associated with the error
   */
  public void recordPutError(String storeName, int version, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordPutError(timestamp));
  }

  /**
   * Records one delete error for the given store version.
   *
   * @param storeName store the error belongs to
   * @param version store version the error belongs to
   * @param timestamp time associated with the error
   */
  public void recordDeleteError(String storeName, int version, long timestamp) {
    recordVersionedAndTotalStat(
        storeName,
        version,
        transformerStats -> transformerStats.recordDeleteError(timestamp));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,52 +214,10 @@ public void recordNearlineLocalBrokerToReadyToServeLatency(
stat -> stat.recordNearlineLocalBrokerToReadyToServeLatency(value, timestamp));
}

public void recordTransformerLatency(String storeName, int version, double value, long timestamp) {
recordVersionedAndTotalStat(storeName, version, stat -> stat.recordTransformerLatency(value, timestamp));
}

public void recordTransformerLifecycleStartLatency(String storeName, int version, double value, long timestamp) {
recordVersionedAndTotalStat(
storeName,
version,
stat -> stat.recordTransformerLifecycleStartLatency(value, timestamp));
}

public void recordTransformerLifecycleEndLatency(String storeName, int version, double value, long timestamp) {
recordVersionedAndTotalStat(
storeName,
version,
stat -> stat.recordTransformerLifecycleEndLatency(value, timestamp));
}

public void recordTransformerError(String storeName, int version, double value, long timestamp) {
recordVersionedAndTotalStat(storeName, version, stat -> stat.recordTransformerError(value, timestamp));
}

/**
 * Records an idle time on the stats object for one specific store version.
 *
 * <p>This looks up the per-version stats through {@code getStats} and calls
 * {@code recordIdleTime} on it directly; it does not go through
 * {@code recordVersionedAndTotalStat}.
 *
 * @param storeName store whose stats are looked up
 * @param version store version whose stats are looked up
 * @param idleTimeMs idle time to record (milliseconds, going by the parameter name)
 */
public void recordMaxIdleTime(String storeName, int version, long idleTimeMs) {
getStats(storeName, version).recordIdleTime(idleTimeMs);
}

public void registerTransformerLatencySensor(String storeName, int version) {
getStats(storeName, version).registerTransformerLatencySensor();
getTotalStats(storeName).registerTransformerLatencySensor();
}

public void registerTransformerLifecycleStartLatency(String storeName, int version) {
getStats(storeName, version).registerTransformerLifecycleStartLatencySensor();
getTotalStats(storeName).registerTransformerLifecycleStartLatencySensor();
}

public void registerTransformerLifecycleEndLatency(String storeName, int version) {
getStats(storeName, version).registerTransformerLifecycleEndLatencySensor();
getTotalStats(storeName).registerTransformerLifecycleEndLatencySensor();
}

public void registerTransformerErrorSensor(String storeName, int version) {
getStats(storeName, version).registerTransformerErrorSensor();
getTotalStats(storeName).registerTransformerErrorSensor();
}

/**
 * Records a batch processing request of the given size for a store version.
 *
 * <p>Forwards to {@code recordVersionedAndTotalStat} with a callback that calls
 * {@code recordBatchProcessingRequest(size, timestamp)} on the stats object it is given.
 *
 * @param storeName store the request belongs to
 * @param version store version the request belongs to
 * @param size size passed through to the stats object
 * @param timestamp time associated with the request
 */
public void recordBatchProcessingRequest(String storeName, int version, int size, long timestamp) {
  recordVersionedAndTotalStat(
      storeName,
      version,
      ingestionStats -> ingestionStats.recordBatchProcessingRequest(size, timestamp));
}
Expand Down
Loading

0 comments on commit 6cb75f9

Please sign in to comment.