Skip to content

Commit ceeb08a

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Make IndexStoreListener a pluggable interface
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 4213cc2 commit ceeb08a

File tree

7 files changed

+127
-32
lines changed

7 files changed

+127
-32
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1111
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
1212
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
1313
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
14-
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483/files))
14+
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
15+
- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583))
1516

1617
### Dependencies
1718
- Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521))

server/src/main/java/org/opensearch/env/NodeEnvironment.java

+1-14
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.opensearch.index.IndexSettings;
7272
import org.opensearch.index.shard.ShardPath;
7373
import org.opensearch.index.store.FsDirectoryFactory;
74+
import org.opensearch.index.store.IndexStoreListener;
7475
import org.opensearch.monitor.fs.FsInfo;
7576
import org.opensearch.monitor.fs.FsProbe;
7677
import org.opensearch.monitor.jvm.JvmInfo;
@@ -1412,18 +1413,4 @@ private static void tryWriteTempFile(Path path) throws IOException {
14121413
}
14131414
}
14141415
}
1415-
1416-
/**
1417-
* A listener that is executed on per-index and per-shard store events, like deleting shard path
1418-
*
1419-
* @opensearch.internal
1420-
*/
1421-
public interface IndexStoreListener {
1422-
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}
1423-
1424-
default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}
1425-
1426-
IndexStoreListener EMPTY = new IndexStoreListener() {
1427-
};
1428-
}
14291416
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.opensearch.common.annotation.PublicApi;
15+
import org.opensearch.core.index.Index;
16+
import org.opensearch.core.index.shard.ShardId;
17+
import org.opensearch.env.NodeEnvironment;
18+
import org.opensearch.index.IndexSettings;
19+
20+
import java.util.Collections;
21+
import java.util.List;
22+
23+
/**
24+
* A listener that is executed on per-index and per-shard store events, like deleting shard path
25+
*
26+
* @opensearch.api
27+
*/
28+
@PublicApi(since = "2.19.0")
29+
public interface IndexStoreListener {
30+
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}
31+
32+
default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}
33+
34+
IndexStoreListener EMPTY = new IndexStoreListener() {
35+
};
36+
37+
/**
38+
* A Composite listener that multiplexes calls to each of the listeners methods.
39+
*/
40+
final class CompositeIndexStoreListener implements IndexStoreListener {
41+
private final List<IndexStoreListener> listeners;
42+
private final Logger logger = LogManager.getLogger(CompositeIndexStoreListener.class);
43+
44+
public CompositeIndexStoreListener(List<IndexStoreListener> listeners) {
45+
this.listeners = Collections.unmodifiableList(listeners);
46+
}
47+
48+
@Override
49+
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {
50+
for (IndexStoreListener listener : listeners) {
51+
try {
52+
listener.beforeShardPathDeleted(shardId, indexSettings, env);
53+
} catch (Exception e) {
54+
logger.warn(() -> new ParameterizedMessage("beforeShardPathDeleted listener [{}] failed", listener), e);
55+
}
56+
}
57+
}
58+
59+
@Override
60+
public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {
61+
for (IndexStoreListener listener : listeners) {
62+
try {
63+
listener.beforeIndexPathDeleted(index, indexSettings, env);
64+
} catch (Exception e) {
65+
logger.warn(() -> new ParameterizedMessage("beforeIndexPathDeleted listener [{}] failed", listener), e);
66+
}
67+
}
68+
}
69+
}
70+
}

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.env.NodeEnvironment;
1919
import org.opensearch.index.IndexSettings;
2020
import org.opensearch.index.shard.ShardPath;
21+
import org.opensearch.index.store.IndexStoreListener;
2122

