Skip to content

Commit 1717b55

Browse files
sachinpkaleSachin Kale
and
Sachin Kale
authored
Add timestamp pinning service and scheduler to update in-memory state (#15180)
--------- Signed-off-by: Sachin Kale <kalsac@amazon.com> Co-authored-by: Sachin Kale <kalsac@amazon.com>
1 parent 01acf1c commit 1717b55

File tree

9 files changed

+727
-3
lines changed

9 files changed

+727
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.opensearch.common.collect.Tuple;
12+
import org.opensearch.common.unit.TimeValue;
13+
import org.opensearch.core.action.ActionListener;
14+
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
15+
import org.opensearch.test.OpenSearchIntegTestCase;
16+
17+
import java.util.Set;
18+
19+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
20+
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
21+
static final String INDEX_NAME = "remote-store-test-idx-1";
22+
23+
ActionListener<Void> noOpActionListener = new ActionListener<>() {
24+
@Override
25+
public void onResponse(Void unused) {}
26+
27+
@Override
28+
public void onFailure(Exception e) {}
29+
};
30+
31+
public void testTimestampPinUnpin() throws Exception {
32+
prepareCluster(1, 1, INDEX_NAME, 0, 2);
33+
ensureGreen(INDEX_NAME);
34+
35+
RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
36+
RemoteStorePinnedTimestampService.class,
37+
primaryNodeName(INDEX_NAME)
38+
);
39+
40+
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp = RemoteStorePinnedTimestampService.getPinnedTimestamps();
41+
long lastFetchTimestamp = pinnedTimestampWithFetchTimestamp.v1();
42+
assertEquals(-1L, lastFetchTimestamp);
43+
assertEquals(Set.of(), pinnedTimestampWithFetchTimestamp.v2());
44+
45+
assertThrows(
46+
IllegalArgumentException.class,
47+
() -> remoteStorePinnedTimestampService.pinTimestamp(1234L, "ss1", noOpActionListener)
48+
);
49+
50+
long timestamp1 = System.currentTimeMillis() + 30000L;
51+
long timestamp2 = System.currentTimeMillis() + 60000L;
52+
long timestamp3 = System.currentTimeMillis() + 900000L;
53+
remoteStorePinnedTimestampService.pinTimestamp(timestamp1, "ss2", noOpActionListener);
54+
remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener);
55+
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener);
56+
57+
remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1));
58+
59+
assertBusy(() -> {
60+
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_2 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
61+
long lastFetchTimestamp_2 = pinnedTimestampWithFetchTimestamp_2.v1();
62+
assertTrue(lastFetchTimestamp_2 != -1);
63+
assertEquals(Set.of(timestamp1, timestamp2, timestamp3), pinnedTimestampWithFetchTimestamp_2.v2());
64+
});
65+
66+
remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3));
67+
68+
// This should be a no-op as pinning entity is different
69+
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener);
70+
// Unpinning already pinned entity
71+
remoteStorePinnedTimestampService.unpinTimestamp(timestamp2, "ss3", noOpActionListener);
72+
// Adding different entity to already pinned timestamp
73+
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener);
74+
75+
remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueSeconds(1));
76+
77+
assertBusy(() -> {
78+
Tuple<Long, Set<Long>> pinnedTimestampWithFetchTimestamp_3 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
79+
long lastFetchTimestamp_3 = pinnedTimestampWithFetchTimestamp_3.v1();
80+
assertTrue(lastFetchTimestamp_3 != -1);
81+
assertEquals(Set.of(timestamp1, timestamp3), pinnedTimestampWithFetchTimestamp_3.v2());
82+
});
83+
84+
remoteStorePinnedTimestampService.setPinnedTimestampsSchedulerInterval(TimeValue.timeValueMinutes(3));
85+
}
86+
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+3
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
import org.opensearch.node.Node.DiscoverySettings;
143143
import org.opensearch.node.NodeRoleSettings;
144144
import org.opensearch.node.remotestore.RemoteStoreNodeService;
145+
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
145146
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
146147
import org.opensearch.persistent.PersistentTasksClusterService;
147148
import org.opensearch.persistent.decider.EnableAssignmentDecider;
@@ -760,6 +761,8 @@ public void apply(Settings value, Settings current, Settings previous) {
760761
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
761762
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,
762763

764+
RemoteStorePinnedTimestampService.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL,
765+
763766
// Composite index settings
764767
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING,
765768

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote.model;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.io.Streams;
14+
import org.opensearch.common.remote.BlobPathParameters;
15+
import org.opensearch.common.remote.RemoteWriteableBlobEntity;
16+
import org.opensearch.core.common.io.stream.StreamInput;
17+
import org.opensearch.core.common.io.stream.StreamOutput;
18+
import org.opensearch.core.common.io.stream.Writeable;
19+
import org.opensearch.core.compress.Compressor;
20+
import org.opensearch.index.remote.RemoteStoreUtils;
21+
import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat;
22+
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.util.ArrayList;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
31+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
32+
33+
/**
34+
* Wrapper class for uploading/downloading {@link RemotePinnedTimestamps} to/from remote blob store
35+
*
36+
* @opensearch.internal
37+
*/
38+
public class RemotePinnedTimestamps extends RemoteWriteableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> {
39+
private static final Logger logger = LogManager.getLogger(RemotePinnedTimestamps.class);
40+
41+
/**
42+
* Represents a collection of pinned timestamps and their associated pinning entities.
43+
* This class is thread-safe and implements the Writeable interface for serialization.
44+
*/
45+
public static class PinnedTimestamps implements Writeable {
46+
private final Map<Long, List<String>> pinnedTimestampPinningEntityMap;
47+
48+
public PinnedTimestamps(Map<Long, List<String>> pinnedTimestampPinningEntityMap) {
49+
this.pinnedTimestampPinningEntityMap = new ConcurrentHashMap<>(pinnedTimestampPinningEntityMap);
50+
}
51+
52+
@Override
53+
public void writeTo(StreamOutput out) throws IOException {
54+
out.writeMap(pinnedTimestampPinningEntityMap, StreamOutput::writeLong, StreamOutput::writeStringCollection);
55+
}
56+
57+
public static PinnedTimestamps readFrom(StreamInput in) throws IOException {
58+
return new PinnedTimestamps(in.readMap(StreamInput::readLong, StreamInput::readStringList));
59+
}
60+
61+
/**
62+
* Pins a timestamp against a pinning entity.
63+
*
64+
* @param timestamp The timestamp to pin.
65+
* @param pinningEntity The entity pinning the timestamp.
66+
*/
67+
public void pin(Long timestamp, String pinningEntity) {
68+
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity);
69+
pinnedTimestampPinningEntityMap.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(pinningEntity);
70+
}
71+
72+
/**
73+
* Unpins a timestamp for a specific pinning entity.
74+
*
75+
* @param timestamp The timestamp to unpin.
76+
* @param pinningEntity The entity unpinning the timestamp.
77+
*/
78+
public void unpin(Long timestamp, String pinningEntity) {
79+
logger.debug("Unpinning timestamp = {} against entity = {}", timestamp, pinningEntity);
80+
if (pinnedTimestampPinningEntityMap.containsKey(timestamp) == false
81+
|| pinnedTimestampPinningEntityMap.get(timestamp).contains(pinningEntity) == false) {
82+
logger.warn("Timestamp: {} is not pinned by entity: {}", timestamp, pinningEntity);
83+
}
84+
pinnedTimestampPinningEntityMap.compute(timestamp, (k, v) -> {
85+
v.remove(pinningEntity);
86+
return v.isEmpty() ? null : v;
87+
});
88+
}
89+
90+
public Map<Long, List<String>> getPinnedTimestampPinningEntityMap() {
91+
return new HashMap<>(pinnedTimestampPinningEntityMap);
92+
}
93+
}
94+
95+
public static final String PINNED_TIMESTAMPS = "pinned_timestamps";
96+
public static final ChecksumWritableBlobStoreFormat<PinnedTimestamps> PINNED_TIMESTAMPS_FORMAT = new ChecksumWritableBlobStoreFormat<>(
97+
PINNED_TIMESTAMPS,
98+
PinnedTimestamps::readFrom
99+
);
100+
101+
private PinnedTimestamps pinnedTimestamps;
102+
103+
public RemotePinnedTimestamps(String clusterUUID, Compressor compressor) {
104+
super(clusterUUID, compressor);
105+
pinnedTimestamps = new PinnedTimestamps(new HashMap<>());
106+
}
107+
108+
@Override
109+
public BlobPathParameters getBlobPathParameters() {
110+
return new BlobPathParameters(List.of(PINNED_TIMESTAMPS), PINNED_TIMESTAMPS);
111+
}
112+
113+
@Override
114+
public String getType() {
115+
return PINNED_TIMESTAMPS;
116+
}
117+
118+
@Override
119+
public String generateBlobFileName() {
120+
return this.blobFileName = String.join(DELIMITER, PINNED_TIMESTAMPS, RemoteStoreUtils.invertLong(System.currentTimeMillis()));
121+
}
122+
123+
@Override
124+
public InputStream serialize() throws IOException {
125+
return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput();
126+
}
127+
128+
@Override
129+
public PinnedTimestamps deserialize(InputStream inputStream) throws IOException {
130+
return PINNED_TIMESTAMPS_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
131+
}
132+
133+
public void setBlobFileName(String blobFileName) {
134+
this.blobFileName = blobFileName;
135+
}
136+
137+
public void setPinnedTimestamps(PinnedTimestamps pinnedTimestamps) {
138+
this.pinnedTimestamps = pinnedTimestamps;
139+
}
140+
141+
public PinnedTimestamps getPinnedTimestamps() {
142+
return pinnedTimestamps;
143+
}
144+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote.model;
10+
11+
import org.opensearch.common.blobstore.BlobPath;
12+
import org.opensearch.common.remote.RemoteWriteableBlobEntity;
13+
import org.opensearch.common.remote.RemoteWriteableEntityBlobStore;
14+
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
15+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
16+
import org.opensearch.threadpool.ThreadPool;
17+
18+
/**
19+
* Extends the RemoteClusterStateBlobStore to support {@link RemotePinnedTimestamps}
20+
*/
21+
public class RemoteStorePinnedTimestampsBlobStore extends RemoteWriteableEntityBlobStore<
22+
RemotePinnedTimestamps.PinnedTimestamps,
23+
RemotePinnedTimestamps> {
24+
25+
public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps";
26+
private final BlobStoreRepository blobStoreRepository;
27+
28+
public RemoteStorePinnedTimestampsBlobStore(
29+
BlobStoreTransferService blobStoreTransferService,
30+
BlobStoreRepository blobStoreRepository,
31+
String clusterName,
32+
ThreadPool threadPool,
33+
String executor
34+
) {
35+
super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool, executor, PINNED_TIMESTAMPS_PATH_TOKEN);
36+
this.blobStoreRepository = blobStoreRepository;
37+
}
38+
39+
@Override
40+
public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<RemotePinnedTimestamps.PinnedTimestamps> obj) {
41+
return blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN);
42+
}
43+
}

