Skip to content

Commit 768585e

Browse files
committed
Add read flow for remote routing
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent b9ca5a8 commit 768585e

File tree

3 files changed

+482
-9
lines changed

3 files changed

+482
-9
lines changed

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java

+135-2
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,39 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.cluster.DiffableUtils;
14+
import org.opensearch.cluster.routing.IndexRoutingTable;
15+
import org.opensearch.cluster.routing.RoutingTable;
16+
import org.opensearch.common.blobstore.BlobContainer;
1317
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
18+
import org.opensearch.common.settings.Setting;
1419
import org.opensearch.common.settings.Settings;
1520
import org.opensearch.common.util.io.IOUtils;
21+
import org.opensearch.core.action.ActionListener;
22+
import org.opensearch.core.common.io.stream.StreamInput;
23+
import org.opensearch.core.common.io.stream.StreamOutput;
24+
import org.opensearch.core.index.Index;
25+
import org.opensearch.gateway.remote.ClusterMetadataManifest;
26+
import org.opensearch.gateway.remote.RemoteClusterStateService;
27+
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
1628
import org.opensearch.node.Node;
1729
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
1830
import org.opensearch.repositories.RepositoriesService;
1931
import org.opensearch.repositories.Repository;
2032
import org.opensearch.repositories.blobstore.BlobStoreRepository;
33+
import org.opensearch.threadpool.ThreadPool;
2134

2235
import java.io.IOException;
36+
import java.io.InputStream;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Optional;
41+
import java.util.Set;
42+
import java.util.concurrent.ExecutorService;
43+
import java.util.function.Function;
2344
import java.util.function.Supplier;
45+
import java.util.stream.Collectors;
2446

2547
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
2648

@@ -31,19 +53,130 @@
3153
*/
3254
public class RemoteRoutingTableService extends AbstractLifecycleComponent {
3355

56+
/**
57+
* Cluster setting to specify if routing table should be published to remote store
58+
*/
59+
public static final Setting<Boolean> REMOTE_ROUTING_TABLE_ENABLED_SETTING = Setting.boolSetting(
60+
"cluster.remote_store.routing.enabled",
61+
true,
62+
Setting.Property.NodeScope,
63+
Setting.Property.Final
64+
);
65+
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
66+
public static final String ROUTING_TABLE = "routing-table";
67+
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
68+
public static final String DELIMITER = "__";
69+
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
3470
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
3571
private final Settings settings;
3672
private final Supplier<RepositoriesService> repositoriesService;
3773
private BlobStoreRepository blobStoreRepository;
74+
private final ThreadPool threadPool;
3875

39-
public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
76+
private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
77+
@Override
78+
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
79+
value.writeTo(out);
80+
}
81+
82+
@Override
83+
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
84+
return IndexRoutingTable.readFrom(in);
85+
}
86+
};
87+
88+
89+
public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService,
90+
Settings settings, ThreadPool threadPool) {
4091
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
4192
this.repositoriesService = repositoriesService;
4293
this.settings = settings;
94+
this.threadPool = threadPool;
95+
}
96+
97+
public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(ClusterMetadataManifest previousManifest, List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingToUpload, Set<String> indicesRoutingToDelete) {
98+
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = previousManifest.getIndicesRouting()
99+
.stream()
100+
.collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity()));
101+
102+
indicesRoutingToUpload.forEach(
103+
uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting)
104+
);
105+
106+
indicesRoutingToDelete.forEach(allUploadedIndicesRouting::remove);
107+
108+
logger.info("allUploadedIndicesRouting ROUTING {}", allUploadedIndicesRouting);
109+
110+
return new ArrayList<>(allUploadedIndicesRouting.values());
111+
}
112+
113+
114+
private String getIndexRoutingFileName() {
115+
return String.join(
116+
DELIMITER,
117+
INDEX_ROUTING_FILE_PREFIX,
118+
RemoteStoreUtils.invertLong(System.currentTimeMillis())
119+
);
120+
43121
}
44122

