Skip to content

Commit 53fb91c

Browse files
use concurrentHashMap explicitly
Signed-off-by: Kiran Prakash <awskiran@amazon.com>
1 parent 21d3aaa commit 53fb91c

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2424
### Removed
2525

2626
### Fixed
27+
- Used ConcurrentHashMap explicitly in IndicesRequestCache ([#14409](https://github.com/opensearch-project/OpenSearch/pull/14409))
2728

2829
### Security
2930

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import java.util.Objects;
8282
import java.util.Optional;
8383
import java.util.Set;
84+
import java.util.concurrent.ConcurrentHashMap;
8485
import java.util.concurrent.ConcurrentMap;
8586
import java.util.concurrent.atomic.AtomicBoolean;
8687
import java.util.concurrent.atomic.AtomicInteger;
@@ -506,15 +507,15 @@ public int hashCode() {
506507
* */
507508
class IndicesRequestCacheCleanupManager implements Closeable {
508509
private final Set<CleanupKey> keysToClean;
509-
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
510+
private final ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap;
510511
private final AtomicInteger staleKeysCount;
511512
private volatile double stalenessThreshold;
512513
private final IndicesRequestCacheCleaner cacheCleaner;
513514

514515
IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
515516
this.stalenessThreshold = stalenessThreshold;
516517
this.keysToClean = ConcurrentCollections.newConcurrentSet();
517-
this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
518+
this.cleanupKeyToCountMap = new ConcurrentHashMap<>();
518519
this.staleKeysCount = new AtomicInteger(0);
519520
this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval);
520521
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
@@ -572,8 +573,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
572573

573574
// pkg-private for testing
574575
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
575-
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
576-
.merge(readerCacheKeyId, 1, Integer::sum);
576+
cleanupKeyToCountMap.compute(shardId, (currentShardId, readerCacheKeyMap) -> {
577+
if (readerCacheKeyMap == null) {
578+
readerCacheKeyMap = new ConcurrentHashMap<>();
579+
}
580+
readerCacheKeyMap.compute(readerCacheKeyId, (currentReaderCacheKeyId, count) -> (count == null) ? 1 : count + 1);
581+
return readerCacheKeyMap;
582+
});
577583
}
578584

579585
/**
@@ -831,7 +837,7 @@ public void close() {
831837
}
832838

833839
// for testing
834-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
840+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> getCleanupKeyToCountMap() {
835841
return cleanupKeyToCountMap;
836842
}
837843

server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
import java.util.Optional;
102102
import java.util.UUID;
103103
import java.util.concurrent.ConcurrentHashMap;
104-
import java.util.concurrent.ConcurrentMap;
105104
import java.util.concurrent.CountDownLatch;
106105
import java.util.concurrent.ExecutorService;
107106
import java.util.concurrent.Executors;
@@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
491490
indexShard.hashCode()
492491
);
493492
// test the mapping
494-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
493+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
494+
.getCleanupKeyToCountMap();
495495
// shard id should exist
496496
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
497497
// reader CacheKeyId should NOT exist
@@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
554554
);
555555

556556
// test the mapping
557-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
557+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
558+
.getCleanupKeyToCountMap();
558559
// shard id should exist
559560
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
560561
// reader CacheKeyId should NOT exist
@@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
722723
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
723724
assertEquals(1, cache.count());
724725
// test the mappings
725-
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
726+
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
727+
.getCleanupKeyToCountMap();
726728
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));
727729

728730
cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());

0 commit comments

Comments
 (0)