13
13
14
14
import com .google .common .collect .ImmutableMap ;
15
15
import lombok .SneakyThrows ;
16
+ import org .junit .Before ;
17
+ import org .mockito .Mock ;
18
+ import org .opensearch .common .settings .ClusterSettings ;
16
19
import org .opensearch .knn .KNNTestCase ;
17
20
import org .opensearch .knn .TestUtils ;
18
21
import org .opensearch .knn .common .KNNConstants ;
19
- import org .opensearch .knn .index .util .IndexUtil ;
22
+ import org .opensearch .knn .index .KNNSettings ;
23
+ import org .opensearch .knn .index .SpaceType ;
20
24
import org .opensearch .knn .index .VectorDataType ;
25
+ import org .opensearch .knn .index .engine .KNNEngine ;
26
+ import org .opensearch .knn .index .util .IndexUtil ;
21
27
import org .opensearch .knn .jni .JNICommons ;
22
28
import org .opensearch .knn .jni .JNIService ;
23
- import org .opensearch .knn .index .SpaceType ;
24
- import org .opensearch .knn .index .engine .KNNEngine ;
25
29
import org .opensearch .watcher .FileWatcher ;
26
30
import org .opensearch .watcher .WatcherHandle ;
27
31
28
32
import java .nio .file .Path ;
29
33
import java .util .Arrays ;
30
34
import java .util .Map ;
35
+ import java .util .concurrent .ExecutionException ;
31
36
import java .util .concurrent .ExecutorService ;
32
37
import java .util .concurrent .Executors ;
38
+ import java .util .concurrent .Future ;
39
+ import java .util .concurrent .TimeUnit ;
40
+ import java .util .concurrent .atomic .AtomicReference ;
33
41
34
42
import static org .mockito .Mockito .doNothing ;
35
43
import static org .mockito .Mockito .mock ;
44
+ import static org .mockito .Mockito .when ;
45
+ import static org .opensearch .knn .common .featureflags .KNNFeatureFlags .KNN_FORCE_EVICT_CACHE_ENABLED_SETTING ;
36
46
37
47
public class NativeMemoryAllocationTests extends KNNTestCase {
38
48
@@ -41,6 +51,19 @@ public class NativeMemoryAllocationTests extends KNNTestCase {
41
51
private int testLockValue3 ;
42
52
private int testLockValue4 ;
43
53
54
+ @ Mock
55
+ ClusterSettings clusterSettings ;
56
+
57
+ @ Before
58
+ @ Override
59
+ public void setUp () throws Exception {
60
+ super .setUp ();
61
+ clusterSettings = mock (ClusterSettings .class );
62
+ when (clusterService .getClusterSettings ()).thenReturn (clusterSettings );
63
+ when (clusterSettings .get (KNN_FORCE_EVICT_CACHE_ENABLED_SETTING )).thenReturn (false );
64
+ KNNSettings .state ().setClusterService (clusterService );
65
+ }
66
+
44
67
public void testIndexAllocation_close () throws InterruptedException {
45
68
// Create basic nmslib HNSW index
46
69
Path dir = createTempDir ();
@@ -207,6 +230,71 @@ public void testIndexAllocation_readLock() throws InterruptedException {
207
230
assertEquals (finalValue , testLockValue1 );
208
231
}
209
232
233
+ public void testIndexAllocation_closeDefault () {
234
+ WatcherHandle <FileWatcher > watcherHandle = (WatcherHandle <FileWatcher >) mock (WatcherHandle .class );
235
+ ExecutorService executorService = Executors .newFixedThreadPool (2 );
236
+ AtomicReference <Exception > expectedException = new AtomicReference <>();
237
+
238
+ // Executor based non-blocking close
239
+ NativeMemoryAllocation .IndexAllocation nonBlockingIndexAllocation = new NativeMemoryAllocation .IndexAllocation (
240
+ mock (ExecutorService .class ),
241
+ 0 ,
242
+ 0 ,
243
+ null ,
244
+ "test" ,
245
+ "test" ,
246
+ watcherHandle
247
+ );
248
+
249
+ executorService .submit (nonBlockingIndexAllocation ::readLock );
250
+ Future <?> closingThread = executorService .submit (nonBlockingIndexAllocation ::close );
251
+ try {
252
+ closingThread .get ();
253
+ } catch (Exception ex ) {
254
+ expectedException .set (ex );
255
+ }
256
+ assertNull (expectedException .get ());
257
+ expectedException .set (null );
258
+ executorService .shutdown ();
259
+ }
260
+
261
+ public void testIndexAllocation_closeBlocking () throws InterruptedException , ExecutionException {
262
+ WatcherHandle <FileWatcher > watcherHandle = (WatcherHandle <FileWatcher >) mock (WatcherHandle .class );
263
+ ExecutorService executorService = Executors .newFixedThreadPool (2 );
264
+ AtomicReference <Exception > expectedException = new AtomicReference <>();
265
+
266
+ // Blocking close
267
+ when (clusterSettings .get (KNN_FORCE_EVICT_CACHE_ENABLED_SETTING )).thenReturn (true );
268
+ NativeMemoryAllocation .IndexAllocation blockingIndexAllocation = new NativeMemoryAllocation .IndexAllocation (
269
+ mock (ExecutorService .class ),
270
+ 0 ,
271
+ 0 ,
272
+ null ,
273
+ "test" ,
274
+ "test" ,
275
+ watcherHandle
276
+ );
277
+
278
+ executorService .submit (blockingIndexAllocation ::readLock );
279
+ Future <?> closingThread = executorService .submit (blockingIndexAllocation ::close );
280
+
281
+ // Check if thread is currently blocked
282
+ try {
283
+ closingThread .get (5 , TimeUnit .SECONDS );
284
+ } catch (Exception e ) {
285
+ expectedException .set (e );
286
+ }
287
+
288
+ assertNotNull (expectedException .get ());
289
+
290
+ executorService .submit (blockingIndexAllocation ::readUnlock );
291
+ closingThread .get ();
292
+
293
+ // Waits until close
294
+ assertTrue (blockingIndexAllocation .isClosed ());
295
+ executorService .shutdown ();
296
+ }
297
+
210
298
public void testIndexAllocation_writeLock () throws InterruptedException {
211
299
// To test the writeLock, we first grab the writeLock in the main thread. Then we start another thread that
212
300
// grabs the readLock and asserts testLockValue2 has been updated. Next in the main thread, we update the value
0 commit comments