Skip to content

Commit eaf9bda

Browse files
committed
Composite Directory POC
Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
1 parent 76ae14a commit eaf9bda

File tree

12 files changed

+472
-3
lines changed

12 files changed

+472
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.apache.lucene.store.Directory;
12+
import org.apache.lucene.store.FilterDirectory;
13+
import org.opensearch.action.admin.indices.get.GetIndexRequest;
14+
import org.opensearch.action.admin.indices.get.GetIndexResponse;
15+
import org.opensearch.cluster.metadata.IndexMetadata;
16+
import org.opensearch.common.settings.Settings;
17+
import org.opensearch.index.IndexModule;
18+
import org.opensearch.index.IndexService;
19+
import org.opensearch.index.shard.IndexShard;
20+
import org.opensearch.index.store.CompositeDirectory;
21+
import org.opensearch.indices.IndicesService;
22+
import org.opensearch.indices.replication.common.ReplicationType;
23+
24+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
25+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
26+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
27+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
28+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
29+
30+
public class CompositeDirectoryIT extends RemoteStoreBaseIntegTestCase {
31+
public void testCompositeDirectory() throws Exception {
32+
Settings settings = Settings.builder()
33+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
34+
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "compositefs")
35+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
36+
.build();
37+
assertAcked(client().admin().indices().prepareCreate("test-idx-1").setSettings(settings).get());
38+
GetIndexResponse getIndexResponse = client().admin()
39+
.indices()
40+
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
41+
.get();
42+
boolean indexServiceFound = false;
43+
String[] nodes = internalCluster().getNodeNames();
44+
for (String node : nodes) {
45+
IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexService(resolveIndex("test-idx-1"));
46+
if (indexService == null) {
47+
continue;
48+
}
49+
IndexShard shard = indexService.getShardOrNull(0);
50+
Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate());
51+
assertTrue(directory instanceof CompositeDirectory);
52+
indexServiceFound = true;
53+
}
54+
assertTrue(indexServiceFound);
55+
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
56+
assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE));
57+
assertEquals("true", indexSettings.get(SETTING_REMOTE_STORE_ENABLED));
58+
assertEquals(REPOSITORY_NAME, indexSettings.get(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY));
59+
assertEquals(REPOSITORY_2_NAME, indexSettings.get(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY));
60+
assertEquals("compositefs", indexSettings.get("index.store.type"));
61+
62+
ensureGreen("test-idx-1");
63+
indexData(10, false, "test-idx-1");
64+
ensureGreen("test-idx-1");
65+
}
66+
}

server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java

+12
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.opensearch.common.settings.Setting;
7575
import org.opensearch.common.settings.Settings;
7676
import org.opensearch.common.unit.TimeValue;
77+
import org.opensearch.common.util.FeatureFlags;
7778
import org.opensearch.common.xcontent.XContentHelper;
7879
import org.opensearch.core.action.ActionListener;
7980
import org.opensearch.core.common.Strings;
@@ -940,6 +941,7 @@ static Settings aggregateIndexSettings(
940941
validateStoreTypeSettings(indexSettings);
941942
validateRefreshIntervalSettings(request.settings(), clusterSettings);
942943
validateTranslogDurabilitySettings(request.settings(), clusterSettings, settings);
944+
validateCompositeFS(request.settings());
943945

944946
return indexSettings;
945947
}
@@ -1592,4 +1594,14 @@ static void validateTranslogDurabilitySettings(Settings requestSettings, Cluster
15921594
}
15931595

15941596
}
1597+
1598+
public static void validateCompositeFS(Settings indexSettings) {
1599+
if (indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "")
1600+
.equalsIgnoreCase(IndexModule.Type.COMPOSITEFS.getSettingsKey())
1601+
&& !FeatureFlags.isEnabled(FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING)) {
1602+
throw new IllegalArgumentException(
1603+
"ERROR - Composite FS store type can be enabled only if Feature Flag for Writable Remote Index is true"
1604+
);
1605+
}
1606+
}
15951607
}

