Skip to content

Commit b42a50e

Browse files
committed
[Writable Warm] Composite Directory implementation and integrating it with FileCache (opensearch-project#12782)
* Composite Directory POC Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Refactor TransferManager interface to RemoteStoreFileTrackerAdapter Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Implement block level fetch for Composite Directory Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Removed CACHE state from FileTracker Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Fixes after latest pull Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Add new setting for warm, remove store type setting, FileTracker and RemoteStoreFileTrackerAdapter, CompositeDirectoryFactory and update Composite Directory implementation Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Modify TransferManager - replace BlobContainer with Functional Interface to fetch an InputStream instead Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Reuse OnDemandBlockSnapshotIndexInput instead of OnDemandBlockCompositeIndexInput Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Modify constructors to avoid breaking public api contract and code review fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Add experimental annotations for newly created classes and review comment fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Use ref count as a temporary measure to prevent file from eviction until uploaded to Remote Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Remove method level locks Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Handle tmp file deletion Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Nit fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Handle delete and close in Composite Directory, log current state of FileCache and correct it's clear method and modify unit and integration tests as per review comments Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Refactor usages of WRITEABLE_REMOTE_INDEX_SETTING to TIERED_REMOTE_INDEX_SETTING Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Add tests for FileCachedIndexInput and review comment fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Add additional IT for feature flag disabled Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Move setting for Partial Locality type behind Feature Flag, fix bug for ref count via cloneMap in FullFileCachedIndexInput and other review fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Minor test and nit fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Add javadocs for FullFileCachedIndexInput Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Minor precommit fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> --------- Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
1 parent 08bcf43 commit b42a50e

35 files changed

+1889
-209
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
99
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
1010
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
11+
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
1112

1213
### Dependencies
1314
- Update to Apache Lucene 9.11.0 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
12+
13+
import org.apache.lucene.store.Directory;
14+
import org.apache.lucene.store.FilterDirectory;
15+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
16+
import org.opensearch.action.admin.indices.get.GetIndexRequest;
17+
import org.opensearch.action.admin.indices.get.GetIndexResponse;
18+
import org.opensearch.action.search.SearchResponse;
19+
import org.opensearch.cluster.metadata.IndexMetadata;
20+
import org.opensearch.common.settings.Settings;
21+
import org.opensearch.common.settings.SettingsException;
22+
import org.opensearch.common.util.FeatureFlags;
23+
import org.opensearch.index.IndexModule;
24+
import org.opensearch.index.query.QueryBuilders;
25+
import org.opensearch.index.shard.IndexShard;
26+
import org.opensearch.index.store.CompositeDirectory;
27+
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
28+
import org.opensearch.index.store.remote.filecache.FileCache;
29+
import org.opensearch.index.store.remote.utils.FileTypeUtils;
30+
import org.opensearch.indices.IndicesService;
31+
import org.opensearch.node.Node;
32+
import org.opensearch.test.InternalTestCluster;
33+
import org.opensearch.test.OpenSearchIntegTestCase;
34+
35+
import java.util.Arrays;
36+
import java.util.HashSet;
37+
import java.util.Set;
38+
import java.util.stream.Collectors;
39+
40+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
41+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
42+
43+
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
44+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
45+
// Uncomment the below line to enable trace level logs for this test for better debugging
46+
// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
47+
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {
48+
49+
protected static final String INDEX_NAME = "test-idx-1";
50+
protected static final int NUM_DOCS_IN_BULK = 1000;
51+
52+
/*
53+
Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory)
54+
As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory
55+
*/
56+
@Override
57+
protected boolean addMockIndexStorePlugin() {
58+
return false;
59+
}
60+
61+
@Override
62+
protected Settings featureFlagSettings() {
63+
Settings.Builder featureSettings = Settings.builder();
64+
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
65+
return featureSettings.build();
66+
}
67+
68+
public void testWritableWarmFeatureFlagDisabled() {
69+
Settings clusterSettings = Settings.builder().put(super.nodeSettings(0)).put(FeatureFlags.TIERED_REMOTE_INDEX, false).build();
70+
InternalTestCluster internalTestCluster = internalCluster();
71+
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
72+
internalTestCluster.startDataOnlyNode(clusterSettings);
73+
74+
Settings indexSettings = Settings.builder()
75+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
76+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
77+
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
78+
.build();
79+
80+
try {
81+
prepareCreate(INDEX_NAME).setSettings(indexSettings).get();
82+
fail("Should have thrown Exception as setting should not be registered if Feature Flag is Disabled");
83+
} catch (SettingsException ex) {
84+
assertEquals(
85+
"unknown setting ["
86+
+ IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()
87+
+ "] please check that any required plugins are installed, or check the "
88+
+ "breaking changes documentation for removed settings",
89+
ex.getMessage()
90+
);
91+
}
92+
}
93+
94+
public void testWritableWarmBasic() throws Exception {
95+
InternalTestCluster internalTestCluster = internalCluster();
96+
internalTestCluster.startClusterManagerOnlyNode();
97+
internalTestCluster.startDataOnlyNode();
98+
Settings settings = Settings.builder()
99+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
100+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
101+
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
102+
.build();
103+
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME).setSettings(settings).get());
104+
105+
// Verify from the cluster settings if the data locality is partial
106+
GetIndexResponse getIndexResponse = client().admin()
107+
.indices()
108+
.getIndex(new GetIndexRequest().indices(INDEX_NAME).includeDefaults(true))
109+
.get();
110+
Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME);
111+
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));
112+
113+
// Ingesting some docs
114+
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
115+
flushAndRefresh(INDEX_NAME);
116+
117+
// ensuring cluster is green after performing force-merge
118+
ensureGreen();
119+
120+
SearchResponse searchResponse = client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
121+
// Asserting that search returns same number of docs as ingested
122+
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
123+
124+
// Ingesting docs again before force merge
125+
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
126+
flushAndRefresh(INDEX_NAME);
127+
128+
FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache();
129+
IndexShard shard = internalTestCluster.getDataNodeInstance(IndicesService.class)
130+
.indexService(resolveIndex(INDEX_NAME))
131+
.getShardOrNull(0);
132+
Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate());
133+
134+
// Force merging the index
135+
Set<String> filesBeforeMerge = new HashSet<>(Arrays.asList(directory.listAll()));
136+
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).get();
137+
flushAndRefresh(INDEX_NAME);
138+
Set<String> filesAfterMerge = new HashSet<>(Arrays.asList(directory.listAll()));
139+
140+
Set<String> filesFromPreviousGenStillPresent = filesBeforeMerge.stream()
141+
.filter(filesAfterMerge::contains)
142+
.filter(file -> !FileTypeUtils.isLockFile(file))
143+
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
144+
.collect(Collectors.toUnmodifiableSet());
145+
146+
// Asserting that after merge all the files from previous gen are no more part of the directory
147+
assertTrue(filesFromPreviousGenStillPresent.isEmpty());
148+
149+
// Asserting that files from previous gen are not present in File Cache as well
150+
filesBeforeMerge.stream()
151+
.filter(file -> !FileTypeUtils.isLockFile(file))
152+
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
153+
.forEach(file -> assertNull(fileCache.get(((CompositeDirectory) directory).getFilePath(file))));
154+
155+
// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
156+
// leaks
157+
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
158+
fileCache.prune();
159+
}
160+
}

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.common.annotation.PublicApi;
4242
import org.opensearch.common.logging.Loggers;
4343
import org.opensearch.common.settings.Setting.Property;
44+
import org.opensearch.common.util.FeatureFlags;
4445
import org.opensearch.index.IndexModule;
4546
import org.opensearch.index.IndexSettings;
4647
import org.opensearch.index.IndexSortConfig;
@@ -260,7 +261,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
260261
* is ready for production release, the feature flag can be removed, and the
261262
* setting should be moved to {@link #BUILT_IN_INDEX_SETTINGS}.
262263
*/
263-
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of();
264+
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
265+
FeatureFlags.TIERED_REMOTE_INDEX,
266+
List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING)
267+
);
264268

