Skip to content

Commit 633b183

Browse files
committed
Fix snapshot de/ser
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent 65a04a2 commit 633b183

File tree

4 files changed

+122
-20
lines changed

4 files changed

+122
-20
lines changed

server/src/main/java/org/opensearch/cluster/SnapshotDeletionsInProgress.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
209209
builder.field("repository_state_id", entry.repositoryStateId);
210210
if (params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API).equals(Metadata.CONTEXT_MODE_GATEWAY)) {
211211
builder.field("state", entry.state().value);
212+
builder.field("uuid", entry.uuid());
212213
} // else we don't serialize it
213214
}
214215
builder.endObject();
@@ -218,6 +219,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
218219
}
219220

220221
public static SnapshotDeletionsInProgress fromXContent(XContentParser parser) throws IOException {
222+
if (parser.currentToken() == null) { // fresh parser? move to the first token
223+
parser.nextToken();
224+
}
225+
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
226+
parser.nextToken();
227+
}
221228
ensureFieldName(parser, parser.currentToken(), TYPE);
222229
parser.nextToken();
223230
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
@@ -229,6 +236,7 @@ public static SnapshotDeletionsInProgress fromXContent(XContentParser parser) th
229236
byte stateValue = -1;
230237
List<SnapshotId> snapshotIds = new ArrayList<>();
231238
TimeValue startTime = null;
239+
String entryUUID = null;
232240
while ((parser.nextToken()) != XContentParser.Token.END_OBJECT) {
233241
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
234242
final String fieldName = parser.currentName();
@@ -270,12 +278,15 @@ public static SnapshotDeletionsInProgress fromXContent(XContentParser parser) th
270278
case "state":
271279
stateValue = (byte) parser.intValue();
272280
break;
281+
case "uuid":
282+
entryUUID = parser.text();
283+
break;
273284
default:
274285
throw new IllegalArgumentException("unknown field [" + fieldName + "]");
275286
}
276287
}
277288
assert startTime != null;
278-
entries.add(new Entry(snapshotIds, repository, startTime.millis(), repositoryStateId, State.fromValue(stateValue)));
289+
entries.add(new Entry(snapshotIds, repository, startTime.millis(), repositoryStateId, State.fromValue(stateValue), entryUUID));
279290
}
280291
return SnapshotDeletionsInProgress.of(entries);
281292
}

server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java

+59-16
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.core.common.io.stream.StreamInput;
4343
import org.opensearch.core.common.io.stream.StreamOutput;
4444
import org.opensearch.core.common.io.stream.Writeable;
45+
import org.opensearch.core.index.Index;
4546
import org.opensearch.core.index.shard.ShardId;
4647
import org.opensearch.core.xcontent.MediaTypeRegistry;
4748
import org.opensearch.core.xcontent.ToXContent;
@@ -66,6 +67,7 @@
6667
import java.util.Set;
6768
import java.util.stream.Collectors;
6869

