Skip to content

Commit

Permalink
[server] Do not register separate RT region heartbeat metrics for fol…
Browse files Browse the repository at this point in the history
…lower (#1561)

Separate RT region heartbeat (HB) metrics should only be registered for the leader, not for followers.  
This reduces the number of unnecessary metrics.
  • Loading branch information
sixpluszero authored Feb 28, 2025
1 parent 46a8ee9 commit b36fd44
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -64,7 +63,7 @@ public HeartbeatMonitoringService(
Set<String> regionNames,
String localRegionName,
HeartbeatMonitoringServiceStats heartbeatMonitoringServiceStats) {
this.regionNames = regionNames.stream().filter(x -> !Utils.isSeparateTopicRegion(x)).collect(Collectors.toSet());
this.regionNames = regionNames;
this.localRegionName = localRegionName;
this.reportingThread = new HeartbeatReporterThread();
this.lagLoggingThread = new HeartbeatLagLoggingThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
import static com.linkedin.venice.stats.StatsErrorCode.NULL_INGESTION_STATS;

import com.linkedin.davinci.stats.AbstractVeniceStatsReporter;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.stats.AsyncGauge;
import java.util.Set;


public class HeartbeatStatReporter extends AbstractVeniceStatsReporter<HeartbeatStat> {
private static final String LEADER_METRIC_PREFIX = "heartbeat_delay_ms_leader-";
private static final String FOLLOWER_METRIC_PREFIX = "heartbeat_delay_ms_follower-";
private static final String CATCHUP_UP_FOLLOWER_METRIC_PREFIX = "catching_up_heartbeat_delay_ms_follower-";
private static final String MAX = "-Max";
private static final String AVG = "-Avg";
static final String LEADER_METRIC_PREFIX = "heartbeat_delay_ms_leader-";
static final String FOLLOWER_METRIC_PREFIX = "heartbeat_delay_ms_follower-";
static final String CATCHUP_UP_FOLLOWER_METRIC_PREFIX = "catching_up_heartbeat_delay_ms_follower-";
static final String MAX = "-Max";
static final String AVG = "-Avg";

public HeartbeatStatReporter(MetricsRepository metricsRepository, String storeName, Set<String> regions) {
super(metricsRepository, storeName);
Expand All @@ -29,39 +30,39 @@ public HeartbeatStatReporter(MetricsRepository metricsRepository, String storeNa
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}
return getStats().getReadyToServeLeaderLag(region).getAvg();
}, LEADER_METRIC_PREFIX + region + AVG));

return getStats().getReadyToServeFollowerLag(region).getMax();
}, FOLLOWER_METRIC_PREFIX + region + MAX));
// Do not register follower heartbeat metrics for separate RT region.
if (Utils.isSeparateTopicRegion(region)) {
continue;
}

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getReadyToServeLeaderLag(region).getAvg();
}, LEADER_METRIC_PREFIX + region + AVG));
return getStats().getReadyToServeFollowerLag(region).getMax();
}, FOLLOWER_METRIC_PREFIX + region + MAX));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getReadyToServeFollowerLag(region).getAvg();
}, FOLLOWER_METRIC_PREFIX + region + AVG));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getCatchingUpFollowerLag(region).getMax();
}, CATCHUP_UP_FOLLOWER_METRIC_PREFIX + region + MAX));

registerSensor(new AsyncGauge((ignored, ignored2) -> {
if (getStats() == null) {
return NULL_INGESTION_STATS.code;
}

return getStats().getCatchingUpFollowerLag(region).getAvg();
}, CATCHUP_UP_FOLLOWER_METRIC_PREFIX + region + AVG));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package com.linkedin.davinci.stats.ingestion.heartbeat;

import static com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatStatReporter.CATCHUP_UP_FOLLOWER_METRIC_PREFIX;
import static com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatStatReporter.FOLLOWER_METRIC_PREFIX;
import static com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatStatReporter.LEADER_METRIC_PREFIX;
import static com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatStatReporter.MAX;
import static com.linkedin.venice.utils.Utils.SEPARATE_TOPIC_SUFFIX;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
Expand Down Expand Up @@ -362,4 +367,36 @@ public void testAddLeaderLagMonitor() {

heartbeatMonitoringService.record();
}