265269
public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);
266270

server/src/main/java/org/opensearch/index/IndexModule.java

+76-2
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@
107107
import java.util.function.Function;
108108
import java.util.function.Supplier;
109109

110+
import static org.apache.logging.log4j.util.Strings.toRootUpperCase;
111+
110112
/**
111113
* IndexModule represents the central extension point for index level custom implementations like:
112114
* <ul>
@@ -141,6 +143,17 @@ public final class IndexModule {
141143
Property.NodeScope
142144
);
143145

146+
/**
147+
* Index setting which used to determine how the data is cached locally fully or partially
148+
*/
149+
public static final Setting<DataLocalityType> INDEX_STORE_LOCALITY_SETTING = new Setting<>(
150+
"index.store.data_locality",
151+
DataLocalityType.FULL.name(),
152+
DataLocalityType::getValueOf,
153+
Property.IndexScope,
154+
Property.NodeScope
155+
);
156+
144157
public static final Setting<String> INDEX_RECOVERY_TYPE_SETTING = new Setting<>(
145158
"index.recovery.type",
146159
"",
@@ -297,6 +310,7 @@ public Iterator<Setting<?>> settings() {
297310
private final AtomicBoolean frozen = new AtomicBoolean(false);
298311
private final BooleanSupplier allowExpensiveQueries;
299312
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
313+
private final FileCache fileCache;
300314

301315
/**
302316
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -315,7 +329,8 @@ public IndexModule(
315329
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
316330
final BooleanSupplier allowExpensiveQueries,
317331
final IndexNameExpressionResolver expressionResolver,
318-
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
332+
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
333+
final FileCache fileCache
319334
) {
320335
this.indexSettings = indexSettings;
321336
this.analysisRegistry = analysisRegistry;
@@ -327,6 +342,30 @@ public IndexModule(
327342
this.allowExpensiveQueries = allowExpensiveQueries;
328343
this.expressionResolver = expressionResolver;
329344
this.recoveryStateFactories = recoveryStateFactories;
345+
this.fileCache = fileCache;
346+
}
347+
348+
public IndexModule(
349+
final IndexSettings indexSettings,
350+
final AnalysisRegistry analysisRegistry,
351+
final EngineFactory engineFactory,
352+
final EngineConfigFactory engineConfigFactory,
353+
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
354+
final BooleanSupplier allowExpensiveQueries,
355+
final IndexNameExpressionResolver expressionResolver,
356+
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
357+
) {
358+
this(
359+
indexSettings,
360+
analysisRegistry,
361+
engineFactory,
362+
engineConfigFactory,
363+
directoryFactories,
364+
allowExpensiveQueries,
365+
expressionResolver,
366+
recoveryStateFactories,
367+
null
368+
);
330369
}
331370

332371
/**
@@ -577,6 +616,40 @@ public boolean match(Settings settings) {
577616
}
578617
}
579618

619+
/**
620+
* Indicates the locality of the data - whether it will be cached fully or partially
621+
*/
622+
public enum DataLocalityType {
623+
/**
624+
* Indicates that all the data will be cached locally
625+
*/
626+
FULL,
627+
/**
628+
* Indicates that only a subset of the data will be cached locally
629+
*/
630+
PARTIAL;
631+
632+
private static final Map<String, DataLocalityType> LOCALITY_TYPES;
633+
634+
static {
635+
final Map<String, DataLocalityType> localityTypes = new HashMap<>(values().length);
636+
for (final DataLocalityType dataLocalityType : values()) {
637+
localityTypes.put(dataLocalityType.name(), dataLocalityType);
638+
}
639+
LOCALITY_TYPES = Collections.unmodifiableMap(localityTypes);
640+
}
641+
642+
public static DataLocalityType getValueOf(final String localityType) {
643+
Objects.requireNonNull(localityType, "No locality type given.");
644+
final String localityTypeName = toRootUpperCase(localityType.trim());
645+
final DataLocalityType type = LOCALITY_TYPES.get(localityTypeName);
646+
if (type != null) {
647+
return type;
648+
}
649+
throw new IllegalArgumentException("Unknown locality type constant [" + localityType + "].");
650+
}
651+
}
652+
580653
public static Type defaultStoreType(final boolean allowMmap) {
581654
if (allowMmap && Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) {
582655
return Type.HYBRIDFS;
@@ -665,7 +738,8 @@ public IndexService newIndexService(
665738
translogFactorySupplier,
666739
clusterDefaultRefreshIntervalSupplier,
667740
recoverySettings,
668-
remoteStoreSettings
741+
remoteStoreSettings,
742+
fileCache
669743
);
670744
success = true;
671745
return indexService;

0 commit comments

Comments
 (0)