server/src/main/java/org/opensearch/node/Node.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@
185185
import org.opensearch.monitor.fs.FsProbe;
186186
import org.opensearch.monitor.jvm.JvmInfo;
187187
import org.opensearch.node.remotestore.RemoteStoreNodeService;
188+
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
188189
import org.opensearch.node.resource.tracker.NodeResourceUsageTracker;
189190
import org.opensearch.persistent.PersistentTasksClusterService;
190191
import org.opensearch.persistent.PersistentTasksExecutor;
@@ -810,6 +811,18 @@ protected Node(
810811
remoteIndexPathUploader = null;
811812
remoteClusterStateCleanupManager = null;
812813
}
814+
final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService;
815+
if (isRemoteStoreAttributePresent(settings)) {
816+
remoteStorePinnedTimestampService = new RemoteStorePinnedTimestampService(
817+
repositoriesServiceReference::get,
818+
settings,
819+
threadPool,
820+
clusterService
821+
);
822+
resourcesToClose.add(remoteStorePinnedTimestampService);
823+
} else {
824+
remoteStorePinnedTimestampService = null;
825+
}
813826

814827
// collect engine factory providers from plugins
815828
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
@@ -1173,7 +1186,8 @@ protected Node(
11731186
clusterModule.getIndexNameExpressionResolver(),
11741187
repositoryService,
11751188
transportService,
1176-
actionModule.getActionFilters()
1189+
actionModule.getActionFilters(),
1190+
remoteStorePinnedTimestampService
11771191
);
11781192
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
11791193
settings,
@@ -1426,6 +1440,7 @@ protected Node(
14261440
b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
14271441
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
14281442
b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader);
1443+
b.bind(RemoteStorePinnedTimestampService.class).toProvider(() -> remoteStorePinnedTimestampService);
14291444
b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager);
14301445
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
14311446
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
@@ -1581,6 +1596,12 @@ public Node start() throws NodeValidationException {
15811596
if (remoteIndexPathUploader != null) {
15821597
remoteIndexPathUploader.start();
15831598
}
1599+
final RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = injector.getInstance(
1600+
RemoteStorePinnedTimestampService.class
1601+
);
1602+
if (remoteStorePinnedTimestampService != null) {
1603+
remoteStorePinnedTimestampService.start();
1604+
}
15841605
// Load (and maybe upgrade) the metadata stored on disk
15851606
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
15861607
gatewayMetaState.start(

0 commit comments

Comments
 (0)