Skip to content

Commit

Permalink
expose observed journal expirations as metrics (#679)
Browse files Browse the repository at this point in the history
Currently there is no easy way to observer what expirations are used by
different clients - only options are:
* peek at Kafka messages
* crawl the `journal` table
* search in source

With this change, `replicator` will expose new metric
`*_kafka_journal_client_journal_expiration_info` which will expose [a
sample] of expirations as observed in replicated data.
  • Loading branch information
mr-git authored Sep 30, 2024
1 parent 3d8c49a commit ac4ae5d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ private[journal] object ReplicateRecords {
): ReplicateRecords[F] = { (records: Nel[ConsRecord], timestamp: Instant) =>
{

def apply(records: Nel[ActionRecord[Action]]) = {
def apply(records: Nel[ActionRecord[Action]]): F[Int] = {
val record = records.last
val key = record.action.key
val partition = record.partitionOffset.partition
val id = key.id

def measurements(records: Int) = {
def measurements(records: Int): F[TopicReplicatorMetrics.Measurements] = {
for {
now <- Clock[F].instant
} yield {
Expand All @@ -50,14 +50,14 @@ private[journal] object ReplicateRecords {
}
}

def append(offset: Offset, records: Nel[ActionRecord[Action.Append]]) = {
def append(offset: Offset, records: Nel[ActionRecord[Action.Append]]): F[Int] = {
val bytes = records.foldLeft(0L) { case (bytes, record) => bytes + record.action.payload.size }

def msg(
events: Nel[EventRecord[EventualPayloadAndType]],
latency: FiniteDuration,
expireAfter: Option[ExpireAfter],
) = {
): String = {
val seqNrs =
if (events.tail.isEmpty) s"seqNr: ${events.head.seqNr}"
else s"seqNrs: ${events.head.seqNr}..${events.last.seqNr}"
Expand All @@ -69,14 +69,16 @@ private[journal] object ReplicateRecords {
s"append in ${latency.toMillis}ms, id: $id, partition: $partition, offset: $offset, $seqNrs$originStr, version: $versionStr$expireAfterStr"
}

def measure(events: Nel[EventRecord[EventualPayloadAndType]], expireAfter: Option[ExpireAfter]) = {
def measure(events: Nel[EventRecord[EventualPayloadAndType]], expireAfter: Option[ExpireAfter]): F[Unit] = {
for {
measurements <- measurements(records.size)
version = events.last.version.map(_.value).getOrElse("none")
expiration = expireAfter.map(_.value.toString).getOrElse("none")
result <- metrics.append(
events = events.length,
bytes = bytes,
clientVersion = version,
expiration = expiration,
measurements = measurements,
)
_ <- log.info(msg(events, measurements.replicationLatency, expireAfter))
Expand Down Expand Up @@ -105,15 +107,15 @@ private[journal] object ReplicateRecords {
} yield result
}

def delete(offset: Offset, deleteTo: DeleteTo, origin: Option[Origin], version: Option[Version]) = {
def delete(offset: Offset, deleteTo: DeleteTo, origin: Option[Origin], version: Option[Version]): F[Int] = {

def msg(latency: FiniteDuration) = {
def msg(latency: FiniteDuration): String = {
val originStr = origin.foldMap { origin => s", origin: $origin" }
val versionStr = version.fold("none") { _.toString }
s"delete in ${latency.toMillis}ms, id: $id, offset: $partition:$offset, deleteTo: $deleteTo$originStr, version: $versionStr"
}

def measure() = {
def measure(): F[Unit] = {
for {
measurements <- measurements(1)
latency = measurements.replicationLatency
Expand All @@ -128,15 +130,15 @@ private[journal] object ReplicateRecords {
} yield result
}

def purge(offset: Offset, origin: Option[Origin], version: Option[Version]) = {
def purge(offset: Offset, origin: Option[Origin], version: Option[Version]): F[Int] = {

def msg(latency: FiniteDuration) = {
def msg(latency: FiniteDuration): String = {
val originStr = origin.foldMap { origin => s", origin: $origin" }
val versionStr = version.fold("none") { _.toString }
s"purge in ${latency.toMillis}ms, id: $id, offset: $partition:$offset$originStr, version: $versionStr"
}

def measure() = {
def measure(): F[Unit] = {
for {
measurements <- measurements(1)
latency = measurements.replicationLatency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ trait TopicReplicatorMetrics[F[_]] {

/** Accounts number of processed events, the size of their payloads and client version as well as
* replication and delivery latencies and number of processed Kafka records */
def append(events: Int, bytes: Long, clientVersion: String, measurements: Measurements): F[Unit]
def append(events: Int, bytes: Long, clientVersion: String, expiration: String, measurements: Measurements): F[Unit]

/** Accounts replication and delivery latencies and number of processed Kafka records */
def delete(measurements: Measurements): F[Unit]
Expand All @@ -36,7 +36,7 @@ object TopicReplicatorMetrics {
class Const
new Const with TopicReplicatorMetrics[F] {

def append(events: Int, bytes: Long, clientVersion: String, measurements: Measurements): F[Unit] = unit
def append(events: Int, bytes: Long, clientVersion: String, expiration: String, measurements: Measurements): F[Unit] = unit

def delete(measurements: Measurements): F[Unit] = unit

Expand Down Expand Up @@ -106,19 +106,26 @@ object TopicReplicatorMetrics {
labels = LabelNames("topic", "version"),
)

val expirationDurationGauge = registry.gauge(
name = s"${prefix}_kafka_journal_client_journal_expiration_info",
help = "kafka-journal's client expiration as observed in payloads",
labels = LabelNames("topic", "expiration"),
)

for {
replicationSummary <- replicationSummary
deliverySummary <- deliverySummary
eventsSummary <- eventsSummary
bytesSummary <- bytesSummary
recordsSummary <- recordsSummary
roundSummary <- roundSummary
roundRecordsSummary <- roundRecordsSummary
clientVersionGauge <- clientVersionGauge
replicationSummary <- replicationSummary
deliverySummary <- deliverySummary
eventsSummary <- eventsSummary
bytesSummary <- bytesSummary
recordsSummary <- recordsSummary
roundSummary <- roundSummary
roundRecordsSummary <- roundRecordsSummary
clientVersionGauge <- clientVersionGauge
expirationDurationGauge <- expirationDurationGauge
} yield { (topic: Topic) =>
{

def observeMeasurements(name: String, measurements: Measurements) = {
def observeMeasurements(name: String, measurements: Measurements): F[Unit] = {
for {
_ <- replicationSummary.labels(topic, name).observe(measurements.replicationLatency.toNanos.nanosToSeconds)
_ <- deliverySummary.labels(topic, name).observe(measurements.deliveryLatency.toNanos.nanosToSeconds)
Expand All @@ -129,12 +136,13 @@ object TopicReplicatorMetrics {
class Main
new Main with TopicReplicatorMetrics[F] {

def append(events: Int, bytes: Long, clientVersion: String, measurements: Measurements): F[Unit] = {
def append(events: Int, bytes: Long, clientVersion: String, expiration: String, measurements: Measurements): F[Unit] = {
for {
_ <- observeMeasurements("append", measurements)
_ <- eventsSummary.labels(topic).observe(events.toDouble)
_ <- bytesSummary.labels(topic).observe(bytes.toDouble)
_ <- clientVersionGauge.labels(topic, clientVersion).set(1)
_ <- expirationDurationGauge.labels(topic, expiration).set(1)
} yield {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ object TopicReplicatorSpec {

implicit val metrics: TopicReplicatorMetrics[StateT] = new TopicReplicatorMetrics[StateT] {

def append(events: Int, bytes: Long, clientVersion: String, measurements: Measurements) = {
def append(events: Int, bytes: Long, clientVersion: String, expiration: String, measurements: Measurements) = {
StateT { s =>
s + Metrics.Append(latency = measurements.replicationLatency, events = events, records = measurements.records)
}
Expand Down

0 comments on commit ac4ae5d

Please sign in to comment.