142
142
import org .opensearch .indices .RemoteStoreSettings ;
143
143
import org .opensearch .indices .recovery .RecoverySettings ;
144
144
import org .opensearch .indices .recovery .RecoveryState ;
145
+ import org .opensearch .monitor .jvm .JvmInfo ;
145
146
import org .opensearch .node .remotestore .RemoteStorePinnedTimestampService ;
146
147
import org .opensearch .repositories .IndexId ;
147
148
import org .opensearch .repositories .IndexMetaDataGenerations ;
167
168
import java .io .FilterInputStream ;
168
169
import java .io .IOException ;
169
170
import java .io .InputStream ;
171
+ import java .lang .ref .SoftReference ;
170
172
import java .nio .file .NoSuchFileException ;
171
173
import java .util .ArrayList ;
172
174
import java .util .Arrays ;
196
198
import java .util .stream .LongStream ;
197
199
import java .util .stream .Stream ;
198
200
201
+ import static org .opensearch .common .unit .MemorySizeValue .parseBytesSizeValueOrHeapRatio ;
199
202
import static org .opensearch .index .remote .RemoteStoreEnums .PathHashAlgorithm .FNV_1A_COMPOSITE_1 ;
200
203
import static org .opensearch .index .snapshots .blobstore .BlobStoreIndexShardSnapshot .FileInfo .canonicalName ;
201
204
import static org .opensearch .repositories .blobstore .ChecksumBlobStoreFormat .SNAPSHOT_ONLY_FORMAT_PARAMS ;
@@ -253,6 +256,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
253
256
*/
254
257
public static final String VIRTUAL_DATA_BLOB_PREFIX = "v__" ;
255
258
259
+ public static final String SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME = "snapshot.repository_data.cache.threshold" ;
260
+
261
+ public static final double SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE = 0.01 ;
262
+
263
+ public static final long CACHE_MIN_THRESHOLD = ByteSizeUnit .KB .toBytes (500 );
264
+
265
+ public static final long CACHE_MAX_THRESHOLD = calculateMaxSnapshotRepositoryDataCacheThreshold ();
266
+
267
+ public static final long CACHE_DEFAULT_THRESHOLD = calculateDefaultSnapshotRepositoryDataCacheThreshold ();
268
+
269
+ /**
270
+ * Set to Integer.MAX_VALUE - 8 to prevent OutOfMemoryError due to array header requirements, following the limit used in certain JDK versions.
271
+ * This ensures compatibility across various JDK versions. For a practical usage example,
272
+ * see this link: https://github.com/openjdk/jdk11u/blob/cee8535a9d3de8558b4b5028d68e397e508bef71/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ByteArrayChannel.java#L226
273
+ */
274
+ private static final int MAX_SAFE_ARRAY_SIZE = Integer .MAX_VALUE - 8 ;
275
+
256
276
/**
257
277
* When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository
258
278
* contents will not result in the repository being marked as corrupted.
@@ -275,6 +295,58 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
275
295
Setting .Property .Deprecated
276
296
);
277
297
298
+ /**
299
+ * Sets the cache size for snapshot repository data: the valid range is within 500Kb ... 1% of the node heap memory.
300
+ */
301
+ public static final Setting <ByteSizeValue > SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD = new Setting <>(
302
+ SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME ,
303
+ CACHE_DEFAULT_THRESHOLD + "b" ,
304
+ (s ) -> {
305
+ ByteSizeValue userDefinedLimit = parseBytesSizeValueOrHeapRatio (s , SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME );
306
+ long userDefinedLimitBytes = userDefinedLimit .getBytes ();
307
+
308
+ if (userDefinedLimitBytes > CACHE_MAX_THRESHOLD ) {
309
+ throw new IllegalArgumentException (
310
+ "["
311
+ + SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME
312
+ + "] cannot be larger than ["
313
+ + CACHE_MAX_THRESHOLD
314
+ + "] bytes."
315
+ );
316
+ }
317
+
318
+ if (userDefinedLimitBytes < CACHE_MIN_THRESHOLD ) {
319
+ throw new IllegalArgumentException (
320
+ "["
321
+ + SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME
322
+ + "] cannot be smaller than ["
323
+ + CACHE_MIN_THRESHOLD
324
+ + "] bytes."
325
+ );
326
+ }
327
+
328
+ return userDefinedLimit ;
329
+ },
330
+ Setting .Property .NodeScope
331
+ );
332
+
333
+ public static long calculateDefaultSnapshotRepositoryDataCacheThreshold () {
334
+ return Math .max (ByteSizeUnit .KB .toBytes (500 ), CACHE_MAX_THRESHOLD / 2 );
335
+ }
336
+
337
+ public static long calculateMaxSnapshotRepositoryDataCacheThreshold () {
338
+ long jvmHeapSize = JvmInfo .jvmInfo ().getMem ().getHeapMax ().getBytes ();
339
+ long defaultThresholdOfHeap = (long ) (jvmHeapSize * SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE );
340
+ long defaultAbsoluteThreshold = ByteSizeUnit .KB .toBytes (500 );
341
+ long maxThreshold = calculateMaxWithinIntLimit (defaultThresholdOfHeap , defaultAbsoluteThreshold );
342
+
343
+ return maxThreshold ;
344
+ }
345
+
346
+ protected static long calculateMaxWithinIntLimit (long defaultThresholdOfHeap , long defaultAbsoluteThreshold ) {
347
+ return Math .min (Math .max (defaultThresholdOfHeap , defaultAbsoluteThreshold ), MAX_SAFE_ARRAY_SIZE );
348
+ }
349
+
278
350
/**
279
351
* Size hint for the IO buffer size to use when reading from and writing to the repository.
280
352
*/
@@ -461,6 +533,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
461
533
462
534
private volatile boolean enableAsyncDeletion ;
463
535
536
+ protected final long repositoryDataCacheThreshold ;
537
+
464
538
/**
465
539
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
466
540
* {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
@@ -515,6 +589,7 @@ protected BlobStoreRepository(
515
589
this .snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING .get (clusterService .getSettings ());
516
590
this .enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING .get (clusterService .getSettings ());
517
591
clusterService .getClusterSettings ().addSettingsUpdateConsumer (SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING , this ::setEnableAsyncDeletion );
592
+ this .repositoryDataCacheThreshold = SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD .get (clusterService .getSettings ()).getBytes ();
518
593
}
519
594
520
595
@ Override
@@ -1132,7 +1207,8 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
1132
1207
cached = null ;
1133
1208
} else {
1134
1209
genToLoad = latestKnownRepoGen .get ();
1135
- cached = latestKnownRepositoryData .get ();
1210
+ SoftReference <Tuple <Long , BytesReference >> softRef = latestKnownRepositoryData .get ();
1211
+ cached = (softRef != null ) ? softRef .get () : null ;
1136
1212
}
1137
1213
if (genToLoad > generation ) {
1138
1214
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
@@ -2926,15 +3002,19 @@ public void endVerification(String seed) {
2926
3002
private final AtomicLong latestKnownRepoGen = new AtomicLong (RepositoryData .UNKNOWN_REPO_GEN );
2927
3003
2928
3004
// Best effort cache of the latest known repository data and its generation, cached serialized as compressed json
2929
- private final AtomicReference <Tuple <Long , BytesReference >> latestKnownRepositoryData = new AtomicReference <>();
3005
+ private final AtomicReference <SoftReference <Tuple <Long , BytesReference >>> latestKnownRepositoryData = new AtomicReference <>(
3006
+ new SoftReference <>(null )
3007
+ );
2930
3008
2931
3009
@ Override
2932
3010
public void getRepositoryData (ActionListener <RepositoryData > listener ) {
2933
3011
if (latestKnownRepoGen .get () == RepositoryData .CORRUPTED_REPO_GEN ) {
2934
3012
listener .onFailure (corruptedStateException (null ));
2935
3013
return ;
2936
3014
}
2937
- final Tuple <Long , BytesReference > cached = latestKnownRepositoryData .get ();
3015
+ final SoftReference <Tuple <Long , BytesReference >> softRef = latestKnownRepositoryData .get ();
3016
+ final Tuple <Long , BytesReference > cached = (softRef != null ) ? softRef .get () : null ;
3017
+
2938
3018
// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
2939
3019
// the latest known repository generation
2940
3020
if (bestEffortConsistency == false && cached != null && cached .v1 () == latestKnownRepoGen .get ()) {
@@ -2983,7 +3063,8 @@ private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
2983
3063
genToLoad = latestKnownRepoGen .get ();
2984
3064
}
2985
3065
try {
2986
- final Tuple <Long , BytesReference > cached = latestKnownRepositoryData .get ();
3066
+ final SoftReference <Tuple <Long , BytesReference >> softRef = latestKnownRepositoryData .get ();
3067
+ final Tuple <Long , BytesReference > cached = (softRef != null ) ? softRef .get () : null ;
2987
3068
final RepositoryData loaded ;
2988
3069
// Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
2989
3070
if (bestEffortConsistency == false && cached != null && cached .v1 () == genToLoad ) {
@@ -3050,19 +3131,22 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
3050
3131
try {
3051
3132
serialized = CompressorRegistry .defaultCompressor ().compress (updated );
3052
3133
final int len = serialized .length ();
3053
- if (len > ByteSizeUnit .KB .toBytes (500 )) {
3134
+ long cacheWarningThreshold = Math .min (repositoryDataCacheThreshold * 10 , MAX_SAFE_ARRAY_SIZE );
3135
+ if (len > repositoryDataCacheThreshold ) {
3054
3136
logger .debug (
3055
- "Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in"
3137
+ "Not caching repository data of size [{}] for repository [{}] because it is larger than [{}] bytes in"
3056
3138
+ " serialized size" ,
3057
3139
len ,
3058
- metadata .name ()
3140
+ metadata .name (),
3141
+ repositoryDataCacheThreshold
3059
3142
);
3060
- if (len > ByteSizeUnit . MB . toBytes ( 5 ) ) {
3143
+ if (len > cacheWarningThreshold ) {
3061
3144
logger .warn (
3062
- "Your repository metadata blob for repository [{}] is larger than 5MB . Consider moving to a fresh"
3145
+ "Your repository metadata blob for repository [{}] is larger than [{}] bytes . Consider moving to a fresh"
3063
3146
+ " repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable"
3064
3147
+ " repository behavior going forward." ,
3065
- metadata .name ()
3148
+ metadata .name (),
3149
+ cacheWarningThreshold
3066
3150
);
3067
3151
}
3068
3152
// Set empty repository data to not waste heap for an outdated cached value
@@ -3074,11 +3158,12 @@ private void cacheRepositoryData(BytesReference updated, long generation) {
3074
3158
logger .warn ("Failed to serialize repository data" , e );
3075
3159
return ;
3076
3160
}
3077
- latestKnownRepositoryData .updateAndGet (known -> {
3161
+ latestKnownRepositoryData .updateAndGet (knownRef -> {
3162
+ Tuple <Long , BytesReference > known = (knownRef != null ) ? knownRef .get () : null ;
3078
3163
if (known != null && known .v1 () > generation ) {
3079
- return known ;
3164
+ return knownRef ;
3080
3165
}
3081
- return new Tuple <>(generation , serialized );
3166
+ return new SoftReference <>( new Tuple <>(generation , serialized ) );
3082
3167
});
3083
3168
}
3084
3169
}
0 commit comments