2223
import java.io.IOException;
2324
import java.nio.file.DirectoryStream;
@@ -33,7 +34,7 @@
3334
*
3435
* @opensearch.internal
3536
*/
36-
public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener {
37+
public class FileCacheCleaner implements IndexStoreListener {
3738
private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class);
3839

3940
private final Provider<FileCache> fileCacheProvider;

server/src/main/java/org/opensearch/node/Node.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@
157157
import org.opensearch.index.recovery.RemoteStoreRestoreService;
158158
import org.opensearch.index.remote.RemoteIndexPathUploader;
159159
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
160+
import org.opensearch.index.store.IndexStoreListener;
160161
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
161162
import org.opensearch.index.store.remote.filecache.FileCache;
162163
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
@@ -548,11 +549,17 @@ protected Node(
548549
*/
549550
this.environment = new Environment(settings, initialEnvironment.configDir(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
550551
Environment.assertEquivalent(initialEnvironment, this.environment);
551-
if (DiscoveryNode.isSearchNode(settings) == false) {
552-
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
553-
} else {
554-
nodeEnvironment = new NodeEnvironment(settings, environment, new FileCacheCleaner(this::fileCache));
555-
}
552+
IndexStoreListener.CompositeIndexStoreListener compositeIndexStoreListener = new IndexStoreListener.CompositeIndexStoreListener(
553+
Stream.concat(
554+
pluginsService.filterPlugins(IndexStorePlugin.class)
555+
.stream()
556+
.map(IndexStorePlugin::getIndexStoreListener)
557+
.filter(Optional::isPresent)
558+
.map(Optional::get),
559+
Stream.of(new FileCacheCleaner(this::fileCache))
560+
).collect(Collectors.toList())
561+
);
562+
nodeEnvironment = new NodeEnvironment(settings, environment, compositeIndexStoreListener);
556563
logger.info(
557564
"node name [{}], node ID [{}], cluster name [{}], roles {}",
558565
NODE_NAME_SETTING.get(tmpSettings),

server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java

+9
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@
3939
import org.opensearch.common.annotation.PublicApi;
4040
import org.opensearch.index.IndexSettings;
4141
import org.opensearch.index.shard.ShardPath;
42+
import org.opensearch.index.store.IndexStoreListener;
4243
import org.opensearch.indices.recovery.RecoveryState;
4344

4445
import java.io.IOException;
4546
import java.util.Collections;
4647
import java.util.Map;
48+
import java.util.Optional;
4749

4850
/**
4951
* A plugin that provides alternative directory implementations.
@@ -105,4 +107,11 @@ interface RecoveryStateFactory {
105107
default Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
106108
return Collections.emptyMap();
107109
}
110+
111+
/**
112+
* The {@link IndexStoreListener}s for this plugin which are triggered upon shard/index path deletion
113+
*/
114+
default Optional<IndexStoreListener> getIndexStoreListener() {
115+
return Optional.empty();
116+
}
108117
}

server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java

+31-11
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.opensearch.core.index.shard.ShardId;
4646
import org.opensearch.gateway.MetadataStateFormat;
4747
import org.opensearch.index.IndexSettings;
48+
import org.opensearch.index.store.IndexStoreListener;
4849
import org.opensearch.node.Node;
4950
import org.opensearch.test.IndexSettingsModule;
5051
import org.opensearch.test.NodeRoles;
@@ -360,24 +361,39 @@ protected void doRun() throws Exception {
360361
}
361362

362363
public void testIndexStoreListener() throws Exception {
363-
final AtomicInteger shardCounter = new AtomicInteger(0);
364-
final AtomicInteger indexCounter = new AtomicInteger(0);
364+
final AtomicInteger shardCounter1 = new AtomicInteger(0);
365+
final AtomicInteger shardCounter2 = new AtomicInteger(0);
366+
final AtomicInteger indexCounter1 = new AtomicInteger(0);
367+
final AtomicInteger indexCounter2 = new AtomicInteger(0);
365368
final Index index = new Index("foo", "fooUUID");
366369
final ShardId shardId = new ShardId(index, 0);
367-
final NodeEnvironment.IndexStoreListener listener = new NodeEnvironment.IndexStoreListener() {
370+
final IndexStoreListener listener1 = new IndexStoreListener() {
368371
@Override
369372
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
370373
assertEquals(shardId, inShardId);
371-
shardCounter.incrementAndGet();
374+
shardCounter1.incrementAndGet();
372375
}
373376

374377
@Override
375378
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
376379
assertEquals(index, inIndex);
377-
indexCounter.incrementAndGet();
380+
indexCounter1.incrementAndGet();
378381
}
379382
};
380-
final NodeEnvironment env = newNodeEnvironment(listener);
383+
final IndexStoreListener listener2 = new IndexStoreListener() {
384+
@Override
385+
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
386+
assertEquals(shardId, inShardId);
387+
shardCounter2.incrementAndGet();
388+
}
389+
390+
@Override
391+
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
392+
assertEquals(index, inIndex);
393+
indexCounter2.incrementAndGet();
394+
}
395+
};
396+
final NodeEnvironment env = newNodeEnvironment(new IndexStoreListener.CompositeIndexStoreListener(List.of(listener1, listener2)));
381397

382398
for (Path path : env.indexPaths(index)) {
383399
Files.createDirectories(path.resolve("0"));
@@ -386,26 +402,30 @@ public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, N
386402
for (Path path : env.indexPaths(index)) {
387403
assertTrue(Files.exists(path.resolve("0")));
388404
}
389-
assertEquals(0, shardCounter.get());
405+
assertEquals(0, shardCounter1.get());
406+
assertEquals(0, shardCounter2.get());
390407

391408
env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);
392409

393410
for (Path path : env.indexPaths(index)) {
394411
assertFalse(Files.exists(path.resolve("0")));
395412
}
396-
assertEquals(1, shardCounter.get());
413+
assertEquals(1, shardCounter1.get());
414+
assertEquals(1, shardCounter2.get());
397415

398416
for (Path path : env.indexPaths(index)) {
399417
assertTrue(Files.exists(path));
400418
}
401-
assertEquals(0, indexCounter.get());
419+
assertEquals(0, indexCounter1.get());
420+
assertEquals(0, indexCounter2.get());
402421

403422
env.deleteIndexDirectorySafe(index, 5000, idxSettings);
404423

405424
for (Path path : env.indexPaths(index)) {
406425
assertFalse(Files.exists(path));
407426
}
408-
assertEquals(1, indexCounter.get());
427+
assertEquals(1, indexCounter1.get());
428+
assertEquals(1, indexCounter2.get());
409429
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
410430
env.close();
411431
}
@@ -680,7 +700,7 @@ public NodeEnvironment newNodeEnvironment() throws IOException {
680700
return newNodeEnvironment(Settings.EMPTY);
681701
}
682702

683-
public NodeEnvironment newNodeEnvironment(NodeEnvironment.IndexStoreListener listener) throws IOException {
703+
public NodeEnvironment newNodeEnvironment(IndexStoreListener listener) throws IOException {
684704
Settings build = buildEnvSettings(Settings.EMPTY);
685705
return new NodeEnvironment(build, TestEnvironment.newEnvironment(build), listener);
686706
}

0 commit comments

Comments
 (0)