70+
import static org.opensearch.common.xcontent.XContentUtils.readValue;
6971
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
7072
import static org.opensearch.core.xcontent.XContentParserUtils.ensureFieldName;
7173
import static org.opensearch.core.xcontent.XContentParserUtils.parseStringList;
@@ -740,7 +742,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
740742
builder.field(STATE, status.state());
741743
builder.field(NODE, status.nodeId());
742744
if (params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API).equals(Metadata.CONTEXT_MODE_GATEWAY)) {
743-
builder.field(INDEX_UUID, shardId.getIndex().getUUID());
744745
if (status.generation() != null) builder.field(GENERATION, status.generation());
745746
if (status.reason() != null) builder.field(REASON, status.reason());
746747
}
@@ -770,6 +771,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
770771
if (status.reason() != null) builder.field(REASON, status.reason());
771772
builder.endObject();
772773
}
774+
builder.endArray();
773775
builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy);
774776
}
775777
builder.endObject();
@@ -786,19 +788,18 @@ public static Entry fromXContent(XContentParser parser) throws IOException {
786788
Version version = null;
787789
SnapshotId source = null;
788790
Map<String, Object> metadata = null;
789-
byte state = -1;
791+
State state = null;
790792
List<IndexId> indices = new ArrayList<>();
791793
long startTime = 0;
792794
long repositoryStateId = -1L;
793795
Map<ShardId, ShardSnapshotStatus> shards = new HashMap<>();
794796
List<String> dataStreams = new ArrayList<>();
795797
Map<RepositoryShardId, ShardSnapshotStatus> clones = new HashMap<>();
796798
boolean remoteStoreIndexShallowCopy = false;
797-
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
799+
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
798800
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
799801
String currentFieldName = parser.currentName();
800802
parser.nextToken();
801-
802803
switch (currentFieldName) {
803804
case REPOSITORY:
804805
repository = parser.text();
@@ -816,7 +817,7 @@ public static Entry fromXContent(XContentParser parser) throws IOException {
816817
partial = parser.booleanValue();
817818
break;
818819
case STATE:
819-
state = (byte) parser.intValue();
820+
state = State.fromString(parser.text());
820821
break;
821822
case INDICES:
822823
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
@@ -836,32 +837,28 @@ public static Entry fromXContent(XContentParser parser) throws IOException {
836837
case SHARDS:
837838
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
838839
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
839-
String index = null;
840-
String indexUUID = null;
840+
Index index = null;
841841
int shardId = -1;
842842
String nodeId = null;
843843
ShardState shardState = null;
844844
String reason = null;
845845
String generation = null;
846-
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
846+
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
847847
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
848848
final String currentShardField = parser.currentName();
849849
parser.nextToken();
850850
switch (currentShardField) {
851851
case INDEX:
852-
index = parser.text();
852+
index = Index.fromXContent(parser);
853853
break;
854854
case SHARD:
855855
shardId = parser.intValue();
856856
break;
857-
case INDEX_UUID:
858-
indexUUID = parser.text();
859-
break;
860857
case NODE:
861-
nodeId = parser.text();
858+
nodeId = (String) readValue(parser, parser.currentToken());
862859
break;
863860
case STATE:
864-
shardState = ShardState.fromValue((byte) parser.intValue());
861+
shardState = ShardState.fromString(parser.text());
865862
break;
866863
case REASON:
867864
reason = parser.text();
@@ -873,7 +870,7 @@ public static Entry fromXContent(XContentParser parser) throws IOException {
873870
throw new IllegalArgumentException("unknown field [" + currentShardField + "]");
874871
}
875872
}
876-
shards.put(new ShardId(index, indexUUID, shardId),
873+
shards.put(new ShardId(index, shardId),
877874
reason != null ? new ShardSnapshotStatus(nodeId, shardState, reason, generation) :
878875
new ShardSnapshotStatus(nodeId, shardState, generation));
879876
}
@@ -950,7 +947,7 @@ public static Entry fromXContent(XContentParser parser) throws IOException {
950947
snapshot,
951948
includeGlobalState,
952949
partial,
953-
State.fromValue(state),
950+
state,
954951
indices,
955952
dataStreams,
956953
startTime,
@@ -1203,6 +1200,25 @@ public static State fromValue(byte value) {
12031200
throw new IllegalArgumentException("No snapshot state for value [" + value + "]");
12041201
}
12051202
}
1203+
1204+
public static State fromString(String value) {
1205+
switch(value) {
1206+
case "INIT":
1207+
return INIT;
1208+
case "STARTED":
1209+
return STARTED;
1210+
case "SUCCESS":
1211+
return SUCCESS;
1212+
case "FAILED":
1213+
return FAILED;
1214+
case "ABORTED":
1215+
return ABORTED;
1216+
case "PARTIAL":
1217+
return PARTIAL;
1218+
default:
1219+
throw new IllegalArgumentException("No snapshot state for value [" + value + "]");
1220+
}
1221+
}
12061222
}
12071223

12081224
private final List<Entry> entries;
@@ -1311,6 +1327,12 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
13111327
}
13121328

13131329
public static SnapshotsInProgress fromXContent(XContentParser parser) throws IOException {
1330+
if (parser.currentToken() == null) { // fresh parser? move to the first token
1331+
parser.nextToken();
1332+
}
1333+
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
1334+
parser.nextToken();
1335+
}
13141336
ensureFieldName(parser, parser.currentToken(), SNAPSHOTS);
13151337
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
13161338
List<Entry> entries = new ArrayList<>();
@@ -1380,5 +1402,26 @@ public static ShardState fromValue(byte value) {
13801402
throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]");
13811403
}
13821404
}
1405+
1406+
public static ShardState fromString(String state) {
1407+
switch (state) {
1408+
case "INIT":
1409+
return INIT;
1410+
case "SUCCESS":
1411+
return SUCCESS;
1412+
case "FAILED":
1413+
return FAILED;
1414+
case "ABORTED":
1415+
return ABORTED;
1416+
case "MISSING":
1417+
return MISSING;
1418+
case "WAITING":
1419+
return WAITING;
1420+
case "QUEUED":
1421+
return QUEUED;
1422+
default:
1423+
throw new IllegalArgumentException("No shard snapshot state for value [" + state + "]");
1424+
}
1425+
}
13831426
}
13841427
}

server/src/main/java/org/opensearch/repositories/IndexId.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
138138
}
139139