123+
public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
124+
String uploadedFilename,
125+
Index index,
126+
LatchedActionListener<IndexRoutingTable> latchedActionListener) {
127+
int idx = uploadedFilename.lastIndexOf("/");
128+
String blobFileName = uploadedFilename.substring(idx+1);
129+
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx)));
130+
131+
return () -> readAsync(
132+
blobContainer,
133+
blobFileName,
134+
index,
135+
threadPool.executor(ThreadPool.Names.GENERIC),
136+
ActionListener.wrap(response -> latchedActionListener.onResponse(response.getIndexRoutingTable()), latchedActionListener::onFailure)
137+
);
138+
}
139+
140+
public void readAsync(BlobContainer blobContainer, String name, Index index, ExecutorService executorService, ActionListener<RemoteIndexRoutingTable> listener) throws IOException {
141+
executorService.execute(() -> {
142+
try {
143+
listener.onResponse(read(blobContainer, name, index));
144+
} catch (Exception e) {
145+
listener.onFailure(e);
146+
}
147+
});
148+
}
149+
150+
public RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) {
151+
try {
152+
return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index);
153+
} catch (IOException | AssertionError e) {
154+
logger.info("RoutingTable read failed with error: {}", e.toString());
155+
throw new RemoteClusterStateService.RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e);
156+
}
157+
}
158+
159+
public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(List<String> updatedIndicesRouting, List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting) {
160+
return updatedIndicesRouting.stream().map(idx -> {
161+
Optional<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst();
162+
assert uploadedIndexMetadataOptional.isPresent() == true;
163+
return uploadedIndexMetadataOptional.get();
164+
}).collect(Collectors.toList());
165+
}
166+
167+
168+
public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(RoutingTable before, RoutingTable after) {
169+
return DiffableUtils.diff(
170+
before.getIndicesRouting(),
171+
after.getIndicesRouting(),
172+
DiffableUtils.getStringKeySerializer(),
173+
CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER
174+
);
175+
}
176+
177+
45178
@Override
46-
protected void doClose() throws IOException {
179+
public void doClose() throws IOException {
47180
if (blobStoreRepository != null) {
48181
IOUtils.close(blobStoreRepository);
49182
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+122
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414
import org.opensearch.Version;
1515
import org.opensearch.action.LatchedActionListener;
1616
import org.opensearch.cluster.ClusterState;
17+
import org.opensearch.cluster.block.ClusterBlocks;
1718
import org.opensearch.cluster.coordination.CoordinationMetadata;
19+
import org.opensearch.cluster.metadata.DiffableStringMap;
1820
import org.opensearch.cluster.metadata.IndexMetadata;
1921
import org.opensearch.cluster.metadata.Metadata;
2022
import org.opensearch.cluster.metadata.TemplatesMetadata;
23+
import org.opensearch.cluster.node.DiscoveryNodes;
24+
import org.opensearch.cluster.routing.IndexRoutingTable;
25+
import org.opensearch.cluster.routing.RoutingTable;
2126
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
2227
import org.opensearch.cluster.service.ClusterService;
2328
import org.opensearch.common.CheckedRunnable;
@@ -33,6 +38,7 @@
3338
import org.opensearch.common.unit.TimeValue;
3439
import org.opensearch.common.util.io.IOUtils;
3540
import org.opensearch.core.action.ActionListener;
41+
import org.opensearch.core.index.Index;
3642
import org.opensearch.core.xcontent.ToXContent;
3743
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
3844
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
@@ -816,6 +822,122 @@ private ClusterMetadataManifest uploadManifest(
816822
}
817823
}
818824

825+
private ClusterState readClusterStateInParallel(
826+
ClusterState previousState,
827+
ClusterMetadataManifest manifest,
828+
String clusterName,
829+
String clusterUUID,
830+
String localNodeId,
831+
List<UploadedIndexMetadata> indicesToRead,
832+
Map<String, UploadedMetadataAttribute> customToRead,
833+
boolean readCoordinationMetadata,
834+
boolean readSettingsMetadata,
835+
boolean readTransientSettingsMetadata,
836+
boolean readTemplatesMetadata,
837+
boolean readDiscoveryNodes,
838+
boolean readClusterBlocks,
839+
List<UploadedIndexMetadata> indicesRoutingToRead,
840+
boolean readHashesOfConsistentSettings,
841+
Map<String, UploadedMetadataAttribute> clusterStateCustomToRead
842+
) throws IOException {
843+
int totalReadTasks =
844+
indicesToRead.size() + customToRead.size() + indicesRoutingToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + (
845+
readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0)
846+
+ clusterStateCustomToRead.size();
847+
CountDownLatch latch = new CountDownLatch(totalReadTasks);
848+
List<CheckedRunnable<IOException>> asyncMetadataReadActions = new ArrayList<>();
849+
List<RemoteReadResult> readResults = Collections.synchronizedList(new ArrayList<>());
850+
List<IndexRoutingTable> readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>());
851+
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks));
852+
853+
LatchedActionListener<RemoteReadResult> listener = new LatchedActionListener<>(
854+
ActionListener.wrap(
855+
response -> {
856+
logger.debug("Successfully read cluster state component from remote");
857+
readResults.add(response);
858+
},
859+
ex -> {
860+
logger.error("Failed to read cluster state from remote", ex);
861+
exceptionList.add(ex);
862+
}
863+
),
864+
latch
865+
);
866+
867+
for (UploadedIndexMetadata indexMetadata : indicesToRead) {
868+
asyncMetadataReadActions.add(
869+
remoteIndexMetadataManager.getAsyncIndexMetadataReadAction(
870+
clusterUUID,
871+
indexMetadata.getUploadedFilename(),
872+
listener
873+
)
874+
);
875+
}
876+
877+
LatchedActionListener<IndexRoutingTable> routingTableLatchedActionListener = new LatchedActionListener<>(
878+
ActionListener.wrap(
879+
response -> {
880+
logger.debug("Successfully read cluster state component from remote");
881+
readIndexRoutingTableResults.add(response);
882+
},
883+
ex -> {
884+
logger.error("Failed to read cluster state from remote", ex);
885+
exceptionList.add(ex);
886+
}
887+
),
888+
latch
889+
);
890+
891+
for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) {
892+
asyncMetadataReadActions.add(
893+
remoteRoutingTableService.get().getAsyncIndexMetadataReadAction(
894+
indexRouting.getUploadedFilename(),
895+
new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()),
896+
routingTableLatchedActionListener
897+
)
898+
);
899+
}
900+
901+
for (CheckedRunnable<IOException> asyncMetadataReadAction : asyncMetadataReadActions) {
902+
asyncMetadataReadAction.run();
903+
}
904+
905+
try {
906+
if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) {
907+
RemoteStateTransferException exception = new RemoteStateTransferException(
908+
"Timed out waiting to read cluster state from remote within timeout " + this.remoteStateReadTimeout
909+
);
910+
exceptionList.forEach(exception::addSuppressed);
911+
throw exception;
912+
}
913+
} catch (InterruptedException e) {
914+
exceptionList.forEach(e::addSuppressed);
915+
RemoteStateTransferException ex = new RemoteStateTransferException("Interrupted while waiting to read cluster state from metadata");
916+
Thread.currentThread().interrupt();
917+
throw ex;
918+
}
919+
920+
if (!exceptionList.isEmpty()) {
921+
RemoteStateTransferException exception = new RemoteStateTransferException("Exception during reading cluster state from remote");
922+
exceptionList.forEach(exception::addSuppressed);
923+
throw exception;
924+
}
925+
926+
ClusterState.Builder clusterStateBuilder = ClusterState.builder(previousState);
927+
Map<String, IndexRoutingTable> indicesRouting = new HashMap<>(previousState.routingTable().getIndicesRouting());
928+
929+
930+
readIndexRoutingTableResults.forEach(indexRoutingTable -> {
931+
indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable);
932+
});
933+
934+
return clusterStateBuilder.metadata(metadataBuilder)
935+
.version(manifest.getStateVersion())
936+
.stateUUID(manifest.getStateUUID())
937+
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indicesRouting))
938+
.build();
939+
}
940+
819941
private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
820942
throws IOException {
821943
AtomicReference<String> result = new AtomicReference<String>();

0 commit comments

Comments
 (0)