server/src/main/java/org/opensearch/index/IndexModule.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.opensearch.index.shard.IndexingOperationListener;
7575
import org.opensearch.index.shard.SearchOperationListener;
7676
import org.opensearch.index.similarity.SimilarityService;
77+
import org.opensearch.index.store.CompositeDirectoryFactory;
7778
import org.opensearch.index.store.FsDirectoryFactory;
7879
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
7980
import org.opensearch.index.store.remote.filecache.FileCache;
@@ -505,7 +506,8 @@ public enum Type {
505506
MMAPFS("mmapfs"),
506507
SIMPLEFS("simplefs"),
507508
FS("fs"),
508-
REMOTE_SNAPSHOT("remote_snapshot");
509+
REMOTE_SNAPSHOT("remote_snapshot"),
510+
COMPOSITEFS("compositefs");
509511

510512
private final String settingsKey;
511513
private final boolean deprecated;
@@ -787,6 +789,9 @@ public static Map<String, IndexStorePlugin.DirectoryFactory> createBuiltInDirect
787789
new RemoteSnapshotDirectoryFactory(repositoriesService, threadPool, remoteStoreFileCache)
788790
);
789791
break;
792+
case COMPOSITEFS:
793+
factories.put(type.getSettingsKey(), new CompositeDirectoryFactory(repositoriesService, remoteStoreFileCache));
794+
break;
790795
default:
791796
throw new IllegalStateException("No directory factory mapping for built-in type " + type);
792797
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

