Skip to content

Commit a098dc1

Browse files
opensearch-trigger-bot[bot]github-actions[bot]
authored andcommitted
[Tiered Cache] Use ConcurrentHashMap explicitly in IndicesRequestCache (opensearch-project#14409) (opensearch-project#14430)
(cherry picked from commit 823ce68) Signed-off-by: Kiran Prakash <awskiran@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent 1372f14 commit a098dc1

File tree

3 files changed

+22
-16
lines changed

3 files changed

+22
-16
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1167,7 +1167,7 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
11671167
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
11681168
}
11691169

1170-
// when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys
1170+
// when staleness threshold is lower than staleness, it should clean cache from all indices having stale keys
11711171
public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
11721172
int cacheCleanIntervalInMillis = 10;
11731173
String node = internalCluster().startNode(

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import java.util.Objects;
8282
import java.util.Optional;
8383
import java.util.Set;
84+
import java.util.concurrent.ConcurrentHashMap;
8485
import java.util.concurrent.ConcurrentMap;
8586
import java.util.concurrent.atomic.AtomicBoolean;
8687
import java.util.concurrent.atomic.AtomicInteger;
@@ -507,15 +508,15 @@ public int hashCode() {
507508
* */
508509
class IndicesRequestCacheCleanupManager implements Closeable {
509510
private final Set<CleanupKey> keysToClean;
510-
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
511+
private final ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap;
511512
private final AtomicInteger staleKeysCount;
512513
private volatile double stalenessThreshold;
513514
private final IndicesRequestCacheCleaner cacheCleaner;
514515

515516
IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
516517
this.stalenessThreshold = stalenessThreshold;
517518
this.keysToClean = ConcurrentCollections.newConcurrentSet();
518-
this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
519+
this.cleanupKeyToCountMap = new ConcurrentHashMap<>();
519520
this.staleKeysCount = new AtomicInteger(0);
520521
this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval);
521522
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
@@ -573,8 +574,7 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
573574

574575
// pkg-private for testing
575576
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
576-
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
577-
.merge(readerCacheKeyId, 1, Integer::sum);
577+
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new ConcurrentHashMap<>()).merge(readerCacheKeyId, 1, Integer::sum);
578578
}
579579

580580
/**
@@ -832,7 +832,7 @@ public void close() {
832832
}
833833

834834
// for testing
835-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
835+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> getCleanupKeyToCountMap() {
836836
return cleanupKeyToCountMap;
837837
}
838838

server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java

+16-10
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
import java.util.Optional;
102102
import java.util.UUID;
103103
import java.util.concurrent.ConcurrentHashMap;
104-
import java.util.concurrent.ConcurrentMap;
105104
import java.util.concurrent.CountDownLatch;
106105
import java.util.concurrent.ExecutorService;
107106
import java.util.concurrent.Executors;
@@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
491490
indexShard.hashCode()
492491
);
493492
// test the mapping
494-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
493+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
494+
.getCleanupKeyToCountMap();
495495
// shard id should exist
496496
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
497497
// reader CacheKeyId should NOT exist
@@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
554554
);
555555

556556
// test the mapping
557-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
557+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
558+
.getCleanupKeyToCountMap();
558559
// shard id should exist
559560
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
560561
// reader CacheKeyId should NOT exist
@@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
722723
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
723724
assertEquals(1, cache.count());
724725
// test the mappings
725-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
726+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
727+
.getCleanupKeyToCountMap();
726728
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));
727729

728730
cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
@@ -796,15 +798,15 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
796798
}
797799

798800
// test adding to cleanupKeyToCountMap with multiple threads
799-
public void testAddToCleanupKeyToCountMap() throws Exception {
801+
public void testAddingToCleanupKeyToCountMapWorksAppropriatelyWithMultipleThreads() throws Exception {
800802
threadPool = getThreadPool();
801803
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
802804
cache = getIndicesRequestCache(settings);
803805

804806
int numberOfThreads = 10;
805807
int numberOfIterations = 1000;
806808
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
807-
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
809+
AtomicBoolean concurrentModificationExceptionDetected = new AtomicBoolean(false);
808810

809811
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
810812

@@ -817,7 +819,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
817819
}
818820
} catch (ConcurrentModificationException e) {
819821
logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
820-
exceptionDetected.set(true); // Set flag if exception is detected
822+
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
821823
}
822824
});
823825
}
@@ -836,13 +838,17 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
836838
}
837839
} catch (ConcurrentModificationException e) {
838840
logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
839-
exceptionDetected.set(true); // Set flag if exception is detected
841+
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
840842
}
841843
});
842844

843845
executorService.shutdown();
844-
executorService.awaitTermination(60, TimeUnit.SECONDS);
845-
assertFalse(exceptionDetected.get());
846+
assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS));
847+
assertEquals(
848+
numberOfThreads * numberOfIterations,
849+
cache.cacheCleanupManager.getCleanupKeyToCountMap().get(indexShard.shardId()).size()
850+
);
851+
assertFalse(concurrentModificationExceptionDetected.get());
846852
}
847853

848854
private IndicesRequestCache getIndicesRequestCache(Settings settings) {

0 commit comments

Comments
 (0)