Skip to content

Commit eae2518

Browse files
Merge remote-tracking branch 'upstream/main' into searchonly-2
2 parents 084956f + ffa46ca commit eae2518

File tree

21 files changed

+1990
-141
lines changed

21 files changed

+1990
-141
lines changed

server/src/internalClusterTest/java/org/opensearch/action/admin/indices/scale/searchonly/ScaleSearchOnlyIT.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,10 @@ public void testScaleDownToSearchOnly() throws Exception {
106106
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings(TEST_INDEX).get();
107107
assertTrue(settingsResponse.getSetting(TEST_INDEX, IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey()).equals("true"));
108108

109-
SearchResponse searchResponse = client().prepareSearch(TEST_INDEX).get();
110-
assertHitCount(searchResponse, 10);
109+
assertBusy(() -> {
110+
SearchResponse searchResponse = client().prepareSearch(TEST_INDEX).get();
111+
assertHitCount(searchResponse, 10);
112+
}, 30, TimeUnit.SECONDS);
111113

112114
try {
113115
client().prepareIndex(TEST_INDEX).setId("new-doc").setSource("field1", "new-value").get();

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

-2
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@
4949
import java.util.stream.Collectors;
5050

5151
import static java.util.Arrays.asList;
52-
import static org.opensearch.test.OpenSearchIntegTestCase.client;
53-
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
5452
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
5553

5654
public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {

server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexSegmentReplicationIT.java

+1,665
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java

+114-119
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java

-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
import java.nio.file.Path;
1818

19-
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
20-
2119
/**
2220
* This class runs Segment Replication Integ test suite with remote store enabled.
2321
*/

server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ private IndexMetadata.Builder updateInSyncAllocations(
242242
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
243243
} else {
244244
assert (recoverySource instanceof RecoverySource.SnapshotRecoverySource
245-
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) : recoverySource;
245+
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource
246+
|| recoverySource instanceof RecoverySource.ExistingStoreRecoverySource) : recoverySource;
246247
allocationId = updates.initializedPrimary.allocationId().getId();
247248
}
248249
// forcing a stale primary resets the in-sync allocations to the singleton set with the stale id

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.Queue;
3434
import java.util.Set;
3535

36+
import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialIndex;
37+
3638
/**
3739
* A {@link RemoteShardsBalancer} used by the {@link BalancedShardsAllocator} to perform allocation operations
3840
* for remote shards within the cluster.
@@ -345,7 +347,8 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
345347
// Remote shards do not have an existing store to recover from and can be recovered from an empty source
346348
// to re-fetch any shard blocks from the repository.
347349
if (shard.primary()) {
348-
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false) {
350+
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false
351+
&& isPartialIndex(allocation.metadata().getIndexSafe(shard.index())) == false) {
349352
unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
350353
}
351354
}

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,11 @@ public boolean shouldPeriodicallyFlush() {
368368
@Override
369369
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
370370
ensureOpen();
371+
// Skip flushing for indices with partial locality (warm indices)
372+
// For these indices, we don't need to commit as we will sync from the remote store on re-open
373+
if (engineConfig.getIndexSettings().isStoreLocalityPartial()) {
374+
return;
375+
}
371376
// readLock is held here to wait/block any concurrent close that acquires the writeLock.
372377
try (final ReleasableLock lock = readLock.acquire()) {
373378
ensureOpen();
@@ -442,7 +447,9 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
442447
latestSegmentInfos.changed();
443448
}
444449
try {
445-
commitSegmentInfos(latestSegmentInfos);
450+
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
451+
commitSegmentInfos(latestSegmentInfos);
452+
}
446453
} catch (IOException e) {
447454
// mark the store corrupted unless we are closing as result of engine failure.
448455
// in this case Engine#failShard will handle store corruption.

server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java

+48
Original file line numberDiff line numberDiff line change
@@ -580,5 +580,53 @@ public int hashCode() {
580580
directoryFileTransferTrackerStats
581581
);
582582
}
583+
584+
@Override
585+
public String toString() {
586+
return "Stats{"
587+
+ "shardId="
588+
+ shardId
589+
+ ", localRefreshClockTimeMs="
590+
+ localRefreshClockTimeMs
591+
+ ", remoteRefreshClockTimeMs="
592+
+ remoteRefreshClockTimeMs
593+
+ ", refreshTimeLagMs="
594+
+ refreshTimeLagMs
595+
+ ", localRefreshNumber="
596+
+ localRefreshNumber
597+
+ ", remoteRefreshNumber="
598+
+ remoteRefreshNumber
599+
+ ", uploadBytesStarted="
600+
+ uploadBytesStarted
601+
+ ", uploadBytesFailed="
602+
+ uploadBytesFailed
603+
+ ", uploadBytesSucceeded="
604+
+ uploadBytesSucceeded
605+
+ ", totalUploadsStarted="
606+
+ totalUploadsStarted
607+
+ ", totalUploadsFailed="
608+
+ totalUploadsFailed
609+
+ ", totalUploadsSucceeded="
610+
+ totalUploadsSucceeded
611+
+ ", rejectionCount="
612+
+ rejectionCount
613+
+ ", consecutiveFailuresCount="
614+
+ consecutiveFailuresCount
615+
+ ", lastSuccessfulRemoteRefreshBytes="
616+
+ lastSuccessfulRemoteRefreshBytes
617+
+ ", uploadBytesMovingAverage="
618+
+ uploadBytesMovingAverage
619+
+ ", uploadBytesPerSecMovingAverage="
620+
+ uploadBytesPerSecMovingAverage
621+
+ ", totalUploadTimeInMs="
622+
+ totalUploadTimeInMs
623+
+ ", uploadTimeMovingAverage="
624+
+ uploadTimeMovingAverage
625+
+ ", bytesLag="
626+
+ bytesLag
627+
+ ", directoryFileTransferTrackerStats="
628+
+ directoryFileTransferTrackerStats
629+
+ '}';
630+
}
583631
}
584632
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -5142,7 +5142,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
51425142
} else {
51435143
storeDirectory = store.directory();
51445144
}
5145-
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
5145+
if (indexSettings.isStoreLocalityPartial() == false) {
5146+
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
5147+
}
51465148

51475149
if (remoteSegmentMetadata != null) {
51485150
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
@@ -5158,7 +5160,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
51585160
}
51595161
}
51605162
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
5161-
: "There should not be any segments file in the dir";
5163+
|| indexSettings.isStoreLocalityPartial() : "There should not be any segments file in the dir";
51625164
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
51635165
}
51645166
syncSegmentSuccess = true;

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ private void uploadNewSegments(
459459
batchUploadListener.onFailure(ex);
460460
});
461461
statsListener.beforeUpload(src);
462-
remoteDirectory.copyFrom(storeDirectory, src, IOContext.READONCE, aggregatedListener, isLowPriorityUpload());
462+
remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload());
463463
}
464464
}
465465

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

+54-1
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,12 @@
3434
import java.util.Arrays;
3535
import java.util.Collection;
3636
import java.util.HashSet;
37+
import java.util.List;
3738
import java.util.Set;
3839
import java.util.stream.Collectors;
40+
import java.util.stream.Stream;
41+
42+
import static org.apache.lucene.index.IndexFileNames.SEGMENTS;
3943

4044
/**
4145
* Composite Directory will contain both local and remote directory
@@ -74,12 +78,37 @@ public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, F
7478
);
7579
}
7680

81+
/**
82+
* Returns names of all files stored in local directory
83+
* @throws IOException in case of I/O error
84+
*/
85+
private String[] listLocalFiles() throws IOException {
86+
ensureOpen();
87+
logger.trace("Composite Directory[{}]: listLocalOnly() called", this::toString);
88+
return localDirectory.listAll();
89+
}
90+
91+
/**
92+
* Returns a list of names of all block files stored in the local directory for a given file,
93+
* including the original file itself if present.
94+
*
95+
* @param fileName The name of the file to search for, along with its associated block files.
96+
* @return A list of file names, including the original file (if present) and all its block files.
97+
* @throws IOException in case of I/O error while listing files.
98+
*/
99+
private List<String> listBlockFiles(String fileName) throws IOException {
100+
return Stream.of(listLocalFiles())
101+
.filter(file -> file.equals(fileName) || file.startsWith(fileName + FileTypeUtils.BLOCK_FILE_IDENTIFIER))
102+
.collect(Collectors.toList());
103+
}
104+
77105
/**
78106
* Returns names of all files stored in this directory in sorted order
79107
* Does not include locally stored block files (having _block_ in their names) and files pending deletion
80108
*
81109
* @throws IOException in case of I/O error
82110
*/
111+
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17527
83112
@Override
84113
public String[] listAll() throws IOException {
85114
ensureOpen();
@@ -105,6 +134,7 @@ public String[] listAll() throws IOException {
105134
* Currently deleting only from local directory as files from remote should not be deleted as that is taken care by garbage collection logic of remote directory
106135
* @param name the name of an existing file.
107136
* @throws IOException in case of I/O error
137+
* @throws NoSuchFileException when file does not exist in the directory
108138
*/
109139
@Override
110140
public void deleteFile(String name) throws IOException {
@@ -115,7 +145,21 @@ public void deleteFile(String name) throws IOException {
115145
} else if (Arrays.asList(listAll()).contains(name) == false) {
116146
throw new NoSuchFileException("File " + name + " not found in directory");
117147
} else {
118-
fileCache.remove(getFilePath(name));
148+
List<String> blockFiles = listBlockFiles(name);
149+
if (blockFiles.isEmpty()) {
150+
// Remove this condition when this issue is addressed.
151+
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17526
152+
logger.debug("The file [{}] or its block files do not exist in local directory", name);
153+
} else {
154+
for (String blockFile : blockFiles) {
155+
if (fileCache.get(getFilePath(blockFile)) == null) {
156+
logger.debug("The file [{}] exists in local but not part of FileCache, deleting it from local", blockFile);
157+
localDirectory.deleteFile(blockFile);
158+
} else {
159+
fileCache.remove(getFilePath(blockFile));
160+
}
161+
}
162+
}
119163
}
120164
}
121165

@@ -254,6 +298,15 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
254298
public void close() throws IOException {
255299
ensureOpen();
256300
logger.trace("Composite Directory[{}]: close() called", this::toString);
301+
String[] localFiles = listLocalFiles();
302+
for (String localFile : localFiles) {
303+
// Delete segments_N file with ref count 1 created during index creation on replica shards
304+
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17534
305+
if (localFile.startsWith(SEGMENTS)) {
306+
fileCache.remove(getFilePath(localFile));
307+
}
308+
}
309+
fileCache.prune();
257310
localDirectory.close();
258311
}
259312

server/src/main/java/org/opensearch/index/store/RemoteDirectory.java

+1
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ private void uploadBlob(
383383
ActionListener<Void> listener,
384384
boolean lowPriorityUpload
385385
) throws Exception {
386+
assert ioContext != IOContext.READONCE : "Remote upload will fail with IoContext.READONCE";
386387
long expectedChecksum = calculateChecksumOfChecksum(from, src);
387388
long contentLength;
388389
try (IndexInput indexInput = from.openInput(src, ioContext)) {

server/src/main/java/org/opensearch/index/store/remote/utils/FileTypeUtils.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
@ExperimentalApi
1919
public class FileTypeUtils {
2020

21+
public static String BLOCK_FILE_IDENTIFIER = "_block_";
22+
2123
public static boolean isTempFile(String name) {
2224
return name.endsWith(".tmp");
2325
}
2426

2527
public static boolean isBlockFile(String name) {
26-
return name.contains("_block_");
28+
return name.contains(BLOCK_FILE_IDENTIFIER);
2729
}
2830

2931
public static boolean isExtraFSFile(String name) {

server/src/main/java/org/opensearch/index/store/remote/utils/cache/LRUCache.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -310,13 +310,20 @@ public CacheStats stats() {
310310
public void logCurrentState() {
311311
lock.lock();
312312
try {
313-
String allFiles = "\n";
313+
final StringBuilder allFiles = new StringBuilder("\n");
314314
for (Map.Entry<K, Node<K, V>> entry : data.entrySet()) {
315315
String path = entry.getKey().toString();
316316
String file = path.substring(path.lastIndexOf('/'));
317-
allFiles += file + " [RefCount: " + entry.getValue().refCount + " , Weight: " + entry.getValue().weight + " ]\n";
317+
allFiles.append(file)
318+
.append(" [RefCount: ")
319+
.append(entry.getValue().refCount)
320+
.append(" , Weight: ")
321+
.append(entry.getValue().weight)
322+
.append(" ]\n");
323+
}
324+
if (allFiles.length() > 1) {
325+
logger.trace(() -> "Cache entries : " + allFiles);
318326
}
319-
logger.trace("Cache entries : " + allFiles);
320327
} finally {
321328
lock.unlock();
322329
}

server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,11 @@ public CacheStats stats() {
190190
public void logCurrentState() {
191191
int i = 0;
192192
for (RefCountedCache<K, V> cache : table) {
193-
logger.trace("SegmentedCache " + i);
194-
((LRUCache<K, V>) cache).logCurrentState();
193+
if (cache.size() > 0) {
194+
final int segmentIndex = i;
195+
logger.trace(() -> "SegmentedCache " + segmentIndex);
196+
((LRUCache<K, V>) cache).logCurrentState();
197+
}
195198
i++;
196199
}
197200
}

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

+7
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import java.io.IOException;
3838
import java.io.UncheckedIOException;
39+
import java.util.Collections;
3940
import java.util.List;
4041
import java.util.Locale;
4142
import java.util.Set;
@@ -202,6 +203,12 @@ public void startReplication(ActionListener<Void> listener, BiConsumer<Replicati
202203
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
203204
cancellableThreads.checkForCancel();
204205
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
206+
207+
// Return an empty list for warm indices, In this case, replica shards don't require downloading files from remote storage
208+
// as replicas will sync all files from remote in case of failure.
209+
if (indexShard.indexSettings().isStoreLocalityPartial()) {
210+
return Collections.emptyList();
211+
}
205212
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
206213
// local files
207214
final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());

server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java

+31
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,37 @@ public void testGetPinnedTimestampLockedFilesWithPinnedTimestampsDifferentPrefix
993993
assertEquals(0, metadataFilePinnedTimestampCache.size());
994994
}
995995

996+
/**
997+
* This test checks the case when a stale writer is uploading metadata files with higher timestamp, but lower primary
998+
* term.
999+
*/
1000+
public void testGetPinnedTimestampLockedFilesForDivergentWrites() {
1001+
setupRemotePinnedTimestampFeature(true);
1002+
1003+
Map<Long, String> metadataFilePinnedTimestampCache = new HashMap<>();
1004+
1005+
// Pinned timestamp 7000
1006+
// Primary Term - Timestamp in md file
1007+
// 6 - 7002
1008+
// 3 - 6999
1009+
// 4 - 6998
1010+
// 5 - 6995
1011+
// 5 - 6990
1012+
Tuple<Map<Long, String>, Set<String>> metadataAndLocks = testGetPinnedTimestampLockedFilesWithPinnedTimestamps(
1013+
Map.of(7002L, 6L, 6999L, 3L, 6998L, 4L, 6995L, 5L, 6990L, 5L),
1014+
Set.of(4000L, 5000L, 6000L, 7000L),
1015+
metadataFilePinnedTimestampCache
1016+
);
1017+
Map<Long, String> metadataFiles = metadataAndLocks.v1();
1018+
Set<String> implicitLockedFiles = metadataAndLocks.v2();
1019+
1020+
assertEquals(1, implicitLockedFiles.size());
1021+
assertTrue(implicitLockedFiles.contains(metadataFiles.get(6995L)));
1022+
// Now we cache all the matches except the last one.
1023+
assertEquals(1, metadataFilePinnedTimestampCache.size());
1024+
assertEquals(metadataFiles.get(6995L), metadataFilePinnedTimestampCache.get(7000L));
1025+
}
1026+
9961027
public void testFilterOutMetadataFilesBasedOnAgeFeatureDisabled() {
9971028
setupRemotePinnedTimestampFeature(false);
9981029
List<String> metadataFiles = new ArrayList<>();

0 commit comments

Comments
 (0)