+5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.opensearch.index.engine.InternalEngine;
3131
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
3232
import org.opensearch.index.seqno.SequenceNumbers;
33+
import org.opensearch.index.store.CompositeDirectory;
3334
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
3435
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
3536
import org.opensearch.index.translog.Translog;
@@ -286,6 +287,10 @@ public void onFailure(Exception e) {
286287

287288
// Start the segments files upload
288289
uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener);
290+
Directory directory = ((FilterDirectory) (((FilterDirectory) storeDirectory).getDelegate())).getDelegate();
291+
if (directory instanceof CompositeDirectory) {
292+
((CompositeDirectory) directory).afterSyncToRemote(localSegmentsPostRefresh);
293+
}
289294
latch.await();
290295
} catch (EngineException e) {
291296
logger.warn("Exception while reading SegmentInfosSnapshot", e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.lucene.store.FSDirectory;
12+
import org.apache.lucene.store.FilterDirectory;
13+
import org.apache.lucene.store.IOContext;
14+
import org.apache.lucene.store.IndexInput;
15+
import org.apache.lucene.store.IndexOutput;
16+
import org.apache.lucene.store.Lock;
17+
import org.opensearch.common.blobstore.BlobContainer;
18+
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
19+
import org.opensearch.index.store.remote.filecache.FileCache;
20+
import org.opensearch.index.store.remote.utils.filetracker.FileState;
21+
import org.opensearch.index.store.remote.utils.filetracker.FileType;
22+
23+
import java.io.IOException;
24+
import java.util.Collection;
25+
import java.util.Set;
26+
27+
public class CompositeDirectory extends FilterDirectory {
28+
29+
private final FSDirectory localDirectory;
30+
private final TransferManager transferManager;
31+
private final FileCache fileCache;
32+
33+
public CompositeDirectory(FSDirectory localDirectory, BlobContainer blobContainer, FileCache fileCache) {
34+
super(localDirectory);
35+
this.localDirectory = localDirectory;
36+
this.fileCache = fileCache;
37+
this.transferManager = new CompositeDirectoryTransferManager(fileCache, blobContainer);
38+
}
39+
40+
@Override
41+
public String[] listAll() throws IOException {
42+
return localDirectory.listAll();
43+
}
44+
45+
@Override
46+
public void deleteFile(String name) throws IOException {
47+
super.deleteFile(name);
48+
transferManager.removeFileFromTracker(name);
49+
fileCache.remove(localDirectory.getDirectory().resolve(name));
50+
}
51+
52+
@Override
53+
public long fileLength(String name) throws IOException {
54+
return localDirectory.fileLength(name);
55+
}
56+
57+
@Override
58+
public IndexOutput createOutput(String name, IOContext context) throws IOException {
59+
transferManager.trackFile(name, FileState.DISK, FileType.NON_BLOCK);
60+
return localDirectory.createOutput(name, context);
61+
}
62+
63+
@Override
64+
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
65+
return localDirectory.createTempOutput(prefix, suffix, context);
66+
}
67+
68+
@Override
69+
public void sync(Collection<String> names) throws IOException {
70+
localDirectory.sync(names);
71+
}
72+
73+
@Override
74+
public void syncMetaData() throws IOException {
75+
localDirectory.syncMetaData();
76+
}
77+
78+
@Override
79+
public void rename(String source, String dest) throws IOException {
80+
localDirectory.rename(source, dest);
81+
transferManager.trackFile(dest, transferManager.getFileState(source), transferManager.getFileType(source));
82+
transferManager.removeFileFromTracker(source);
83+
}
84+
85+
@Override
86+
public IndexInput openInput(String name, IOContext context) throws IOException {
87+
if (!transferManager.isFilePresent(name)) {
88+
return localDirectory.openInput(name, context);
89+
}
90+
IndexInput indexInput = null;
91+
switch (transferManager.getFileState(name)) {
92+
case DISK:
93+
indexInput = localDirectory.openInput(name, context);
94+
break;
95+
96+
case CACHE:
97+
indexInput = fileCache.get(localDirectory.getDirectory().resolve(name)).getIndexInput();
98+
break;
99+
100+
case REMOTE_ONLY:
101+
// TODO - return an implementation of OnDemandBlockIndexInput where the fetchBlock method is implemented
102+
break;
103+
}
104+
return indexInput;
105+
}
106+
107+
@Override
108+
public Lock obtainLock(String name) throws IOException {
109+
return localDirectory.obtainLock(name);
110+
}
111+
112+
@Override
113+
public void close() throws IOException {
114+
localDirectory.close();
115+
}
116+
117+
@Override
118+
public Set<String> getPendingDeletions() throws IOException {
119+
return localDirectory.getPendingDeletions();
120+
}
121+
122+
public void afterSyncToRemote(Collection<String> files) throws IOException {
123+
for (String fileName : files) {
124+
if (transferManager.isFilePresent(fileName) && !transferManager.getFileState(fileName).equals(FileState.CACHE)) {
125+
transferManager.updateFileState(fileName, FileState.CACHE);
126+
}
127+
fileCache.put(localDirectory.getDirectory().resolve(fileName), new CachedIndexInput() {
128+
@Override
129+
public IndexInput getIndexInput() throws IOException {
130+
return localDirectory.openInput(fileName, IOContext.READ);
131+
}
132+
133+
@Override
134+
public long length() {
135+
try {
136+
return localDirectory.fileLength(fileName);
137+
} catch (IOException e) {
138+
throw new RuntimeException(e);
139+
}
140+
}
141+
142+
@Override
143+
public boolean isClosed() {
144+
return false;
145+
}
146+
147+
@Override
148+
public void close() {}
149+
});
150+
}
151+
}
152+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.lucene.store.Directory;
12+
import org.apache.lucene.store.FSDirectory;
13+
import org.opensearch.common.blobstore.BlobContainer;
14+
import org.opensearch.common.blobstore.BlobPath;
15+
import org.opensearch.index.IndexSettings;
16+
import org.opensearch.index.shard.ShardPath;
17+
import org.opensearch.index.store.remote.filecache.FileCache;
18+
import org.opensearch.plugins.IndexStorePlugin;
19+
import org.opensearch.repositories.RepositoriesService;
20+
import org.opensearch.repositories.Repository;
21+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
22+
23+
import java.io.IOException;
24+
import java.nio.file.Path;
25+
import java.util.function.Supplier;
26+
27+
public class CompositeDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
28+
29+
private final Supplier<RepositoriesService> repositoriesService;
30+
private final FileCache remoteStoreFileCache;
31+
32+
public CompositeDirectoryFactory(Supplier<RepositoriesService> repositoriesService, FileCache remoteStoreFileCache) {
33+
this.repositoriesService = repositoriesService;
34+
this.remoteStoreFileCache = remoteStoreFileCache;
35+
}
36+
37+
@Override
38+
public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
39+
String repositoryName = indexSettings.getRemoteStoreRepository();
40+
Repository repository = repositoriesService.get().repository(repositoryName);
41+
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
42+
String shardId = String.valueOf(shardPath.getShardId().getId());
43+
String indexUUID = indexSettings.getIndex().getUUID();
44+
BlobPath blobPath = blobStoreRepository.basePath().add(indexUUID).add(shardId).add("segments").add("data");
45+
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(blobPath);
46+
47+
final Path location = shardPath.resolveIndex();
48+
final FSDirectory primaryDirectory = FSDirectory.open(location);
49+
50+
return new CompositeDirectory(primaryDirectory, blobContainer, remoteStoreFileCache);
51+
}
52+
}

0 commit comments

Comments
 (0)