Skip to content

Commit 2be25bb

Browse files
Fix file cache initialization (#14004)
* fix file cache initialization

Signed-off-by: panguixin <panguixin@bytedance.com>

* changelog

Signed-off-by: panguixin <panguixin@bytedance.com>

* add test

Signed-off-by: panguixin <panguixin@bytedance.com>

---------

Signed-off-by: panguixin <panguixin@bytedance.com>
1 parent a99b494 commit 2be25bb

File tree

9 files changed

+106
-41
lines changed

9 files changed

+106
-41
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4747
- Fix fs info reporting negative available size ([#11573](https://github.com/opensearch-project/OpenSearch/pull/11573))
4848
- Add ListPitInfo::getKeepAlive() getter ([#14495](https://github.com/opensearch-project/OpenSearch/pull/14495))
4949
- Fix FuzzyQuery in keyword field will use IndexOrDocValuesQuery when both of index and doc_value are true ([#14378](https://github.com/opensearch-project/OpenSearch/pull/14378))
50+
- Fix file cache initialization ([#14004](https://github.com/opensearch-project/OpenSearch/pull/14004))
5051

5152
### Security
5253

server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java

+25
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@
2828
import org.opensearch.cluster.block.ClusterBlockException;
2929
import org.opensearch.cluster.metadata.IndexMetadata;
3030
import org.opensearch.cluster.node.DiscoveryNode;
31+
import org.opensearch.cluster.node.DiscoveryNodeRole;
3132
import org.opensearch.cluster.routing.GroupShardsIterator;
3233
import org.opensearch.cluster.routing.ShardIterator;
3334
import org.opensearch.cluster.routing.ShardRouting;
3435
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
3536
import org.opensearch.common.Priority;
3637
import org.opensearch.common.io.PathUtils;
3738
import org.opensearch.common.settings.Settings;
39+
import org.opensearch.common.settings.SettingsException;
3840
import org.opensearch.common.unit.TimeValue;
3941
import org.opensearch.core.common.unit.ByteSizeUnit;
4042
import org.opensearch.core.index.Index;
@@ -65,10 +67,13 @@
6567
import java.util.stream.StreamSupport;
6668

6769
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
70+
import static org.opensearch.common.util.FeatureFlags.TIERED_REMOTE_INDEX;
6871
import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList;
6972
import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
7073
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
7174
import static org.opensearch.test.NodeRoles.dataNode;
75+
import static org.opensearch.test.NodeRoles.onlyRole;
76+
import static org.opensearch.test.NodeRoles.onlyRoles;
7277
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
7378
import static org.hamcrest.Matchers.contains;
7479
import static org.hamcrest.Matchers.containsString;
@@ -1009,6 +1014,26 @@ public void cleanup() throws Exception {
10091014
);
10101015
}
10111016

1017+
public void testStartSearchNode() throws Exception {
1018+
// test start dedicated search node
1019+
internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE)));
1020+
// test start node without search role
1021+
internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.DATA_ROLE)));
1022+
// test start non-dedicated search node with TIERED_REMOTE_INDEX feature enabled
1023+
internalCluster().startNode(
1024+
Settings.builder()
1025+
.put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE)))
1026+
.put(TIERED_REMOTE_INDEX, true)
1027+
);
1028+
// test start non-dedicated search node
1029+
assertThrows(
1030+
SettingsException.class,
1031+
() -> internalCluster().startNode(
1032+
Settings.builder().put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE)))
1033+
)
1034+
);
1035+
}
1036+
10121037
private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
10131038
final Node node = internalCluster().getInstance(Node.class, nodeName);
10141039
final ShardId shardId = new ShardId(index, 0);

server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java

+4
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ public static boolean isSearchNode(Settings settings) {
130130
return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
131131
}
132132

133+
public static boolean isDedicatedSearchNode(Settings settings) {
134+
return getRolesFromSettings(settings).stream().allMatch(DiscoveryNodeRole.SEARCH_ROLE::equals);
135+
}
136+
133137
private final String nodeName;
134138
private final String nodeId;
135139
private final String ephemeralId;

server/src/main/java/org/opensearch/index/store/remote/utils/cache/SegmentedCache.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,15 @@ private static final int ceilingNextPowerOfTwo(int x) {
5252
private final Weigher<V> weigher;
5353

5454
public SegmentedCache(Builder<K, V> builder) {
55-
this.capacity = builder.capacity;
5655
final int segments = ceilingNextPowerOfTwo(builder.concurrencyLevel);
5756
this.segmentMask = segments - 1;
5857
this.table = newSegmentArray(segments);
59-
this.perSegmentCapacity = (capacity + (segments - 1)) / segments;
58+
this.perSegmentCapacity = (builder.capacity + (segments - 1)) / segments;
6059
this.weigher = builder.weigher;
6160
for (int i = 0; i < table.length; i++) {
6261
table[i] = new LRUCache<>(perSegmentCapacity, builder.listener, builder.weigher);
6362
}
63+
this.capacity = perSegmentCapacity * segments;
6464
}
6565

6666
@SuppressWarnings("unchecked")

server/src/main/java/org/opensearch/monitor/fs/FsProbe.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,11 @@ public FsInfo stats(FsInfo previous) throws IOException {
8181
if (fileCache != null && dataLocations[i].fileCacheReservedSize != ByteSizeValue.ZERO) {
8282
paths[i].fileCacheReserved = adjustForHugeFilesystems(dataLocations[i].fileCacheReservedSize.getBytes());
8383
paths[i].fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage().usage());
84-
paths[i].available -= (paths[i].fileCacheReserved - paths[i].fileCacheUtilized);
84+
// fileCacheFree will be less than zero if the cache is over-subscribed
85+
long fileCacheFree = paths[i].fileCacheReserved - paths[i].fileCacheUtilized;
86+
if (fileCacheFree > 0) {
87+
paths[i].available -= fileCacheFree;
88+
}
8589
// occurs if reserved file cache space is occupied by other files, like local indices
8690
if (paths[i].available < 0) {
8791
paths[i].available = 0;
@@ -215,4 +219,11 @@ public static FsInfo.Path getFSInfo(NodePath nodePath) throws IOException {
215219
return fsPath;
216220
}
217221

222+
public static long getTotalSize(NodePath nodePath) throws IOException {
223+
return adjustForHugeFilesystems(nodePath.fileStore.getTotalSpace());
224+
}
225+
226+
public static long getAvailableSize(NodePath nodePath) throws IOException {
227+
return adjustForHugeFilesystems(nodePath.fileStore.getUsableSpace());
228+
}
218229
}

server/src/main/java/org/opensearch/node/Node.java

+53-33
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.Build;
3939
import org.opensearch.ExceptionsHelper;
4040
import org.opensearch.OpenSearchException;
41+
import org.opensearch.OpenSearchParseException;
4142
import org.opensearch.OpenSearchTimeoutException;
4243
import org.opensearch.Version;
4344
import org.opensearch.action.ActionModule;
@@ -108,6 +109,7 @@
108109
import org.opensearch.common.settings.Settings;
109110
import org.opensearch.common.settings.SettingsException;
110111
import org.opensearch.common.settings.SettingsModule;
112+
import org.opensearch.common.unit.RatioValue;
111113
import org.opensearch.common.unit.TimeValue;
112114
import org.opensearch.common.util.BigArrays;
113115
import org.opensearch.common.util.FeatureFlags;
@@ -176,7 +178,6 @@
176178
import org.opensearch.ingest.IngestService;
177179
import org.opensearch.monitor.MonitorService;
178180
import org.opensearch.monitor.fs.FsHealthService;
179-
import org.opensearch.monitor.fs.FsInfo;
180181
import org.opensearch.monitor.fs.FsProbe;
181182
import org.opensearch.monitor.jvm.JvmInfo;
182183
import org.opensearch.node.remotestore.RemoteStoreNodeService;
@@ -372,9 +373,12 @@ public class Node implements Closeable {
372373
}
373374
}, Setting.Property.NodeScope);
374375

375-
public static final Setting<ByteSizeValue> NODE_SEARCH_CACHE_SIZE_SETTING = Setting.byteSizeSetting(
376+
private static final String ZERO = "0";
377+
378+
public static final Setting<String> NODE_SEARCH_CACHE_SIZE_SETTING = new Setting<>(
376379
"node.search.cache.size",
377-
ByteSizeValue.ZERO,
380+
s -> (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING) || DiscoveryNode.isDedicatedSearchNode(s)) ? "80%" : ZERO,
381+
Node::validateFileCacheSize,
378382
Property.NodeScope
379383
);
380384

@@ -2002,43 +2006,59 @@ DiscoveryNode getNode() {
20022006
* Initializes the search cache with a defined capacity.
20032007
* The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}.
20042008
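* If the user doesn't configure the cache size, initialization fails when the node is not a dedicated search node (e.g. a data + search node), unless the tiered remote index feature flag is enabled.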
2005-
* Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined.
2009+
* Else it configures the size to 80% of total capacity for a dedicated search node, if not explicitly defined.
20062010
*/
20072011
private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException {
20082012
boolean isWritableRemoteIndexEnabled = FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING);
2009-
if (DiscoveryNode.isSearchNode(settings) || isWritableRemoteIndexEnabled) {
2010-
NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath();
2011-
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
2012-
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(fileCacheNodePath));
2013-
long availableCapacity = info.getAvailable().getBytes();
2014-
2015-
// Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set.
2016-
if (capacity == 0) {
2017-
// If node is not a dedicated search node without configuration, prevent cache initialization
2018-
if (!isWritableRemoteIndexEnabled
2019-
&& DiscoveryNode.getRolesFromSettings(settings)
2020-
.stream()
2021-
.anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) {
2022-
throw new SettingsException(
2023-
"Unable to initialize the "
2024-
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
2025-
+ "-"
2026-
+ DiscoveryNodeRole.DATA_ROLE.roleName()
2027-
+ " node: Missing value for configuration "
2028-
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
2029-
);
2030-
} else {
2031-
capacity = 80 * availableCapacity / 100;
2032-
}
2013+
if (DiscoveryNode.isSearchNode(settings) == false && isWritableRemoteIndexEnabled == false) {
2014+
return;
2015+
}
2016+
2017+
String capacityRaw = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings);
2018+
logger.info("cache size [{}]", capacityRaw);
2019+
if (capacityRaw.equals(ZERO)) {
2020+
throw new SettingsException(
2021+
"Unable to initialize the "
2022+
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
2023+
+ "-"
2024+
+ DiscoveryNodeRole.DATA_ROLE.roleName()
2025+
+ " node: Missing value for configuration "
2026+
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
2027+
);
2028+
}
2029+
2030+
NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath();
2031+
long totalSpace = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getTotalSize(fileCacheNodePath));
2032+
long capacity = calculateFileCacheSize(capacityRaw, totalSpace);
2033+
if (capacity <= 0 || totalSpace <= capacity) {
2034+
throw new SettingsException("Cache size must be larger than zero and less than total capacity");
2035+
}
2036+
2037+
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
2038+
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(this.fileCache.capacity(), ByteSizeUnit.BYTES);
2039+
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
2040+
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
2041+
}
2042+
2043+
private static long calculateFileCacheSize(String capacityRaw, long totalSpace) {
2044+
try {
2045+
RatioValue ratioValue = RatioValue.parseRatioValue(capacityRaw);
2046+
return Math.round(totalSpace * ratioValue.getAsRatio());
2047+
} catch (OpenSearchParseException e) {
2048+
try {
2049+
return ByteSizeValue.parseBytesSizeValue(capacityRaw, NODE_SEARCH_CACHE_SIZE_SETTING.getKey()).getBytes();
2050+
} catch (OpenSearchParseException ex) {
2051+
ex.addSuppressed(e);
2052+
throw ex;
20332053
}
2034-
capacity = Math.min(capacity, availableCapacity);
2035-
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
2036-
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
2037-
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
2038-
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
20392054
}
20402055
}
20412056

2057+
private static String validateFileCacheSize(String capacityRaw) {
2058+
calculateFileCacheSize(capacityRaw, 0L);
2059+
return capacityRaw;
2060+
}
2061+
20422062
/**
20432063
* Returns the {@link FileCache} instance for remote search node
20442064
* Note: Visible for testing

server/src/test/java/org/opensearch/env/NodeRepurposeCommandTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void createNodePaths() throws IOException {
9595
dataClusterManagerSettings = buildEnvSettings(Settings.EMPTY);
9696
Settings defaultSearchSettings = Settings.builder()
9797
.put(dataClusterManagerSettings)
98-
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB))
98+
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB).toString())
9999
.build();
100100

101101
searchNoDataNoClusterManagerSettings = onlyRole(dataClusterManagerSettings, DiscoveryNodeRole.SEARCH_ROLE);

server/src/test/java/org/opensearch/node/NodeTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ public void testCreateWithFileCache() throws Exception {
380380
List<Class<? extends Plugin>> plugins = basePlugins();
381381
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
382382
Settings searchRoleSettingsWithConfig = baseSettings().put(searchRoleSettings)
383-
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize)
383+
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
384384
.build();
385385
Settings onlySearchRoleSettings = Settings.builder()
386386
.put(searchRoleSettingsWithConfig)

test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@
165165
import static org.opensearch.test.NodeRoles.onlyRoles;
166166
import static org.opensearch.test.NodeRoles.removeRoles;
167167
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
168+
import static org.opensearch.test.OpenSearchTestCase.randomBoolean;
168169
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
169170
import static org.hamcrest.Matchers.equalTo;
170171
import static org.hamcrest.Matchers.greaterThan;
@@ -216,7 +217,8 @@ public final class InternalTestCluster extends TestCluster {
216217
nodeAndClient.node.settings()
217218
);
218219

219-
private static final ByteSizeValue DEFAULT_SEARCH_CACHE_SIZE = new ByteSizeValue(2, ByteSizeUnit.GB);
220+
private static final String DEFAULT_SEARCH_CACHE_SIZE_BYTES = "2gb";
221+
private static final String DEFAULT_SEARCH_CACHE_SIZE_PERCENT = "5%";
220222

221223
public static final int DEFAULT_LOW_NUM_CLUSTER_MANAGER_NODES = 1;
222224
public static final int DEFAULT_HIGH_NUM_CLUSTER_MANAGER_NODES = 3;
@@ -700,8 +702,10 @@ public synchronized void ensureAtLeastNumSearchAndDataNodes(int n) {
700702
logger.info("increasing cluster size from {} to {}", size, n);
701703
Set<DiscoveryNodeRole> searchAndDataRoles = Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.SEARCH_ROLE);
702704
Settings settings = Settings.builder()
703-
.put(Settings.EMPTY)
704-
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), DEFAULT_SEARCH_CACHE_SIZE)
705+
.put(
706+
Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(),
707+
randomBoolean() ? DEFAULT_SEARCH_CACHE_SIZE_PERCENT : DEFAULT_SEARCH_CACHE_SIZE_BYTES
708+
)
705709
.build();
706710
startNodes(n - size, Settings.builder().put(onlyRoles(settings, searchAndDataRoles)).build());
707711
validateClusterFormed();

0 commit comments

Comments
 (0)