140140
public static IndexId fromXContent(XContentParser parser) throws IOException {
141-
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
141+
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
142142
String name = null;
143143
String id = null;
144144
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {

server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java

+50-2
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,27 @@
3636
import org.opensearch.cluster.ClusterModule;
3737
import org.opensearch.cluster.ClusterState.Custom;
3838
import org.opensearch.cluster.Diff;
39+
import org.opensearch.cluster.SnapshotDeletionsInProgress;
3940
import org.opensearch.cluster.SnapshotsInProgress;
4041
import org.opensearch.cluster.SnapshotsInProgress.Entry;
4142
import org.opensearch.cluster.SnapshotsInProgress.ShardState;
4243
import org.opensearch.cluster.SnapshotsInProgress.State;
44+
import org.opensearch.cluster.metadata.Metadata;
45+
import org.opensearch.cluster.node.DiscoveryNodes;
4346
import org.opensearch.common.UUIDs;
4447
import org.opensearch.common.io.stream.BytesStreamOutput;
48+
import org.opensearch.common.xcontent.json.JsonXContent;
49+
import org.opensearch.core.common.bytes.BytesReference;
4550
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
4651
import org.opensearch.core.common.io.stream.StreamInput;
4752
import org.opensearch.core.common.io.stream.Writeable;
4853
import org.opensearch.core.index.Index;
4954
import org.opensearch.core.index.shard.ShardId;
55+
import org.opensearch.core.xcontent.MediaType;
56+
import org.opensearch.core.xcontent.MediaTypeRegistry;
57+
import org.opensearch.core.xcontent.ToXContent;
58+
import org.opensearch.core.xcontent.XContentBuilder;
59+
import org.opensearch.core.xcontent.XContentParser;
5060
import org.opensearch.repositories.IndexId;
5161
import org.opensearch.test.AbstractDiffableWireSerializationTestCase;
5262
import org.opensearch.test.VersionUtils;
@@ -60,6 +70,9 @@
6070
import java.util.Map;
6171
import java.util.stream.Collectors;
6272

73+
import static java.lang.Math.abs;
74+
import static java.util.Collections.singletonMap;
75+
import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_GATEWAY;
6376
import static org.opensearch.test.VersionUtils.randomVersion;
6477

6578
public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireSerializationTestCase<Custom> {
@@ -84,7 +97,7 @@ private Entry randomSnapshot() {
8497
for (int i = 0; i < numberOfIndices; i++) {
8598
indices.add(new IndexId(randomAlphaOfLength(10), randomAlphaOfLength(10)));
8699
}
87-
long startTime = randomLong();
100+
long startTime = abs(randomLong());
88101
long repositoryStateId = randomLong();
89102
Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = new HashMap<>();
90103
final List<Index> esIndices = indices.stream()
@@ -183,6 +196,41 @@ protected Custom mutateInstance(Custom instance) {
183196
return SnapshotsInProgress.of(entries);
184197
}
185198

199+
public void testToXContent() throws IOException {
200+
SnapshotsInProgress sip = SnapshotsInProgress.of(List.of(randomSnapshot(), randomSnapshot()));
201+
boolean humanReadable = false;
202+
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
203+
final MediaType mediaType = MediaTypeRegistry.JSON;
204+
BytesReference originalBytes = toShuffledXContent(
205+
sip,
206+
mediaType,
207+
new ToXContent.MapParams(singletonMap(Metadata.CONTEXT_MODE_PARAM, CONTEXT_MODE_GATEWAY)),
208+
humanReadable
209+
);
210+
try (XContentParser parser = createParser(mediaType.xContent(), originalBytes)) {
211+
SnapshotsInProgress parsed = SnapshotsInProgress.fromXContent(parser);
212+
assertEquals(sip, parsed);
213+
}
214+
}
215+
216+
public void testToXContent_deletion() throws IOException {
217+
SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry(List.of(new SnapshotId("name1", "uuid1")), "repo", 10000000L, 10000L, SnapshotDeletionsInProgress.State.WAITING);
218+
SnapshotDeletionsInProgress sdip = SnapshotDeletionsInProgress.of(List.of(entry));
219+
boolean humanReadable = false;
220+
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
221+
final MediaType mediaType = MediaTypeRegistry.JSON;
222+
BytesReference originalBytes = toShuffledXContent(
223+
sdip,
224+
mediaType,
225+
new ToXContent.MapParams(singletonMap(Metadata.CONTEXT_MODE_PARAM, CONTEXT_MODE_GATEWAY)),
226+
humanReadable
227+
);
228+
try (XContentParser parser = createParser(mediaType.xContent(), originalBytes)) {
229+
SnapshotDeletionsInProgress parsed = SnapshotDeletionsInProgress.fromXContent(parser);
230+
assertEquals(sdip, parsed);
231+
}
232+
}
233+
186234
public void testSerDeRemoteStoreIndexShallowCopy() throws IOException {
187235
SnapshotsInProgress.Entry entry = new SnapshotsInProgress.Entry(
188236
new Snapshot(randomName("repo"), new SnapshotId(randomName("snap"), UUIDs.randomBase64UUID())),
@@ -191,7 +239,7 @@ public void testSerDeRemoteStoreIndexShallowCopy() throws IOException {
191239
SnapshotsInProgressSerializationTests.randomState(Map.of()),
192240
Collections.emptyList(),
193241
Collections.emptyList(),
194-
Math.abs(randomLong()),
242+
abs(randomLong()),
195243
randomIntBetween(0, 1000),
196244
Map.of(),
197245
null,

0 commit comments

Comments
 (0)