diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 20e4727b2826..5f8d0a3ddba5 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -323,6 +323,12 @@ public enum CassandraRelevantProperties /** Which class to use for coordinator client request metrics */ CUSTOM_CLIENT_REQUEST_METRICS_PROVIDER_PROPERTY("cassandra.custom_client_request_metrics_provider_class"), + /** + * Which class to use for internode metrics for {@link org.apache.cassandra.net.OutboundConnections}. + * The provided class name must point to an implementation of {@link org.apache.cassandra.metrics.InternodeOutboundMetrics}. + */ + CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY("cassandra.custom_internode_inbound_metrics_provider_class"), + /** * Which class to use for messaging metrics for {@link org.apache.cassandra.net.MessagingService}. * The provided class name must point to an implementation of {@link org.apache.cassandra.metrics.MessagingMetrics}. diff --git a/src/java/org/apache/cassandra/metrics/InternodeInboundMetrics.java b/src/java/org/apache/cassandra/metrics/InternodeInboundMetrics.java index cc8dae9f4cfa..d508586cad96 100644 --- a/src/java/org/apache/cassandra/metrics/InternodeInboundMetrics.java +++ b/src/java/org/apache/cassandra/metrics/InternodeInboundMetrics.java @@ -20,27 +20,29 @@ import com.codahale.metrics.Gauge; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.InboundMessageHandlers; -import org.apache.cassandra.metrics.CassandraMetricsRegistry.MetricName; +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; /** * Metrics for internode connections. */ public class InternodeInboundMetrics { - private final MetricName corruptFramesRecovered; - private final MetricName corruptFramesUnrecovered; - private final MetricName errorBytes; - private final MetricName errorCount; - private final MetricName expiredBytes; - private final MetricName expiredCount; - private final MetricName pendingBytes; - private final MetricName pendingCount; - private final MetricName processedBytes; - private final MetricName processedCount; - private final MetricName receivedBytes; - private final MetricName receivedCount; - private final MetricName throttledCount; - private final MetricName throttledNanos; + public final Gauge corruptFramesRecovered; + public final Gauge corruptFramesUnrecovered; + public final Gauge errorBytes; + public final Gauge errorCount; + public final Gauge expiredBytes; + public final Gauge expiredCount; + public final Gauge pendingBytes; + public final Gauge pendingCount; + public final Gauge processedBytes; + public final Gauge processedCount; + public final Gauge receivedBytes; + public final Gauge receivedCount; + public final Gauge throttledCount; + public final Gauge throttledNanos; + + private final MetricNameFactory factory; /** * Create metrics for given inbound message handlers. @@ -50,49 +52,40 @@ public class InternodeInboundMetrics public InternodeInboundMetrics(InetAddressAndPort peer, InboundMessageHandlers handlers) { // ipv6 addresses will contain colons, which are invalid in a JMX ObjectName - MetricNameFactory factory = new DefaultNameFactory("InboundConnection", peer.getHostAddressAndPortForJMX()); + factory = new DefaultNameFactory("InboundConnection", peer.getHostAddressAndPortForJMX()); - register(corruptFramesRecovered = factory.createMetricName("CorruptFramesRecovered"), handlers::corruptFramesRecovered); - register(corruptFramesUnrecovered = factory.createMetricName("CorruptFramesUnrecovered"), handlers::corruptFramesUnrecovered); - register(errorBytes = factory.createMetricName("ErrorBytes"), handlers::errorBytes); - register(errorCount = factory.createMetricName("ErrorCount"), handlers::errorCount); - register(expiredBytes = factory.createMetricName("ExpiredBytes"), handlers::expiredBytes); - register(expiredCount = factory.createMetricName("ExpiredCount"), handlers::expiredCount); - register(pendingBytes = factory.createMetricName("ScheduledBytes"), handlers::scheduledBytes); - register(pendingCount = factory.createMetricName("ScheduledCount"), handlers::scheduledCount); - register(processedBytes = factory.createMetricName("ProcessedBytes"), handlers::processedBytes); - register(processedCount = factory.createMetricName("ProcessedCount"), handlers::processedCount); - register(receivedBytes = factory.createMetricName("ReceivedBytes"), handlers::receivedBytes); - register(receivedCount = factory.createMetricName("ReceivedCount"), handlers::receivedCount); - register(throttledCount = factory.createMetricName("ThrottledCount"), handlers::throttledCount); - register(throttledNanos = factory.createMetricName("ThrottledNanos"), handlers::throttledNanos); + corruptFramesRecovered = Metrics.register(factory.createMetricName("CorruptFramesRecovered"), handlers::corruptFramesRecovered); + corruptFramesUnrecovered = Metrics.register(factory.createMetricName("CorruptFramesUnrecovered"), handlers::corruptFramesUnrecovered); + errorBytes = Metrics.register(factory.createMetricName("ErrorBytes"), handlers::errorBytes); + errorCount = Metrics.register(factory.createMetricName("ErrorCount"), handlers::errorCount); + expiredBytes = Metrics.register(factory.createMetricName("ExpiredBytes"), handlers::expiredBytes); + expiredCount = Metrics.register(factory.createMetricName("ExpiredCount"), handlers::expiredCount); + pendingBytes = Metrics.register(factory.createMetricName("ScheduledBytes"), handlers::scheduledBytes); + pendingCount = Metrics.register(factory.createMetricName("ScheduledCount"), handlers::scheduledCount); + processedBytes = Metrics.register(factory.createMetricName("ProcessedBytes"), handlers::processedBytes); + processedCount = Metrics.register(factory.createMetricName("ProcessedCount"), handlers::processedCount); + receivedBytes = Metrics.register(factory.createMetricName("ReceivedBytes"), handlers::receivedBytes); + receivedCount = Metrics.register(factory.createMetricName("ReceivedCount"), handlers::receivedCount); + throttledCount = Metrics.register(factory.createMetricName("ThrottledCount"), handlers::throttledCount); + throttledNanos = Metrics.register(factory.createMetricName("ThrottledNanos"), handlers::throttledNanos); } public void release() { - remove(corruptFramesRecovered); - remove(corruptFramesUnrecovered); - remove(errorBytes); - remove(errorCount); - remove(expiredBytes); - remove(expiredCount); - remove(pendingBytes); - remove(pendingCount); - remove(processedBytes); - remove(processedCount); - remove(receivedBytes); - remove(receivedCount); - remove(throttledCount); - remove(throttledNanos); - } - - private static void register(MetricName name, Gauge gauge) - { - CassandraMetricsRegistry.Metrics.register(name, gauge); + Metrics.remove(factory.createMetricName("CorruptFramesRecovered")); + Metrics.remove(factory.createMetricName("CorruptFramesUnrecovered")); + Metrics.remove(factory.createMetricName("ErrorBytes")); + Metrics.remove(factory.createMetricName("ErrorCount")); + Metrics.remove(factory.createMetricName("ExpiredBytes")); + Metrics.remove(factory.createMetricName("ExpiredCount")); + Metrics.remove(factory.createMetricName("PendingBytes")); + Metrics.remove(factory.createMetricName("PendingCount")); + Metrics.remove(factory.createMetricName("ProcessedBytes")); + Metrics.remove(factory.createMetricName("ProcessedCount")); + Metrics.remove(factory.createMetricName("ReceivedBytes")); + Metrics.remove(factory.createMetricName("ReceivedCount")); + Metrics.remove(factory.createMetricName("ThrottledCount")); + Metrics.remove(factory.createMetricName("ThrottledNanos")); } - private static void remove(MetricName name) - { - CassandraMetricsRegistry.Metrics.remove(name); - } } diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java index 5582cfda7581..5f67a52760e7 100644 --- a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java +++ b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.net; +import java.lang.reflect.InvocationTargetException; import java.util.Collection; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -30,6 +31,9 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.InternodeInboundMetrics; import org.apache.cassandra.net.Message.Header; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY; /** * An aggregation of {@link InboundMessageHandler}s for all connections from a peer. @@ -128,7 +132,25 @@ public InboundMessageHandlers(InetAddressAndPort self, largeCallbacks = makeMessageCallbacks(peer, largeCounters, globalMetricCallbacks, messageConsumer); legacyCallbacks = makeMessageCallbacks(peer, legacyCounters, globalMetricCallbacks, messageConsumer); - metrics = new InternodeInboundMetrics(peer, this); + if (CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY.isPresent()) + { + Class klass = FBUtilities.classForName(CUSTOM_INTERNODE_INBOUND_METRICS_PROVIDER_PROPERTY.getString(), "Internode Inbound Metrics Provider"); + InternodeInboundMetrics ibInstance; + try + { + ibInstance = klass.getDeclaredConstructor(InetAddressAndPort.class, InboundMessageHandlers.class).newInstance(peer, this); + } + catch (NoSuchMethodException | InstantiationException | IllegalAccessException | + InvocationTargetException e) + { + throw new RuntimeException(e); + } + metrics = ibInstance; + } + else + { + metrics = new InternodeInboundMetrics(peer, this); + } } InboundMessageHandler createHandler(FrameDecoder frameDecoder, ConnectionType type, Channel channel, int version)