Skip to content

Commit f1286fa

Browse files
xuxiong1Mayank Sharma
authored and
Mayank Sharma
committed
[Pull-based Ingestion] Add basic NodeStats metrics (opensearch-project#17444)
Signed-off-by: xuxiong1 <xiongxug@outlook.com>
1 parent ab5dd62 commit f1286fa

File tree

22 files changed

+340
-11
lines changed

22 files changed

+340
-11
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.cluster.metadata.IndexMetadata;
1717
import org.opensearch.common.settings.Settings;
1818
import org.opensearch.index.query.RangeQueryBuilder;
19+
import org.opensearch.indices.pollingingest.PollingIngestStats;
1920
import org.opensearch.plugins.PluginInfo;
2021
import org.opensearch.test.OpenSearchIntegTestCase;
2122
import org.junit.Assert;
@@ -75,6 +76,11 @@ public void testKafkaIngestion() {
7576
refresh("test");
7677
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
7778
assertThat(response.getHits().getTotalHits().value(), is(1L));
79+
PollingIngestStats stats = client().admin().indices().prepareStats("test").get().getIndex("test").getShards()[0]
80+
.getPollingIngestStats();
81+
assertNotNull(stats);
82+
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(2L));
83+
assertThat(stats.getConsumerStats().getTotalPolledCount(), is(2L));
7884
});
7985
}
8086

server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.opensearch.index.seqno.SeqNoStats;
5757
import org.opensearch.index.shard.IndexShard;
5858
import org.opensearch.indices.IndicesService;
59+
import org.opensearch.indices.pollingingest.PollingIngestStats;
5960
import org.opensearch.node.NodeService;
6061
import org.opensearch.threadpool.ThreadPool;
6162
import org.opensearch.transport.TransportRequest;
@@ -210,15 +211,18 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
210211
CommitStats commitStats;
211212
SeqNoStats seqNoStats;
212213
RetentionLeaseStats retentionLeaseStats;
214+
PollingIngestStats pollingIngestStats;
213215
try {
214216
commitStats = indexShard.commitStats();
215217
seqNoStats = indexShard.seqNoStats();
216218
retentionLeaseStats = indexShard.getRetentionLeaseStats();
219+
pollingIngestStats = indexShard.pollingIngestStats();
217220
} catch (final AlreadyClosedException e) {
218221
// shard is closed - no stats is fine
219222
commitStats = null;
220223
seqNoStats = null;
221224
retentionLeaseStats = null;
225+
pollingIngestStats = null;
222226
}
223227
shardsStats.add(
224228
new ShardStats(
@@ -227,7 +231,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
227231
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, commonStatsFlags),
228232
commitStats,
229233
seqNoStats,
230-
retentionLeaseStats
234+
retentionLeaseStats,
235+
pollingIngestStats
231236
)
232237
);
233238
}

server/src/main/java/org/opensearch/action/admin/indices/stats/ShardStats.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.action.admin.indices.stats;
3434

35+
import org.opensearch.Version;
3536
import org.opensearch.cluster.routing.ShardRouting;
3637
import org.opensearch.common.Nullable;
3738
import org.opensearch.common.annotation.PublicApi;
@@ -44,6 +45,7 @@
4445
import org.opensearch.index.seqno.RetentionLeaseStats;
4546
import org.opensearch.index.seqno.SeqNoStats;
4647
import org.opensearch.index.shard.ShardPath;
48+
import org.opensearch.indices.pollingingest.PollingIngestStats;
4749

4850
import java.io.IOException;
4951

