|
24 | 24 | import java.nio.file.Files;
|
25 | 25 | import java.nio.file.Path;
|
26 | 26 | import java.security.AccessController;
|
27 |
| -import java.security.PrivilegedAction; |
| 27 | +import java.security.PrivilegedActionException; |
| 28 | +import java.security.PrivilegedExceptionAction; |
28 | 29 | import java.util.concurrent.CompletableFuture;
|
29 | 30 | import java.util.concurrent.CompletionException;
|
30 | 31 | import java.util.concurrent.atomic.AtomicBoolean;
|
@@ -56,39 +57,52 @@ public TransferManager(final StreamReader streamReader, final FileCache fileCach
|
56 | 57 |
|
57 | 58 | /**
|
58 | 59 | * Given a blobFetchRequestList, return it's corresponding IndexInput.
|
| 60 | + * |
| 61 | + * Note: Scripted queries/aggs may trigger a blob fetch within a new security context. |
| 62 | + * As such the following operations require elevated permissions. |
| 63 | + * |
| 64 | + * cacheEntry.getIndexInput() downloads new blobs from the remote store to local fileCache. |
| 65 | + * fileCache.compute() as inserting into the local fileCache may trigger an eviction. |
| 66 | + * |
59 | 67 | * @param blobFetchRequest to fetch
|
60 | 68 | * @return future of IndexInput augmented with internal caching maintenance tasks
|
61 | 69 | */
|
62 | 70 | public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {
|
63 |
| - |
64 | 71 | final Path key = blobFetchRequest.getFilePath();
|
65 | 72 | logger.trace("fetchBlob called for {}", key.toString());
|
66 | 73 |
|
67 |
| - // We need to do a privileged action here in order to fetch from remote |
68 |
| - // and write/evict from local file cache in case this is invoked as a side |
69 |
| - // effect of a plugin (such as a scripted search) that doesn't have the |
70 |
| - // necessary permissions. |
71 |
| - final CachedIndexInput cacheEntry = AccessController.doPrivileged((PrivilegedAction<CachedIndexInput>) () -> { |
72 |
| - return fileCache.compute(key, (path, cachedIndexInput) -> { |
73 |
| - if (cachedIndexInput == null || cachedIndexInput.isClosed()) { |
74 |
| - logger.trace("Transfer Manager - IndexInput closed or not in cache"); |
75 |
| - // Doesn't exist or is closed, either way create a new one |
76 |
| - return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); |
77 |
| - } else { |
78 |
| - logger.trace("Transfer Manager - Already in cache"); |
79 |
| - // already in the cache and ready to be used (open) |
80 |
| - return cachedIndexInput; |
| 74 | + try { |
| 75 | + return AccessController.doPrivileged((PrivilegedExceptionAction<IndexInput>) () -> { |
| 76 | + CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { |
| 77 | + if (cachedIndexInput == null || cachedIndexInput.isClosed()) { |
| 78 | + logger.trace("Transfer Manager - IndexInput closed or not in cache"); |
| 79 | + // Doesn't exist or is closed, either way create a new one |
| 80 | + return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); |
| 81 | + } else { |
| 82 | + logger.trace("Transfer Manager - Already in cache"); |
| 83 | + // already in the cache and ready to be used (open) |
| 84 | + return cachedIndexInput; |
| 85 | + } |
| 86 | + }); |
| 87 | + |
| 88 | + // Cache entry was either retrieved from the cache or newly added, either |
| 89 | + // way the reference count has been incremented by one. We can only |
| 90 | + // decrement this reference _after_ creating the clone to be returned. |
| 91 | + try { |
| 92 | + return cacheEntry.getIndexInput().clone(); |
| 93 | + } finally { |
| 94 | + fileCache.decRef(key); |
81 | 95 | }
|
82 | 96 | });
|
83 |
| - }); |
84 |
| - |
85 |
| - // Cache entry was either retrieved from the cache or newly added, either |
86 |
| - // way the reference count has been incremented by one. We can only |
87 |
| - // decrement this reference _after_ creating the clone to be returned. |
88 |
| - try { |
89 |
| - return cacheEntry.getIndexInput().clone(); |
90 |
| - } finally { |
91 |
| - fileCache.decRef(key); |
| 97 | + } catch (PrivilegedActionException e) { |
| 98 | + final Exception cause = e.getException(); |
| 99 | + if (cause instanceof IOException) { |
| 100 | + throw (IOException) cause; |
| 101 | + } else if (cause instanceof RuntimeException) { |
| 102 | + throw (RuntimeException) cause; |
| 103 | + } else { |
| 104 | + throw new IOException(cause); |
| 105 | + } |
92 | 106 | }
|
93 | 107 | }
|
94 | 108 |
|
|
0 commit comments