Skip to content

Commit 897d1f8

Browse files
authored
Concurrency optimization for graph native loading update (#2345)
Signed-off-by: Ganesh Ramadurai <gramadur@icloud.com>
1 parent 07fe680 commit 897d1f8

7 files changed

+492
-21
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
* [Remote Vector Index Build] Introduce Remote Native Index Build feature flag, settings, and initial skeleton [#2525](https://github.com/opensearch-project/k-NN/pull/2525)
1010
* [Remote Vector Index Build] Implement vector data upload and vector data size threshold setting [#2550](https://github.com/opensearch-project/k-NN/pull/2550)
1111
* [Remote Vector Index Build] Implement data download and IndexOutput write functionality [#2554](https://github.com/opensearch-project/k-NN/pull/2554)
12+
* Add concurrency optimizations with native memory graph loading and force eviction (#2265) [#2345](https://github.com/opensearch-project/k-NN/pull/2345)
1213
### Enhancements
1314
* Introduce node level circuit breakers for k-NN [#2509](https://github.com/opensearch-project/k-NN/pull/2509)
1415
### Bug Fixes

src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java

+66-4
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
import java.util.Iterator;
3636
import java.util.Map;
3737
import java.util.Optional;
38+
import java.util.concurrent.ConcurrentHashMap;
3839
import java.util.concurrent.ConcurrentLinkedDeque;
3940
import java.util.concurrent.ExecutionException;
4041
import java.util.concurrent.ExecutorService;
4142
import java.util.concurrent.Executors;
4243
import java.util.concurrent.TimeUnit;
4344
import java.util.concurrent.atomic.AtomicBoolean;
45+
import java.util.concurrent.locks.ReentrantLock;
4446

4547
/**
4648
* Manages native memory allocations made by JNI.
@@ -56,6 +58,7 @@ public class NativeMemoryCacheManager implements Closeable {
5658

5759
private Cache<String, NativeMemoryAllocation> cache;
5860
private Deque<String> accessRecencyQueue;
61+
private final ConcurrentHashMap<String, ReentrantLock> indexLocks = new ConcurrentHashMap<>();
5962
private final ExecutorService executor;
6063
private AtomicBoolean cacheCapacityReached;
6164
private long maxWeight;
@@ -306,6 +309,55 @@ public CacheStats getCacheStats() {
306309
return cache.stats();
307310
}
308311

312+
/**
313+
* Opens a vector index with proper locking mechanism to ensure thread safety.
314+
* The method uses a ReentrantLock to synchronize access to the index file and
315+
* cleans up the lock when no other threads are waiting.
316+
*
317+
* @param key the unique identifier for the index
318+
* @param nativeMemoryEntryContext the context containing vector index information
319+
*/
320+
private void open(String key, NativeMemoryEntryContext nativeMemoryEntryContext) {
321+
ReentrantLock indexFileLock = indexLocks.computeIfAbsent(key, k -> new ReentrantLock());
322+
try {
323+
indexFileLock.lock();
324+
nativeMemoryEntryContext.open();
325+
} finally {
326+
indexFileLock.unlock();
327+
if (!indexFileLock.hasQueuedThreads()) {
328+
indexLocks.remove(key, indexFileLock);
329+
}
330+
}
331+
}
332+
333+
/**
334+
* Retrieves an entry from the cache and updates its access recency if found.
335+
* This method combines cache access with recency queue management to maintain
336+
* the least recently used (LRU) order of cached entries.
337+
*
338+
* @param key the unique identifier for the cached entry
339+
* @return the cached NativeMemoryAllocation if present, null otherwise
340+
*/
341+
private NativeMemoryAllocation getFromCacheAndUpdateRecency(String key) {
342+
NativeMemoryAllocation result = cache.getIfPresent(key);
343+
if (result != null) {
344+
updateAccessRecency(key);
345+
}
346+
return result;
347+
}
348+
349+
/**
350+
* Updates the access recency of a cached entry by moving it to the end of the queue.
351+
* This method maintains the least recently used (LRU) order by removing the entry
352+
* from its current position and adding it to the end of the queue.
353+
*
354+
* @param key the unique identifier for the cached entry whose recency needs to be updated
355+
*/
356+
private void updateAccessRecency(String key) {
357+
accessRecencyQueue.remove(key);
358+
accessRecencyQueue.addLast(key);
359+
}
360+
309361
/**
310362
* Retrieves NativeMemoryAllocation associated with the nativeMemoryEntryContext.
311363
*
@@ -338,23 +390,28 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
338390
// In case of a cache miss, least recently accessed entries are evicted in a blocking manner
339391
// before the new entry can be added to the cache.
340392
String key = nativeMemoryEntryContext.getKey();
341-
NativeMemoryAllocation result = cache.getIfPresent(key);
342393

343394
// Cache Hit
344395
// In case of a cache hit, moving the item to the end of the recency queue adds
345396
// some overhead to the get operation. This can be optimized further to make this operation
346397
// as lightweight as possible. Multiple approaches and their outcomes were documented
347398
// before moving forward with the current solution.
348399
// The details are outlined here: https://github.com/opensearch-project/k-NN/pull/2015#issuecomment-2327064680
400+
NativeMemoryAllocation result = getFromCacheAndUpdateRecency(key);
349401
if (result != null) {
350-
accessRecencyQueue.remove(key);
351-
accessRecencyQueue.addLast(key);
352402
return result;
353403
}
354404

355405
// Cache Miss
356406
// Evict before put
407+
// open the graph file before proceeding to load the graph into memory
408+
open(key, nativeMemoryEntryContext);
357409
synchronized (this) {
410+
// recheck if another thread already loaded this entry into the cache
411+
result = getFromCacheAndUpdateRecency(key);
412+
if (result != null) {
413+
return result;
414+
}
358415
if (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight) {
359416
Iterator<String> lruIterator = accessRecencyQueue.iterator();
360417
while (lruIterator.hasNext()
@@ -376,7 +433,12 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
376433
return result;
377434
}
378435
} else {
379-
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
436+
// open graphFile before load
437+
try (nativeMemoryEntryContext) {
438+
String key = nativeMemoryEntryContext.getKey();
439+
open(key, nativeMemoryEntryContext);
440+
return cache.get(key, nativeMemoryEntryContext::load);
441+
}
380442
}
381443
}
382444

src/main/java/org/opensearch/knn/index/memory/NativeMemoryEntryContext.java

+81-1
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,16 @@
1212
package org.opensearch.knn.index.memory;
1313

1414
import lombok.Getter;
15+
import lombok.extern.log4j.Log4j2;
1516
import org.apache.lucene.store.Directory;
17+
import org.apache.lucene.store.IOContext;
18+
import org.apache.lucene.store.IndexInput;
1619
import org.opensearch.cluster.service.ClusterService;
1720
import org.opensearch.common.Nullable;
1821
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
1922
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
2023
import org.opensearch.knn.index.VectorDataType;
24+
import org.opensearch.knn.index.store.IndexInputWithBuffer;
2125

2226
import java.io.IOException;
2327
import java.util.Map;
@@ -26,7 +30,7 @@
2630
/**
2731
* Encapsulates all information needed to load a component into native memory.
2832
*/
29-
public abstract class NativeMemoryEntryContext<T extends NativeMemoryAllocation> {
33+
public abstract class NativeMemoryEntryContext<T extends NativeMemoryAllocation> implements AutoCloseable {
3034

3135
protected final String key;
3236

@@ -55,13 +59,27 @@ public String getKey() {
5559
*/
5660
public abstract Integer calculateSizeInKB();
5761

62+
/**
63+
* Opens the graph file by opening the corresponding indexInput so
64+
* that it is available for graph loading
65+
*/
66+
67+
public void open() {}
68+
69+
/**
70+
* Provides the capability to close the closable objects in the {@link NativeMemoryEntryContext}
71+
*/
72+
@Override
73+
public void close() {}
74+
5875
/**
5976
* Loads entry into memory.
6077
*
6178
* @return NativeMemoryAllocation associated with NativeMemoryEntryContext
6279
*/
6380
public abstract T load() throws IOException;
6481

82+
@Log4j2
6583
public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMemoryAllocation.IndexAllocation> {
6684

6785
@Getter
@@ -75,6 +93,17 @@ public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMem
7593
@Getter
7694
private final String modelId;
7795

96+
@Getter
97+
private boolean indexGraphFileOpened = false;
98+
@Getter
99+
private int indexSizeKb;
100+
101+
@Getter
102+
private IndexInput readStream;
103+
104+
@Getter
105+
IndexInputWithBuffer indexInputWithBuffer;
106+
78107
/**
79108
* Constructor
80109
*
@@ -131,10 +160,61 @@ public Integer calculateSizeInKB() {
131160
}
132161
}
133162

163+
@Override
164+
public void open() {
165+
// if graph file is already opened for index, do nothing
166+
if (isIndexGraphFileOpened()) {
167+
return;
168+
}
169+
// Extract vector file name from the given cache key.
170+
// Ex: _0_165_my_field.faiss@1vaqiupVUwvkXAG4Qc/RPg==
171+
final String cacheKey = this.getKey();
172+
final String vectorFileName = NativeMemoryCacheKeyHelper.extractVectorIndexFileName(cacheKey);
173+
if (vectorFileName == null) {
174+
throw new IllegalStateException(
175+
"Invalid cache key was given. The key [" + cacheKey + "] does not contain the corresponding vector file name."
176+
);
177+
}
178+
179+
// Prepare for opening index input from directory.
180+
final Directory directory = this.getDirectory();
181+
182+
// Try to open an index input then pass it down to native engine for loading an index.
183+
try {
184+
indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);
185+
readStream = directory.openInput(vectorFileName, IOContext.READONCE);
186+
readStream.seek(0);
187+
indexInputWithBuffer = new IndexInputWithBuffer(readStream);
188+
indexGraphFileOpened = true;
189+
log.debug("[KNN] NativeMemoryCacheManager open successful");
190+
} catch (IOException e) {
191+
throw new RuntimeException("Failed to open the index " + openSearchIndexName);
192+
}
193+
}
194+
134195
@Override
135196
public NativeMemoryAllocation.IndexAllocation load() throws IOException {
197+
if (!isIndexGraphFileOpened()) {
198+
throw new IllegalStateException("Index graph file is not open");
199+
}
136200
return indexLoadStrategy.load(this);
137201
}
202+
203+
// close the indexInput
204+
@Override
205+
public void close() {
206+
if (readStream != null) {
207+
try {
208+
readStream.close();
209+
indexGraphFileOpened = false;
210+
} catch (IOException e) {
211+
throw new RuntimeException(
212+
"Exception while closing the indexInput index [" + openSearchIndexName + "] for loading the graph file.",
213+
e
214+
);
215+
}
216+
}
217+
}
138218
}
139219

140220
public static class TrainingDataEntryContext extends NativeMemoryEntryContext<NativeMemoryAllocation.TrainingDataAllocation> {

src/main/java/org/opensearch/knn/index/memory/NativeMemoryLoadStrategy.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,9 @@
1313

1414
import lombok.extern.log4j.Log4j2;
1515
import org.apache.lucene.store.Directory;
16-
import org.apache.lucene.store.IOContext;
17-
import org.apache.lucene.store.IndexInput;
1816
import org.opensearch.core.action.ActionListener;
1917
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
2018
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
21-
import org.opensearch.knn.index.store.IndexInputWithBuffer;
2219
import org.opensearch.knn.index.util.IndexUtil;
2320
import org.opensearch.knn.jni.JNIService;
2421
import org.opensearch.knn.index.engine.KNNEngine;
@@ -88,10 +85,16 @@ public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.Inde
8885
final int indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);
8986

9087
// Try to open an index input then pass it down to native engine for loading an index.
91-
try (IndexInput readStream = directory.openInput(vectorFileName, IOContext.READONCE)) {
92-
final IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream);
93-
final long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine);
94-
88+
// open in NativeMemoryEntryContext takes care of opening the indexInput file
89+
if (!indexEntryContext.isIndexGraphFileOpened()) {
90+
throw new IllegalStateException("Index [" + indexEntryContext.getOpenSearchIndexName() + "] is not preloaded");
91+
}
92+
try (indexEntryContext) {
93+
final long indexAddress = JNIService.loadIndex(
94+
indexEntryContext.indexInputWithBuffer,
95+
indexEntryContext.getParameters(),
96+
knnEngine
97+
);
9598
return createIndexAllocation(indexEntryContext, knnEngine, indexAddress, indexSizeKb, vectorFileName);
9699
}
97100
}

0 commit comments

Comments
 (0)