@@ -65,6 +67,9 @@ public class ShardStats implements Writeable, ToXContentFragment {
6567
@Nullable
6668
private RetentionLeaseStats retentionLeaseStats;
6769

70+
@Nullable
71+
private PollingIngestStats pollingIngestStats;
72+
6873
/**
6974
* Gets the current retention lease stats.
7075
*
@@ -87,6 +92,9 @@ public ShardStats(StreamInput in) throws IOException {
8792
isCustomDataPath = in.readBoolean();
8893
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
8994
retentionLeaseStats = in.readOptionalWriteable(RetentionLeaseStats::new);
95+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
96+
pollingIngestStats = in.readOptionalWriteable(PollingIngestStats::new);
97+
}
9098
}
9199

92100
public ShardStats(
@@ -95,7 +103,8 @@ public ShardStats(
95103
final CommonStats commonStats,
96104
final CommitStats commitStats,
97105
final SeqNoStats seqNoStats,
98-
final RetentionLeaseStats retentionLeaseStats
106+
final RetentionLeaseStats retentionLeaseStats,
107+
final PollingIngestStats pollingIngestStats
99108
) {
100109
this.shardRouting = routing;
101110
this.dataPath = shardPath.getRootDataPath().toString();
@@ -105,6 +114,7 @@ public ShardStats(
105114
this.commonStats = commonStats;
106115
this.seqNoStats = seqNoStats;
107116
this.retentionLeaseStats = retentionLeaseStats;
117+
this.pollingIngestStats = pollingIngestStats;
108118
}
109119

110120
/**
@@ -128,6 +138,11 @@ public SeqNoStats getSeqNoStats() {
128138
return this.seqNoStats;
129139
}
130140

141+
@Nullable
142+
public PollingIngestStats getPollingIngestStats() {
143+
return this.pollingIngestStats;
144+
}
145+
131146
public String getDataPath() {
132147
return dataPath;
133148
}
@@ -150,6 +165,9 @@ public void writeTo(StreamOutput out) throws IOException {
150165
out.writeBoolean(isCustomDataPath);
151166
out.writeOptionalWriteable(seqNoStats);
152167
out.writeOptionalWriteable(retentionLeaseStats);
168+
if (out.getVersion().onOrAfter((Version.V_3_0_0))) {
169+
out.writeOptionalWriteable(pollingIngestStats);
170+
}
153171
}
154172

155173
@Override
@@ -171,6 +189,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
171189
if (retentionLeaseStats != null) {
172190
retentionLeaseStats.toXContent(builder, params);
173191
}
192+
if (pollingIngestStats != null) {
193+
pollingIngestStats.toXContent(builder, params);
194+
}
174195
builder.startObject(Fields.SHARD_PATH);
175196
builder.field(Fields.STATE_PATH, statePath);
176197
builder.field(Fields.DATA_PATH, dataPath);

server/src/main/java/org/opensearch/action/admin/indices/stats/TransportIndicesStatsAction.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.opensearch.index.shard.IndexShard;
5353
import org.opensearch.index.shard.ShardNotFoundException;
5454
import org.opensearch.indices.IndicesService;
55+
import org.opensearch.indices.pollingingest.PollingIngestStats;
5556
import org.opensearch.threadpool.ThreadPool;
5657
import org.opensearch.transport.TransportService;
5758

@@ -141,16 +142,27 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh
141142
CommitStats commitStats;
142143
SeqNoStats seqNoStats;
143144
RetentionLeaseStats retentionLeaseStats;
145+
PollingIngestStats pollingIngestStats;
144146
try {
145147
commitStats = indexShard.commitStats();
146148
seqNoStats = indexShard.seqNoStats();
147149
retentionLeaseStats = indexShard.getRetentionLeaseStats();
150+
pollingIngestStats = indexShard.pollingIngestStats();
148151
} catch (final AlreadyClosedException e) {
149152
// shard is closed - no stats is fine
150153
commitStats = null;
151154
seqNoStats = null;
152155
retentionLeaseStats = null;
156+
pollingIngestStats = null;
153157
}
154-
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), commonStats, commitStats, seqNoStats, retentionLeaseStats);
158+
return new ShardStats(
159+
indexShard.routingEntry(),
160+
indexShard.shardPath(),
161+
commonStats,
162+
commitStats,
163+
seqNoStats,
164+
retentionLeaseStats,
165+
pollingIngestStats
166+
);
155167
}
156168
}

server/src/main/java/org/opensearch/index/engine/Engine.java

+8
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
import org.opensearch.index.translog.Translog;
9494
import org.opensearch.index.translog.TranslogDeletionPolicy;
9595
import org.opensearch.index.translog.TranslogManager;
96+
import org.opensearch.indices.pollingingest.PollingIngestStats;
9697
import org.opensearch.search.suggest.completion.CompletionStats;
9798

9899
import java.io.Closeable;
@@ -946,6 +947,13 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
946947
return stats;
947948
}
948949

950+
/**
951+
* @return Stats for pull-based ingestion.
952+
*/
953+
public PollingIngestStats pollingIngestStats() {
954+
return null;
955+
}
956+
949957
protected TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
950958
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
951959
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

+6
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.opensearch.index.translog.TranslogStats;
3030
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
3131
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
32+
import org.opensearch.indices.pollingingest.PollingIngestStats;
3233
import org.opensearch.indices.pollingingest.StreamPoller;
3334

3435
import java.io.IOException;
@@ -288,4 +289,9 @@ protected TranslogManager createTranslogManager(
288289
protected Map<String, String> commitDataAsMap() {
289290
return commitDataAsMap(indexWriter);
290291
}
292+
293+
@Override
294+
public PollingIngestStats pollingIngestStats() {
295+
return streamPoller.getStats();
296+
}
291297
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+5
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@
184184
import org.opensearch.indices.IndicesService;
185185
import org.opensearch.indices.RemoteStoreSettings;
186186
import org.opensearch.indices.cluster.IndicesClusterStateService;
187+
import org.opensearch.indices.pollingingest.PollingIngestStats;
187188
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
188189
import org.opensearch.indices.recovery.RecoveryFailedException;
189190
import org.opensearch.indices.recovery.RecoveryListener;
@@ -1533,6 +1534,10 @@ public CompletionStats completionStats(String... fields) {
15331534
return getEngine().completionStats(fields);
15341535
}
15351536

1537+
public PollingIngestStats pollingIngestStats() {
1538+
return getEngine().pollingIngestStats();
1539+
}
1540+
15361541
/**
15371542
* Executes the given flush request against the engine.
15381543
*

server/src/main/java/org/opensearch/indices/IndicesService.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@
149149
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
150150
import org.opensearch.indices.mapper.MapperRegistry;
151151
import org.opensearch.indices.pollingingest.IngestionEngineFactory;
152+
import org.opensearch.indices.pollingingest.PollingIngestStats;
152153
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
153154
import org.opensearch.indices.recovery.RecoveryListener;
154155
import org.opensearch.indices.recovery.RecoverySettings;
@@ -758,15 +759,18 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
758759
CommitStats commitStats;
759760
SeqNoStats seqNoStats;
760761
RetentionLeaseStats retentionLeaseStats;
762+
PollingIngestStats pollingIngestStats;
761763
try {
762764
commitStats = indexShard.commitStats();
763765
seqNoStats = indexShard.seqNoStats();
764766
retentionLeaseStats = indexShard.getRetentionLeaseStats();
767+
pollingIngestStats = indexShard.pollingIngestStats();
765768
} catch (AlreadyClosedException e) {
766769
// shard is closed - no stats is fine
767770
commitStats = null;
768771
seqNoStats = null;
769772
retentionLeaseStats = null;
773+
pollingIngestStats = null;
770774
}
771775

772776
return new IndexShardStats(
@@ -778,7 +782,8 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
778782
new CommonStats(indicesService.getIndicesQueryCache(), indexShard, flags),
779783
commitStats,
780784
seqNoStats,
781-
retentionLeaseStats
785+
retentionLeaseStats,
786+
pollingIngestStats
782787
) }
783788
);
784789
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

+12
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.common.Nullable;
14+
import org.opensearch.common.metrics.CounterMetric;
1415
import org.opensearch.index.IngestionShardConsumer;
1516
import org.opensearch.index.IngestionShardPointer;
1617
import org.opensearch.index.Message;
@@ -60,6 +61,8 @@ public class DefaultStreamPoller implements StreamPoller {
6061

6162
private MessageProcessorRunnable processorRunnable;
6263

64+
private final CounterMetric totalPolledCount = new CounterMetric();
65+
6366
// A pointer to the max persisted pointer for optimizing the check
6467
@Nullable
6568
private IngestionShardPointer maxPersistedPointer;
@@ -204,6 +207,7 @@ protected void startPoll() {
204207
logger.info("Skipping message with pointer {} as it is already processed", result.getPointer().asString());
205208
continue;
206209
}
210+
totalPolledCount.inc();
207211
blockingQueue.put(result);
208212
logger.debug(
209213
"Put message {} with pointer {} to the blocking queue",
@@ -297,6 +301,14 @@ public IngestionShardPointer getBatchStartPointer() {
297301
return batchStartPointer;
298302
}
299303

304+
@Override
305+
public PollingIngestStats getStats() {
306+
PollingIngestStats.Builder builder = new PollingIngestStats.Builder();
307+
builder.setTotalPolledCount(totalPolledCount.count());
308+
builder.setTotalProcessedCount(processorRunnable.getStats().count());
309+
return builder.build();
310+
}
311+
300312
public State getState() {
301313
return state;
302314
}

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

+7
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.lucene.index.Term;
1515
import org.opensearch.action.DocWriteRequest;
1616
import org.opensearch.common.lucene.uid.Versions;
17+
import org.opensearch.common.metrics.CounterMetric;
1718
import org.opensearch.common.xcontent.XContentFactory;
1819
import org.opensearch.common.xcontent.XContentHelper;
1920
import org.opensearch.core.common.bytes.BytesArray;
@@ -48,6 +49,7 @@ public class MessageProcessorRunnable implements Runnable {
4849

4950
private final BlockingQueue<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> blockingQueue;
5051
private final MessageProcessor messageProcessor;
52+
private final CounterMetric stats = new CounterMetric();
5153

5254
private static final String ID = "_id";
5355
private static final String OP_TYPE = "_op_type";
@@ -229,8 +231,13 @@ public void run() {
229231
Thread.currentThread().interrupt(); // Restore interrupt status
230232
}
231233
if (result != null) {
234+
stats.inc();
232235
messageProcessor.process(result.getMessage(), result.getPointer());
233236
}
234237
}
235238
}
239+
240+
public CounterMetric getStats() {
241+
return stats;
242+
}
236243
}

0 commit comments

Comments
 (0)