Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Cache] Use ConcurrentHashMap explicitly in IndicesRequestCache #14409

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1168,7 +1168,7 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

// when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys
// when staleness threshold is lower than staleness, it should clean cache from all indices having stale keys
public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
int cacheCleanIntervalInMillis = 10;
String node = internalCluster().startNode(
Original file line number Diff line number Diff line change
@@ -81,6 +81,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -506,15 +507,15 @@ public int hashCode() {
* */
class IndicesRequestCacheCleanupManager implements Closeable {
private final Set<CleanupKey> keysToClean;
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
private final ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap;
private final AtomicInteger staleKeysCount;
private volatile double stalenessThreshold;
private final IndicesRequestCacheCleaner cacheCleaner;

IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
this.stalenessThreshold = stalenessThreshold;
this.keysToClean = ConcurrentCollections.newConcurrentSet();
this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
this.cleanupKeyToCountMap = new ConcurrentHashMap<>();
this.staleKeysCount = new AtomicInteger(0);
this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval);
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
@@ -572,8 +573,7 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {

// pkg-private for testing
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
.merge(readerCacheKeyId, 1, Integer::sum);
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new ConcurrentHashMap<>()).merge(readerCacheKeyId, 1, Integer::sum);
}

/**
@@ -831,7 +831,7 @@ public void close() {
}

// for testing
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

Original file line number Diff line number Diff line change
@@ -101,7 +101,6 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
indexShard.hashCode()
);
// test the mapping
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
@@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
);

// test the mapping
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
@@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
assertEquals(1, cache.count());
// test the mappings
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));

cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
@@ -796,15 +798,15 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
}

// test adding to cleanupKeyToCountMap with multiple threads
public void testAddToCleanupKeyToCountMap() throws Exception {
public void testAddingToCleanupKeyToCountMapWorksAppropriatelyWithMultipleThreads() throws Exception {
threadPool = getThreadPool();
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
cache = getIndicesRequestCache(settings);

int numberOfThreads = 10;
int numberOfIterations = 1000;
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
AtomicBoolean concurrentModificationExceptionDetected = new AtomicBoolean(false);

ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

@@ -817,7 +819,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
}
});
}
@@ -836,13 +838,17 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
}
});

executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
assertFalse(exceptionDetected.get());
assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS));
assertEquals(
numberOfThreads * numberOfIterations,
cache.cacheCleanupManager.getCleanupKeyToCountMap().get(indexShard.shardId()).size()
);
assertFalse(concurrentModificationExceptionDetected.get());
}

private IndicesRequestCache getIndicesRequestCache(Settings settings) {