Skip to content

Commit 823ce68

Browse files
[Tiered Cache] Use ConcurrentHashMap explicitly in IndicesRequestCache (opensearch-project#14409)
Signed-off-by: Kiran Prakash <awskiran@amazon.com>
1 parent 903784b commit 823ce68

File tree

3 files changed

+22
-16
lines changed

3 files changed

+22
-16
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1168,7 +1168,7 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
11681168
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
11691169
}
11701170

1171-
// when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys
1171+
// when staleness threshold is lower than staleness, it should clean cache from all indices having stale keys
11721172
public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
11731173
int cacheCleanIntervalInMillis = 10;
11741174
String node = internalCluster().startNode(

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import java.util.Objects;
8282
import java.util.Optional;
8383
import java.util.Set;
84+
import java.util.concurrent.ConcurrentHashMap;
8485
import java.util.concurrent.ConcurrentMap;
8586
import java.util.concurrent.atomic.AtomicBoolean;
8687
import java.util.concurrent.atomic.AtomicInteger;
@@ -506,15 +507,15 @@ public int hashCode() {
506507
* */
507508
class IndicesRequestCacheCleanupManager implements Closeable {
508509
private final Set<CleanupKey> keysToClean;
509-
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
510+
private final ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap;
510511
private final AtomicInteger staleKeysCount;
511512
private volatile double stalenessThreshold;
512513
private final IndicesRequestCacheCleaner cacheCleaner;
513514

514515
IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
515516
this.stalenessThreshold = stalenessThreshold;
516517
this.keysToClean = ConcurrentCollections.newConcurrentSet();
517-
this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
518+
this.cleanupKeyToCountMap = new ConcurrentHashMap<>();
518519
this.staleKeysCount = new AtomicInteger(0);
519520
this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval);
520521
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
@@ -572,8 +573,7 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
572573

573574
// pkg-private for testing
574575
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
575-
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
576-
.merge(readerCacheKeyId, 1, Integer::sum);
576+
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new ConcurrentHashMap<>()).merge(readerCacheKeyId, 1, Integer::sum);
577577
}
578578

579579
/**
@@ -831,7 +831,7 @@ public void close() {
831831
}
832832

833833
// for testing
834-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
834+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> getCleanupKeyToCountMap() {
835835
return cleanupKeyToCountMap;
836836
}
837837

server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java

+16-10
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
import java.util.Optional;
102102
import java.util.UUID;
103103
import java.util.concurrent.ConcurrentHashMap;
104-
import java.util.concurrent.ConcurrentMap;
105104
import java.util.concurrent.CountDownLatch;
106105
import java.util.concurrent.ExecutorService;
107106
import java.util.concurrent.Executors;
@@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
491490
indexShard.hashCode()
492491
);
493492
// test the mapping
494-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
493+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
494+
.getCleanupKeyToCountMap();
495495
// shard id should exist
496496
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
497497
// reader CacheKeyId should NOT exist
@@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
554554
);
555555

556556
// test the mapping
557-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
557+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
558+
.getCleanupKeyToCountMap();
558559
// shard id should exist
559560
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
560561
// reader CacheKeyId should NOT exist
@@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
722723
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
723724
assertEquals(1, cache.count());
724725
// test the mappings
725-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
726+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
727+
.getCleanupKeyToCountMap();
726728
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));
727729

728730
cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
@@ -796,15 +798,15 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
796798
}
797799

798800
// test adding to cleanupKeyToCountMap with multiple threads
799-
public void testAddToCleanupKeyToCountMap() throws Exception {
801+
public void testAddingToCleanupKeyToCountMapWorksAppropriatelyWithMultipleThreads() throws Exception {
800802
threadPool = getThreadPool();
801803
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
802804
cache = getIndicesRequestCache(settings);
803805

804806
int numberOfThreads = 10;
805807
int numberOfIterations = 1000;
806808
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
807-
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
809+
AtomicBoolean concurrentModificationExceptionDetected = new AtomicBoolean(false);
808810

809811
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
810812

@@ -817,7 +819,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
817819
}
818820
} catch (ConcurrentModificationException e) {
819821
logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
820-
exceptionDetected.set(true); // Set flag if exception is detected
822+
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
821823
}
822824
});
823825
}
@@ -836,13 +838,17 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
836838
}
837839
} catch (ConcurrentModificationException e) {
838840
logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
839-
exceptionDetected.set(true); // Set flag if exception is detected
841+
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
840842
}
841843
});
842844

843845
executorService.shutdown();
844-
executorService.awaitTermination(60, TimeUnit.SECONDS);
845-
assertFalse(exceptionDetected.get());
846+
assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS));
847+
assertEquals(
848+
numberOfThreads * numberOfIterations,
849+
cache.cacheCleanupManager.getCleanupKeyToCountMap().get(indexShard.shardId()).size()
850+
);
851+
assertFalse(concurrentModificationExceptionDetected.get());
846852
}
847853

848854
private IndicesRequestCache getIndicesRequestCache(Settings settings) {

0 commit comments

Comments
 (0)