diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 1a8911c657fac..7010b28090b32 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -460,7 +460,7 @@ private void uploadNewSegments( batchUploadListener.onFailure(ex); }); statsListener.beforeUpload(src); - remoteDirectory.copyFrom(storeDirectory, src, IOContext.READONCE, aggregatedListener, isLowPriorityUpload()); + remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload()); } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index aa856aa22313d..726e94e6d9448 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -357,6 +357,7 @@ private void uploadBlob( ActionListener listener, boolean lowPriorityUpload ) throws Exception { + assert ioContext != IOContext.READONCE : "Remote upload will fail with IoContext.READONCE"; long expectedChecksum = calculateChecksumOfChecksum(from, src); long contentLength; try (IndexInput indexInput = from.openInput(src, ioContext)) { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index ed79a2b0bd8e4..b31acdffbded9 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -94,7 +94,7 @@ public void testCopyFrom() throws IOException, InterruptedException { storeDirectory, filename, filename, - IOContext.READ, + IOContext.DEFAULT, () -> postUploadInvoked.set(true), new ActionListener<>() { @Override @@ -132,7 +132,7 @@ public void testCopyFromWithException() throws IOException, InterruptedException storeDirectory, filename, filename, - IOContext.READ, + IOContext.DEFAULT, () -> postUploadInvoked.set(true), new ActionListener<>() { @Override diff --git a/test/framework/src/main/java/org/opensearch/test/store/MockFSDirectoryFactory.java b/test/framework/src/main/java/org/opensearch/test/store/MockFSDirectoryFactory.java index 9c42ea2672601..d8279170ddd92 100644 --- a/test/framework/src/main/java/org/opensearch/test/store/MockFSDirectoryFactory.java +++ b/test/framework/src/main/java/org/opensearch/test/store/MockFSDirectoryFactory.java @@ -36,7 +36,10 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CheckIndex; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.tests.store.MockDirectoryWrapper; @@ -203,6 +206,19 @@ public synchronized void crash() throws IOException { public Set getPendingDeletions() throws IOException { return in.getPendingDeletions(); } + + // In remote store feature, the upload flow is async and IndexInput can be opened and closed + // by different threads, so we always use IOContext.DEFAULT. + // But MockDirectoryWrapper throws an exception if segments_N fil is opened with any IOContext other than READONCE. + // Following change is temporary override to avoid the test failures. We should fix the multiple thread access + // in remote store upload flow. + @Override + public synchronized IndexInput openInput(String name, IOContext context) throws IOException { + if (name.startsWith(IndexFileNames.SEGMENTS)) { + context = IOContext.READONCE; + } + return super.openInput(name, context); + } } static final class CloseableDirectory implements Closeable {