Skip to content

Commit 51af2e2

Browse files
Add UTs for RemoteIndexMetadataManager (#14660)
Signed-off-by: Shivansh Arora <hishiv@amazon.com> Co-authored-by: Arpit-Bandejiya <abandeji@amazon.com>
1 parent 0684342 commit 51af2e2

File tree

2 files changed

+190
-21
lines changed

2 files changed

+190
-21
lines changed

server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java

-21
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,7 @@
2626
import org.opensearch.threadpool.ThreadPool;
2727

2828
import java.io.IOException;
29-
import java.util.HashMap;
3029
import java.util.Locale;
31-
import java.util.Map;
32-
import java.util.Objects;
3330

3431
/**
3532
* A Manager which provides APIs to write and read Index Metadata to remote store
@@ -136,24 +133,6 @@ IndexMetadata getIndexMetadata(ClusterMetadataManifest.UploadedIndexMetadata upl
136133
}
137134
}
138135

139-
/**
140-
* Fetch latest index metadata from remote cluster state
141-
*
142-
* @param clusterMetadataManifest manifest file of cluster
143-
* @param clusterUUID uuid of cluster state to refer to in remote
144-
* @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map
145-
*/
146-
Map<String, IndexMetadata> getIndexMetadataMap(String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) {
147-
assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID())
148-
: "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch.";
149-
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
150-
for (ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) {
151-
IndexMetadata indexMetadata = getIndexMetadata(uploadedIndexMetadata, clusterUUID);
152-
remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata);
153-
}
154-
return remoteIndexMetadata;
155-
}
156-
157136
public TimeValue getIndexMetadataUploadTimeout() {
158137
return this.indexMetadataUploadTimeout;
159138
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.action.LatchedActionListener;
13+
import org.opensearch.cluster.metadata.AliasMetadata;
14+
import org.opensearch.cluster.metadata.IndexMetadata;
15+
import org.opensearch.common.Nullable;
16+
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
17+
import org.opensearch.common.blobstore.BlobContainer;
18+
import org.opensearch.common.blobstore.BlobPath;
19+
import org.opensearch.common.blobstore.BlobStore;
20+
import org.opensearch.common.blobstore.stream.write.WritePriority;
21+
import org.opensearch.common.settings.ClusterSettings;
22+
import org.opensearch.common.settings.Settings;
23+
import org.opensearch.common.util.TestCapturingListener;
24+
import org.opensearch.core.action.ActionListener;
25+
import org.opensearch.core.compress.Compressor;
26+
import org.opensearch.core.compress.NoneCompressor;
27+
import org.opensearch.gateway.remote.model.RemoteReadResult;
28+
import org.opensearch.index.remote.RemoteStoreUtils;
29+
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
30+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
31+
import org.opensearch.test.OpenSearchTestCase;
32+
import org.opensearch.threadpool.TestThreadPool;
33+
import org.opensearch.threadpool.ThreadPool;
34+
import org.junit.After;
35+
import org.junit.Before;
36+
37+
import java.io.IOException;
38+
import java.util.concurrent.CountDownLatch;
39+
40+
import static org.opensearch.gateway.remote.RemoteClusterStateService.FORMAT_PARAMS;
41+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
42+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;
43+
import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX;
44+
import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_METADATA_FORMAT;
45+
import static org.mockito.ArgumentMatchers.any;
46+
import static org.mockito.ArgumentMatchers.anyIterable;
47+
import static org.mockito.ArgumentMatchers.anyString;
48+
import static org.mockito.ArgumentMatchers.eq;
49+
import static org.mockito.Mockito.doAnswer;
50+
import static org.mockito.Mockito.doThrow;
51+
import static org.mockito.Mockito.mock;
52+
import static org.mockito.Mockito.when;
53+
54+
public class RemoteIndexMetadataManagerTests extends OpenSearchTestCase {
55+
56+
private RemoteIndexMetadataManager remoteIndexMetadataManager;
57+
private BlobStoreRepository blobStoreRepository;
58+
private BlobStoreTransferService blobStoreTransferService;
59+
private Compressor compressor;
60+
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
61+
62+
@Before
63+
public void setup() {
64+
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
65+
blobStoreRepository = mock(BlobStoreRepository.class);
66+
BlobPath blobPath = new BlobPath().add("random-path");
67+
when((blobStoreRepository.basePath())).thenReturn(blobPath);
68+
blobStoreTransferService = mock(BlobStoreTransferService.class);
69+
compressor = new NoneCompressor();
70+
when(blobStoreRepository.getCompressor()).thenReturn(compressor);
71+
remoteIndexMetadataManager = new RemoteIndexMetadataManager(
72+
clusterSettings,
73+
"test-cluster",
74+
blobStoreRepository,
75+
blobStoreTransferService,
76+
threadPool
77+
);
78+
}
79+
80+
@After
81+
public void tearDown() throws Exception {
82+
super.tearDown();
83+
threadPool.shutdown();
84+
}
85+
86+
public void testGetAsyncIndexMetadataWriteAction_Success() throws Exception {
87+
IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10));
88+
BlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class);
89+
BlobStore blobStore = mock(BlobStore.class);
90+
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
91+
TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
92+
CountDownLatch latch = new CountDownLatch(1);
93+
String expectedFilePrefix = String.join(DELIMITER, "metadata", RemoteStoreUtils.invertLong(indexMetadata.getVersion()));
94+
95+
doAnswer((invocationOnMock -> {
96+
invocationOnMock.getArgument(4, ActionListener.class).onResponse(null);
97+
return null;
98+
})).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class));
99+
100+
remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction(
101+
indexMetadata,
102+
"cluster-uuid",
103+
new LatchedActionListener<>(listener, latch)
104+
).run();
105+
latch.await();
106+
107+
assertNull(listener.getFailure());
108+
assertNotNull(listener.getResult());
109+
ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult();
110+
assertEquals(INDEX + "--" + indexMetadata.getIndex().getName(), uploadedMetadata.getComponent());
111+
String uploadedFileName = uploadedMetadata.getUploadedFilename();
112+
String[] pathTokens = uploadedFileName.split(PATH_DELIMITER);
113+
assertEquals(7, pathTokens.length);
114+
assertEquals(INDEX, pathTokens[4]);
115+
assertEquals(indexMetadata.getIndex().getUUID(), pathTokens[5]);
116+
assertTrue(pathTokens[6].startsWith(expectedFilePrefix));
117+
}
118+
119+
public void testGetAsyncIndexMetadataWriteAction_IOFailure() throws Exception {
120+
IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10));
121+
BlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class);
122+
BlobStore blobStore = mock(BlobStore.class);
123+
when(blobStore.blobContainer(any())).thenReturn(blobContainer);
124+
TestCapturingListener<ClusterMetadataManifest.UploadedMetadata> listener = new TestCapturingListener<>();
125+
CountDownLatch latch = new CountDownLatch(1);
126+
127+
doAnswer((invocationOnMock -> {
128+
invocationOnMock.getArgument(4, ActionListener.class).onFailure(new IOException("failure"));
129+
return null;
130+
})).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class));
131+
132+
remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction(
133+
indexMetadata,
134+
"cluster-uuid",
135+
new LatchedActionListener<>(listener, latch)
136+
).run();
137+
latch.await();
138+
assertNull(listener.getResult());
139+
assertNotNull(listener.getFailure());
140+
assertTrue(listener.getFailure() instanceof RemoteStateTransferException);
141+
}
142+
143+
public void testGetAsyncIndexMetadataReadAction_Success() throws Exception {
144+
IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10));
145+
String fileName = randomAlphaOfLength(10);
146+
fileName = fileName + DELIMITER + '2';
147+
when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn(
148+
INDEX_METADATA_FORMAT.serialize(indexMetadata, fileName, compressor, FORMAT_PARAMS).streamInput()
149+
);
150+
TestCapturingListener<RemoteReadResult> listener = new TestCapturingListener<>();
151+
CountDownLatch latch = new CountDownLatch(1);
152+
153+
remoteIndexMetadataManager.getAsyncIndexMetadataReadAction("cluster-uuid", fileName, new LatchedActionListener<>(listener, latch))
154+
.run();
155+
latch.await();
156+
assertNull(listener.getFailure());
157+
assertNotNull(listener.getResult());
158+
assertEquals(indexMetadata, listener.getResult().getObj());
159+
}
160+
161+
public void testGetAsyncIndexMetadataReadAction_IOFailure() throws Exception {
162+
String fileName = randomAlphaOfLength(10);
163+
fileName = fileName + DELIMITER + '2';
164+
Exception exception = new IOException("testing failure");
165+
doThrow(exception).when(blobStoreTransferService).downloadBlob(anyIterable(), anyString());
166+
TestCapturingListener<RemoteReadResult> listener = new TestCapturingListener<>();
167+
CountDownLatch latch = new CountDownLatch(1);
168+
169+
remoteIndexMetadataManager.getAsyncIndexMetadataReadAction("cluster-uuid", fileName, new LatchedActionListener<>(listener, latch))
170+
.run();
171+
latch.await();
172+
assertNull(listener.getResult());
173+
assertNotNull(listener.getFailure());
174+
assertEquals(exception, listener.getFailure());
175+
}
176+
177+
private IndexMetadata getIndexMetadata(String name, @Nullable Boolean writeIndex, String... aliases) {
178+
IndexMetadata.Builder builder = IndexMetadata.builder(name)
179+
.settings(
180+
Settings.builder()
181+
.put("index.version.created", Version.CURRENT.id)
182+
.put("index.number_of_shards", 1)
183+
.put("index.number_of_replicas", 1)
184+
);
185+
for (String alias : aliases) {
186+
builder.putAlias(AliasMetadata.builder(alias).writeIndex(writeIndex).build());
187+
}
188+
return builder.build();
189+
}
190+
}

0 commit comments

Comments
 (0)