/**
 * Verifies which heartbeat gauges {@link HeartbeatStatReporter} registers when it is given a regular region
 * ("dc-0") together with its separate RT region ("dc-0" + SEPARATE_TOPIC_SUFFIX):
 * leader gauges are expected for both regions, while follower and catching-up follower gauges are expected
 * for the regular region only.
 */
@Test
public void testHeartbeatReporter() {
  MetricsRepository repository = new MetricsRepository();
  String regionName = "dc-0";
  Set<String> regionSet = new HashSet<>();
  regionSet.add(regionName);
  regionSet.add(regionName + SEPARATE_TOPIC_SUFFIX);
  String storeName = "abc";
  HeartbeatStatReporter heartbeatStatReporter = new HeartbeatStatReporter(repository, storeName, regionSet);

  // Leader should register metrics for both the regular region and the separate RT region.
  String leaderMetricName = "." + storeName + "--" + LEADER_METRIC_PREFIX + regionName + MAX + ".Gauge";
  // The separate RT name must carry SEPARATE_TOPIC_SUFFIX; without it this would just re-check leaderMetricName.
  String leaderMetricNameForSepRT =
      "." + storeName + "--" + LEADER_METRIC_PREFIX + regionName + SEPARATE_TOPIC_SUFFIX + MAX + ".Gauge";
  Assert.assertTrue(heartbeatStatReporter.getMetricsRepository().metrics().containsKey(leaderMetricName));
  Assert.assertTrue(heartbeatStatReporter.getMetricsRepository().metrics().containsKey(leaderMetricNameForSepRT));

  // Follower should not register separate region metric.
  String followerMetricName = "." + storeName + "--" + FOLLOWER_METRIC_PREFIX + regionName + MAX + ".Gauge";
  String followerMetricNameForSepRT =
      "." + storeName + "--" + FOLLOWER_METRIC_PREFIX + regionName + SEPARATE_TOPIC_SUFFIX + MAX + ".Gauge";
  Assert.assertTrue(heartbeatStatReporter.getMetricsRepository().metrics().containsKey(followerMetricName));
  Assert.assertFalse(heartbeatStatReporter.getMetricsRepository().metrics().containsKey(followerMetricNameForSepRT));

  // Catching-up follower should not register separate region metric.
  String catchingUpFollowerMetricName =
      "." + storeName + "--" + CATCHUP_UP_FOLLOWER_METRIC_PREFIX + regionName + MAX + ".Gauge";
  String catchingUpFollowerMetricNameForSepRT = "." + storeName + "--" + CATCHUP_UP_FOLLOWER_METRIC_PREFIX
      + regionName + SEPARATE_TOPIC_SUFFIX + MAX + ".Gauge";
  Assert.assertTrue(heartbeatStatReporter.getMetricsRepository().metrics().containsKey(catchingUpFollowerMetricName));
  Assert.assertFalse(
      heartbeatStatReporter.getMetricsRepository().metrics().containsKey(catchingUpFollowerMetricNameForSepRT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void testDumpHostHeartbeatLag() {
LOGGER.info("Heartbeat Info with topic filtering:\n" + heartbeatInfoMap);
Assert.assertEquals(heartbeatInfoMap.keySet().stream().filter(x -> x.endsWith("dc-0")).count(), 3);
Assert.assertEquals(
heartbeatInfoMap.keySet().stream().filter(x -> x.endsWith("dc-1")).count() * 2,
heartbeatInfoMap.keySet().stream().filter(x -> x.contains("dc-1")).count() * 2,
heartbeatInfoMap.values().stream().filter(x -> x.getLeaderState().equals("LEADER")).count());

heartbeatInfoMap = serverWrapper.getVeniceServer()
Expand Down

0 comments on commit b36fd44

Please sign in to comment.