From c0a52cad741c041480baf47ee0e92cdc5ed7561a Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Thu, 29 Aug 2024 13:50:24 -0700 Subject: [PATCH 1/6] Remove cache overflow setting - TransferManager fails BlobFetchRequest on full cache Signed-off-by: Finn Carroll --- .../index/store/remote/utils/TransferManager.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index f07c4832d982c..3076bf4aba07e 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -95,6 +95,12 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio @SuppressWarnings("removal") private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) { try { + // This local file cache is ref counted and may not strictly enforce configured capacity. + // If we find available capacity is exceeded, deny further BlobFetchRequests. + if (fileCache.capacity() < fileCache.usage().usage()) { + fileCache.prune(); + throw new IOException("Local file cache capacity (" + fileCache.capacity() + ") exceeded (" + fileCache.usage().usage() + ") - BlobFetchRequest failed: " + request.getFilePath()); + } if (Files.exists(request.getFilePath()) == false) { logger.trace("Fetching from Remote in createIndexInput of Transfer Manager"); try ( From f2c92374a25401d52361a85b9323e330665efc65 Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Thu, 29 Aug 2024 14:04:22 -0700 Subject: [PATCH 2/6] Remove cache overflow tests - overflow now throws exception Signed-off-by: Finn Carroll --- .../remote/utils/TransferManagerTestCase.java | 56 +++---------------- 1 file changed, 7 insertions(+), 49 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java index 810a4c336fdf7..4dc99bbd5ee79 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java @@ -99,56 +99,14 @@ public void testConcurrentAccess() throws Exception { } } - public void testFetchBlobWithConcurrentCacheEvictions() throws Exception { - // Submit 256 tasks to an executor with 16 threads that will each randomly - // request one of eight blobs. Given that the cache can only hold two - // blobs this will lead to a huge amount of contention and thrashing. - final ExecutorService testRunner = Executors.newFixedThreadPool(16); - try { - final List> futures = new ArrayList<>(); - for (int i = 0; i < 256; i++) { - // request an index input and immediately close it - final String blobname = "blob-" + randomIntBetween(0, 7); - futures.add(testRunner.submit(() -> { - try { - try (IndexInput indexInput = fetchBlobWithName(blobname)) { - assertIndexInputIsFunctional(indexInput); - } - } catch (Exception e) { - throw new AssertionError(e); - } - })); - } - // Wait for all threads to complete - for (Future future : futures) { - future.get(10, TimeUnit.SECONDS); - } - } finally { - assertTrue(terminate(testRunner)); - } - MatcherAssert.assertThat("Expected many evictions to happen", fileCache.stats().evictionCount(), greaterThan(0L)); - } + public void testOverflowDisabled() throws Exception { + initializeTransferManager(); + IndexInput i1 = fetchBlobWithName("1"); + IndexInput i2 = fetchBlobWithName("2"); - public void testUsageExceedsCapacity() throws Exception { - // Fetch resources that exceed the configured capacity of the cache and assert that the - // returned IndexInputs are still functional. - try (IndexInput i1 = fetchBlobWithName("1"); IndexInput i2 = fetchBlobWithName("2"); IndexInput i3 = fetchBlobWithName("3")) { - assertIndexInputIsFunctional(i1); - assertIndexInputIsFunctional(i2); - assertIndexInputIsFunctional(i3); - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo((long) EIGHT_MB * 3)); - MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB * 3)); - } - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L)); - MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB * 3)); - // Fetch another resource which will trigger an eviction - try (IndexInput i1 = fetchBlobWithName("1")) { - assertIndexInputIsFunctional(i1); - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo((long) EIGHT_MB)); - MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB)); - } - MatcherAssert.assertThat(fileCache.usage().activeUsage(), equalTo(0L)); - MatcherAssert.assertThat(fileCache.usage().usage(), equalTo((long) EIGHT_MB)); + assertThrows(IOException.class, () -> { + IndexInput i3 = fetchBlobWithName("3"); + }); } public void testDownloadFails() throws Exception { From 8fddd6d41a7b77ad3b24c1ae9b60be905097a72f Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Thu, 29 Aug 2024 14:09:21 -0700 Subject: [PATCH 3/6] Spotless apply Signed-off-by: Finn Carroll --- .../index/store/remote/utils/TransferManager.java | 9 ++++++++- .../store/remote/utils/TransferManagerTestCase.java | 5 +---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 3076bf4aba07e..94c25202ac90c 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -99,7 +99,14 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, Stream // If we find available capacity is exceeded, deny further BlobFetchRequests. if (fileCache.capacity() < fileCache.usage().usage()) { fileCache.prune(); - throw new IOException("Local file cache capacity (" + fileCache.capacity() + ") exceeded (" + fileCache.usage().usage() + ") - BlobFetchRequest failed: " + request.getFilePath()); + throw new IOException( + "Local file cache capacity (" + + fileCache.capacity() + + ") exceeded (" + + fileCache.usage().usage() + + ") - BlobFetchRequest failed: " + + request.getFilePath() + ); } if (Files.exists(request.getFilePath()) == false) { logger.trace("Fetching from Remote in createIndexInput of Transfer Manager"); diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java index 4dc99bbd5ee79..0978e8039c83e 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java @@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public abstract class TransferManagerTestCase extends OpenSearchTestCase { @@ -104,9 +103,7 @@ public void testOverflowDisabled() throws Exception { IndexInput i1 = fetchBlobWithName("1"); IndexInput i2 = fetchBlobWithName("2"); - assertThrows(IOException.class, () -> { - IndexInput i3 = fetchBlobWithName("3"); - }); + assertThrows(IOException.class, () -> { IndexInput i3 = fetchBlobWithName("3"); }); } public void testDownloadFails() throws Exception { From 9c31825fe82883ded01699970792f1764c8a26d1 Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Wed, 4 Sep 2024 11:12:57 -0700 Subject: [PATCH 4/6] Changelog Signed-off-by: Finn Carroll --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d1c0c78d6db02..9979d7e0f7c19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -108,6 +108,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix unchecked cast in dynamic action map getter ([#15394](https://github.com/opensearch-project/OpenSearch/pull/15394)) - Fix null values indexed as "null" strings in flat_object field ([#14069](https://github.com/opensearch-project/OpenSearch/pull/14069)) - Fix terms query on wildcard field returns nothing ([#15607](https://github.com/opensearch-project/OpenSearch/pull/15607)) +- Fix remote snapshot file_cache exceeding capacity ([#15077](https://github.com/opensearch-project/OpenSearch/pull/15077)) ### Security From c99c2f2793a71b3c588e829fecd0110641485ff8 Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Tue, 3 Sep 2024 10:05:54 -0700 Subject: [PATCH 5/6] testFetchBlobWithConcurrentCacheEvictions Signed-off-by: Finn Carroll --- .../remote/utils/TransferManagerTestCase.java | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java index 0978e8039c83e..5c8b084501a42 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public abstract class TransferManagerTestCase extends OpenSearchTestCase { @@ -98,6 +99,43 @@ public void testConcurrentAccess() throws Exception { } } + public void testFetchBlobWithConcurrentCacheEvictions() { + // Submit 256 tasks to an executor with 16 threads that will each randomly + // request one of eight blobs. Given that the cache can only hold two + // blobs this will lead to a huge amount of contention and thrashing. + final ExecutorService testRunner = Executors.newFixedThreadPool(16); + try { + final List> futures = new ArrayList<>(); + for (int i = 0; i < 256; i++) { + // request an index input and immediately close it + final String blobname = "blob-" + randomIntBetween(0, 7); + futures.add(testRunner.submit(() -> { + try { + try (IndexInput indexInput = fetchBlobWithName(blobname)) { + assertIndexInputIsFunctional(indexInput); + } + } catch (IOException ignored) { // fetchBlobWithName may fail due to fixed capacity + } catch (Exception e) { + throw new AssertionError(e); + } + })); + } + // Wait for all threads to complete + try { + for (Future future : futures) { + future.get(10, TimeUnit.SECONDS); + } + } catch (java.util.concurrent.ExecutionException ignored) { // Index input may be null + } catch (Exception e) { + throw new AssertionError(e); + } + + } finally { + assertTrue(terminate(testRunner)); + } + MatcherAssert.assertThat("Expected many evictions to happen", fileCache.stats().evictionCount(), greaterThan(0L)); + } + public void testOverflowDisabled() throws Exception { initializeTransferManager(); IndexInput i1 = fetchBlobWithName("1"); From d3d77d1656f54fbf328d06238e825a19036855fa Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Tue, 3 Sep 2024 11:17:48 -0700 Subject: [PATCH 6/6] Spotless apply Signed-off-by: Finn Carroll --- .../index/store/remote/utils/TransferManagerTestCase.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java index 5c8b084501a42..1eae5119ab462 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java @@ -100,10 +100,10 @@ public void testConcurrentAccess() throws Exception { } public void testFetchBlobWithConcurrentCacheEvictions() { - // Submit 256 tasks to an executor with 16 threads that will each randomly - // request one of eight blobs. Given that the cache can only hold two - // blobs this will lead to a huge amount of contention and thrashing. - final ExecutorService testRunner = Executors.newFixedThreadPool(16); + // Submit 256 tasks to an executor with 16 threads that will each randomly + // request one of eight blobs. Given that the cache can only hold two + // blobs this will lead to a huge amount of contention and thrashing. + final ExecutorService testRunner = Executors.newFixedThreadPool(16); try { final List> futures = new ArrayList<>(); for (int i = 0; i < 256; i++) {