
Commit 230a06d

Authored by Sachin Kale (sachinpkale) and Sachin Kale
[Remote Segment Store] Use files from segmentInfosSnapshot in upload (#8487)
* Use files from segmentInfosSnapshot in upload

---------

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
1 parent 2ba1157 commit 230a06d
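
For context, a minimal, self-contained sketch of the two segment-file listings this commit is about (illustrative only; the class name and directory argument are invented, and none of this is OpenSearch code):

import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

import java.nio.file.Paths;
import java.util.Collection;

public class SegmentFileSets {
    public static void main(String[] args) throws Exception {
        try (Directory dir = FSDirectory.open(Paths.get(args[0]))) {
            // Commit-point view: re-reads the latest segments_N file from disk.
            // Before this commit, the refresh listener merged this set into the
            // refresh snapshot's set via SegmentInfos.readCommit(..), which could
            // fail when the segments_N it looked for was no longer on local disk.
            SegmentInfos lastCommit = SegmentInfos.readLatestCommit(dir);
            Collection<String> commitFiles = lastCommit.files(true);
            System.out.println("files referenced by the last commit: " + commitFiles);
            // After this commit, the listener uploads exactly the files reported by
            // the SegmentInfos snapshot it already holds: segmentInfos.files(true).
        }
    }
}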

File tree: 4 files changed (+185 -78 lines)


server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java

+9

@@ -9,7 +9,9 @@
 package org.opensearch.remotestore;

 import org.junit.After;
+import org.opensearch.action.index.IndexResponse;
 import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.UUIDs;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.index.IndexModule;
@@ -49,6 +51,13 @@ public Settings indexSettings() {
         return defaultIndexSettings();
     }

+    IndexResponse indexSingleDoc(String indexName) {
+        return client().prepareIndex(indexName)
+            .setId(UUIDs.randomBase64UUID())
+            .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
+            .get();
+    }
+
     private Settings defaultIndexSettings() {
         return Settings.builder()
             .put(super.indexSettings())
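
With the helper promoted to RemoteStoreBaseIntegTestCase, any remote-store integration test can index a random document through it. A minimal usage sketch (a hypothetical subclass; the class and index names are invented, not part of this commit):

package org.opensearch.remotestore;

import org.junit.Before;
import org.opensearch.action.index.IndexResponse;

public class RemoteStoreExampleIT extends RemoteStoreBaseIntegTestCase {

    @Before
    public void setup() {
        setupRepo(); // register the remote store repository, as the other ITs do
    }

    public void testIndexSingleDocHelper() {
        createIndex("example-idx", remoteStoreIndexSettings(0));
        ensureGreen("example-idx");
        IndexResponse response = indexSingleDoc("example-idx"); // one random doc
        assertTrue(response.getSeqNo() >= 0); // a sequence number was assigned
    }
}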
server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java

+139

@@ -0,0 +1,139 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore;
+
+import org.junit.Before;
+import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.action.support.PlainActionFuture;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.test.InternalTestCluster;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.transport.MockTransportService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 3)
+public class RemoteStoreForceMergeIT extends RemoteStoreBaseIntegTestCase {
+
+    private static final String INDEX_NAME = "remote-store-test-idx-1";
+    private static final String TOTAL_OPERATIONS = "total-operations";
+    private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total";
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(MockTransportService.TestPlugin.class);
+    }
+
+    @Before
+    public void setup() {
+        setupRepo();
+    }
+
+    @Override
+    public Settings indexSettings() {
+        return remoteStoreIndexSettings(0);
+    }
+
+    private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush, boolean flushAfterMerge, long deletedDocs) {
+        long totalOperations = 0;
+        long maxSeqNo = -1;
+        List<IndexResponse> indexResponseList = new ArrayList<>();
+        for (int i = 0; i < numberOfIterations; i++) {
+            int numberOfOperations = randomIntBetween(20, 50);
+            for (int j = 0; j < numberOfOperations; j++) {
+                IndexResponse response = indexSingleDoc(INDEX_NAME);
+                maxSeqNo = response.getSeqNo();
+                indexResponseList.add(response);
+            }
+            totalOperations += numberOfOperations;
+            if (invokeFlush) {
+                flush(INDEX_NAME);
+            } else {
+                refresh(INDEX_NAME);
+            }
+        }
+        if (deletedDocs == -1) {
+            deletedDocs = totalOperations;
+        }
+        int length = indexResponseList.size();
+        for (int j = 0; j < deletedDocs; j++) {
+            maxSeqNo = client().prepareDelete().setIndex(INDEX_NAME).setId(indexResponseList.get(length - j - 1).getId()).get().getSeqNo();
+        }
+        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(flushAfterMerge).get();
+        refresh(INDEX_NAME);
+        assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), totalOperations - deletedDocs);
+        Map<String, Long> indexingStats = new HashMap<>();
+        indexingStats.put(TOTAL_OPERATIONS, totalOperations);
+        indexingStats.put(MAX_SEQ_NO_TOTAL, maxSeqNo);
+        return indexingStats;
+    }
+
+    private void verifyRestoredData(Map<String, Long> indexStats, long deletedDocs) {
+        ensureYellowAndNoInitializingShards(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+        assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) - deletedDocs);
+        IndexResponse response = indexSingleDoc(INDEX_NAME);
+        assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
+        refresh(INDEX_NAME);
+        assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1 - deletedDocs);
+    }
+
+    private void testRestoreWithMergeFlow(int numberOfIterations, boolean invokeFlush, boolean flushAfterMerge, long deletedDocs)
+        throws IOException {
+        createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
+        ensureYellowAndNoInitializingShards(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+
+        Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, flushAfterMerge, deletedDocs);
+
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
+        assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
+
+        client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
+        ensureGreen(INDEX_NAME);
+
+        if (deletedDocs == -1) {
+            verifyRestoredData(indexStats, indexStats.get(TOTAL_OPERATIONS));
+        } else {
+            verifyRestoredData(indexStats, deletedDocs);
+        }
+    }
+
+    // The following integ tests use randomBoolean to control the number of integ tests. If we used separate
+    // values for each of the flags, the number of integ tests would become 16 in comparison to the current 2.
+    // We have run all 16 tests locally and they pass.
+    public void testRestoreForceMergeSingleIteration() throws IOException {
+        boolean invokeFlush = randomBoolean();
+        boolean flushAfterMerge = randomBoolean();
+        testRestoreWithMergeFlow(1, invokeFlush, flushAfterMerge, randomIntBetween(0, 10));
+    }
+
+    public void testRestoreForceMergeMultipleIterations() throws IOException {
+        boolean invokeFlush = randomBoolean();
+        boolean flushAfterMerge = randomBoolean();
+        testRestoreWithMergeFlow(randomIntBetween(2, 5), invokeFlush, flushAfterMerge, randomIntBetween(0, 10));
+    }
+
+    public void testRestoreForceMergeMultipleIterationsDeleteAll() throws IOException {
+        boolean invokeFlush = randomBoolean();
+        boolean flushAfterMerge = randomBoolean();
+        testRestoreWithMergeFlow(randomIntBetween(2, 3), invokeFlush, flushAfterMerge, -1);
+    }
+}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java

+1 -11

@@ -12,10 +12,8 @@
 import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
 import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequestBuilder;
 import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
-import org.opensearch.action.index.IndexResponse;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.node.DiscoveryNode;
-import org.opensearch.common.UUIDs;
 import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
 import org.opensearch.test.OpenSearchIntegTestCase;
@@ -128,7 +126,7 @@ private void indexDocs() {
             }
             int numberOfOperations = randomIntBetween(20, 50);
             for (int j = 0; j < numberOfOperations; j++) {
-                indexSingleDoc();
+                indexSingleDoc(INDEX_NAME);
             }
         }
     }
@@ -149,12 +147,4 @@ private void assertResponseStats(RemoteRefreshSegmentTracker.Stats stats) {
         assertTrue(stats.uploadBytesPerSecMovingAverage > 0);
         assertTrue(stats.uploadTimeMovingAverage > 0);
     }
-
-    private IndexResponse indexSingleDoc() {
-        return client().prepareIndex(INDEX_NAME)
-            .setId(UUIDs.randomBase64UUID())
-            .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
-            .get();
-    }
-
 }

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

+36 -67

@@ -12,7 +12,6 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
@@ -40,13 +39,10 @@

 import java.io.IOException;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -216,77 +212,50 @@ private synchronized boolean syncSegments() {
                 long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
                 Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

-                List<String> segmentInfosFiles = localSegmentsPostRefresh.stream()
-                    .filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
-                    .collect(Collectors.toList());
-                Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
-                    .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
-
-                if (latestSegmentInfos.isPresent()) {
-                    // SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain
-                    // all the segments from last commit if they are merged away but not yet committed.
-                    // Each metadata file in the remote segment store represents a commit and the following
-                    // statement keeps sure that each metadata will always contain all the segments from last commit + refreshed
-                    // segments.
-                    SegmentInfos segmentCommitInfos;
-                    try {
-                        segmentCommitInfos = SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get());
-                    } catch (Exception e) {
-                        // Seeing discrepancy in segment infos and files on disk. SegmentInfosSnapshot is returning
-                        // a segment_N file which does not exist on local disk.
-                        logger.error("Exception occurred while SegmentInfos.readCommit(..)", e);
-                        logger.error("segmentInfosFiles={} diskFiles={}", localSegmentsPostRefresh, storeDirectory.listAll());
-                        throw e;
-                    }
-                    localSegmentsPostRefresh.addAll(segmentCommitInfos.files(true));
-                    segmentInfosFiles.stream()
-                        .filter(file -> !file.equals(latestSegmentInfos.get()))
-                        .forEach(localSegmentsPostRefresh::remove);
-
-                    // Create a map of file name to size and update the refresh segment tracker
-                    updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
-                    CountDownLatch latch = new CountDownLatch(1);
-                    ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
-                        @Override
-                        public void onResponse(Void unused) {
-                            try {
-                                // Start metadata file upload
-                                uploadMetadata(localSegmentsPostRefresh, segmentInfos);
-                                clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
-                                onSuccessfulSegmentsSync(
-                                    refreshTimeMs,
-                                    refreshClockTimeMs,
-                                    refreshSeqNo,
-                                    lastRefreshedCheckpoint,
-                                    checkpoint
-                                );
-                                // At this point since we have uploaded new segments, segment infos and segment metadata file,
-                                // along with marking minSeqNoToKeep, upload has succeeded completely.
-                                successful.set(true);
-                            } catch (Exception e) {
-                                // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
-                                // in the next refresh. This should not affect durability of the indexed data after remote trans-log
-                                // integration.
-                                logger.warn("Exception in post new segment upload actions", e);
-                            }
+                // Create a map of file name to size and update the refresh segment tracker
+                updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
+                CountDownLatch latch = new CountDownLatch(1);
+                ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
+                    @Override
+                    public void onResponse(Void unused) {
+                        try {
+                            // Start metadata file upload
+                            uploadMetadata(localSegmentsPostRefresh, segmentInfos);
+                            clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
+                            onSuccessfulSegmentsSync(
+                                refreshTimeMs,
+                                refreshClockTimeMs,
+                                refreshSeqNo,
+                                lastRefreshedCheckpoint,
+                                checkpoint
+                            );
+                            // At this point since we have uploaded new segments, segment infos and segment metadata file,
+                            // along with marking minSeqNoToKeep, upload has succeeded completely.
+                            successful.set(true);
+                        } catch (Exception e) {
+                            // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
+                            // as part of exponential back-off retry logic. This should not affect durability of the indexed data
+                            // with remote trans-log integration.
+                            logger.warn("Exception in post new segment upload actions", e);
                         }
+                    }

-                        @Override
-                        public void onFailure(Exception e) {
-                            logger.warn("Exception while uploading new segments to the remote segment store", e);
-                        }
-                    }, latch);
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.warn("Exception while uploading new segments to the remote segment store", e);
+                    }
+                }, latch);

-                    // Start the segments files upload
-                    uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
-                    latch.await();
-                }
+                // Start the segments files upload
+                uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
+                latch.await();
             } catch (EngineException e) {
                 logger.warn("Exception while reading SegmentInfosSnapshot", e);
             }
         } catch (IOException e) {
             // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
-            // in the next refresh. This should not affect durability of the indexed data after remote trans-log integration.
+            // as part of exponential back-off retry logic. This should not affect durability of the indexed data
+            // with remote trans-log integration.
             logger.warn("Exception while uploading new segments to the remote segment store", e);
         }
     } catch (Throwable t) {
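
The reworded comments above defer failed segment uploads to exponential back-off retry logic rather than the next refresh. As a rough illustration of that general pattern (a generic sketch, not OpenSearch's retry scheduler; every name below is invented):

import java.util.concurrent.TimeUnit;

public final class BackoffRetry {
    // Runs the task, doubling the wait after each failure, up to maxAttempts.
    public static void retryWithBackoff(Runnable task, int maxAttempts, long baseDelayMs) throws InterruptedException {
        long delay = baseDelayMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                task.run();
                return; // success, stop retrying
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    throw e; // exhausted all attempts
                }
                TimeUnit.MILLISECONDS.sleep(delay);
                delay *= 2; // exponential back-off
            }
        }
    }
}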
