forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAsyncShardBatchFetch.java
251 lines (226 loc) · 9.99 KB
/
AsyncShardBatchFetch.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.gateway;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.store.ShardAttributes;
import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import reactor.util.annotation.NonNull;
/**
* Implementation of AsyncShardFetch with batching support. This class is responsible for executing the fetch
* part using the base class {@link AsyncShardFetch}. Other functionalities needed for a batch are only written here.
* This separation also takes care of the extra generic type V which is only needed for batch
* transport actions like {@link TransportNodesListGatewayStartedShardsBatch} and
* {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch}.
*
* @param <T> Response type of the transport action.
* @param <V> Data type of shard level response.
*
* @opensearch.internal
*/
public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extends AsyncShardFetch<T> {
@SuppressWarnings("unchecked")
AsyncShardBatchFetch(
Logger logger,
String type,
Map<ShardId, ShardAttributes> shardAttributesMap,
AsyncShardFetch.Lister<? extends BaseNodesResponse<T>, T> action,
String batchId,
Class<V> clazz,
V emptyShardResponse,
Predicate<V> emptyShardResponsePredicate,
ShardBatchResponseFactory<T, V> responseFactory
) {
super(
logger,
type,
shardAttributesMap,
action,
batchId,
new ShardBatchCache<>(
logger,
type,
shardAttributesMap,
"BatchID=[" + batchId + "]",
clazz,
emptyShardResponse,
emptyShardResponsePredicate,
responseFactory
)
);
}
/**
* Remove a shard from the cache maintaining a full batch of shards. This is needed to clear the shard once it's
* assigned or failed.
*
* @param shardId shardId to be removed from the batch.
*/
public synchronized void clearShard(ShardId shardId) {
this.shardAttributesMap.remove(shardId);
this.cache.deleteShard(shardId);
}
public boolean hasEmptyCache() {
return this.cache.getCache().isEmpty();
}
public AsyncShardFetchCache<T> getCache() {
return this.cache;
}
/**
* Cache implementation of transport actions returning batch of shards related data in the response.
* Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or
* {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch} with memory efficient caching
* approach. This cache class is not thread safe, all of its methods are being called from
* {@link AsyncShardFetch} class which has synchronized blocks present to handle multiple threads.
*
* @param <T> Response type of transport action.
* @param <V> Data type of shard level response.
*/
static class ShardBatchCache<T extends BaseNodeResponse, V> extends AsyncShardFetchCache<T> {
private final Map<String, NodeEntry<V>> cache;
private final Map<ShardId, Integer> shardIdToArray;
private final int batchSize;
private final Class<V> shardResponseClass;
private final ShardBatchResponseFactory<T, V> responseFactory;
private final V emptyResponse;
private final Predicate<V> emptyShardResponsePredicate;
private final Logger logger;
public ShardBatchCache(
Logger logger,
String type,
Map<ShardId, ShardAttributes> shardAttributesMap,
String logKey,
Class<V> clazz,
V emptyResponse,
Predicate<V> emptyShardResponsePredicate,
ShardBatchResponseFactory<T, V> responseFactory
) {
super(Loggers.getLogger(logger, "_" + logKey), type);
this.batchSize = shardAttributesMap.size();
this.emptyShardResponsePredicate = emptyShardResponsePredicate;
cache = new HashMap<>();
shardIdToArray = new HashMap<>();
fillShardIdKeys(shardAttributesMap.keySet());
this.shardResponseClass = clazz;
this.emptyResponse = emptyResponse;
this.logger = logger;
this.responseFactory = responseFactory;
}
@Override
@NonNull
public Map<String, ? extends BaseNodeEntry> getCache() {
return cache;
}
@Override
public void deleteShard(ShardId shardId) {
if (shardIdToArray.containsKey(shardId)) {
Integer shardIdIndex = shardIdToArray.remove(shardId);
for (String nodeId : cache.keySet()) {
cache.get(nodeId).clearShard(shardIdIndex);
}
}
}
@Override
public void initData(DiscoveryNode node) {
cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, emptyShardResponsePredicate));
}
/**
* Put the response received from data nodes into the cache.
* Get shard level data from batch, then filter out if any shards received failures.
* After that complete storing the data at node level and mark fetching as done.
*
* @param node node from which we got the response.
* @param response shard metadata coming from node.
*/
@Override
public void putData(DiscoveryNode node, T response) {
NodeEntry<V> nodeEntry = cache.get(node.getId());
Map<ShardId, V> batchResponse = responseFactory.getShardBatchData(response);
nodeEntry.doneFetching(batchResponse, shardIdToArray);
}
@Override
public T getData(DiscoveryNode node) {
return this.responseFactory.getNewResponse(node, getBatchData(cache.get(node.getId())));
}
private HashMap<ShardId, V> getBatchData(NodeEntry<V> nodeEntry) {
V[] nodeShardEntries = nodeEntry.getData();
boolean[] emptyResponses = nodeEntry.getEmptyShardResponse();
HashMap<ShardId, V> shardData = new HashMap<>();
for (Map.Entry<ShardId, Integer> shardIdEntry : shardIdToArray.entrySet()) {
ShardId shardId = shardIdEntry.getKey();
Integer arrIndex = shardIdEntry.getValue();
if (emptyResponses[arrIndex]) {
shardData.put(shardId, emptyResponse);
} else if (nodeShardEntries[arrIndex] != null) {
// ignore null responses here
shardData.put(shardId, nodeShardEntries[arrIndex]);
}
}
return shardData;
}
private void fillShardIdKeys(Set<ShardId> shardIds) {
int shardIdIndex = 0;
for (ShardId shardId : shardIds) {
this.shardIdToArray.putIfAbsent(shardId, shardIdIndex++);
}
}
/**
* A node entry, holding the state of the fetched data for a specific shard
* for a giving node.
*/
static class NodeEntry<V> extends BaseNodeEntry {
private final V[] shardData;
private final boolean[] emptyShardResponse; // we can not rely on null entries of the shardData array,
// those null entries means that we need to ignore those entries. Empty responses on the other hand are
// actually needed in allocation/explain API response. So instead of storing full empty response object
// in cache, it's better to just store a boolean and create that object on the fly just before
// decision-making.
private final Predicate<V> emptyShardResponsePredicate;
NodeEntry(String nodeId, Class<V> clazz, int batchSize, Predicate<V> emptyShardResponsePredicate) {
super(nodeId);
this.shardData = (V[]) Array.newInstance(clazz, batchSize);
this.emptyShardResponse = new boolean[batchSize];
this.emptyShardResponsePredicate = emptyShardResponsePredicate;
}
void doneFetching(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integer> shardIdKey) {
fillShardData(shardDataFromNode, shardIdKey);
super.doneFetching();
}
void clearShard(Integer shardIdIndex) {
this.shardData[shardIdIndex] = null;
emptyShardResponse[shardIdIndex] = false;
}
V[] getData() {
return this.shardData;
}
boolean[] getEmptyShardResponse() {
return emptyShardResponse;
}
private void fillShardData(Map<ShardId, V> shardDataFromNode, Map<ShardId, Integer> shardIdKey) {
for (Map.Entry<ShardId, V> shardData : shardDataFromNode.entrySet()) {
if (shardData.getValue() != null) {
ShardId shardId = shardData.getKey();
if (emptyShardResponsePredicate.test(shardData.getValue())) {
this.emptyShardResponse[shardIdKey.get(shardId)] = true;
this.shardData[shardIdKey.get(shardId)] = null;
} else {
this.shardData[shardIdKey.get(shardId)] = shardData.getValue();
}
}
}
}
}
}
}