32
32
33
33
package org .opensearch .action .bulk ;
34
34
35
+ import org .opensearch .action .DocWriteRequest ;
36
+ import org .opensearch .action .delete .DeleteRequest ;
37
+ import org .opensearch .action .index .IndexRequest ;
35
38
import org .opensearch .action .support .WriteRequest .RefreshPolicy ;
39
+ import org .opensearch .action .update .UpdateRequest ;
40
+ import org .opensearch .common .io .stream .BytesStreamOutput ;
36
41
import org .opensearch .core .index .shard .ShardId ;
37
42
import org .opensearch .test .OpenSearchTestCase ;
38
43
44
+ import java .io .IOException ;
45
+
39
46
import static org .apache .lucene .tests .util .TestUtil .randomSimpleString ;
40
47
41
48
public class BulkShardRequestTests extends OpenSearchTestCase {
@@ -55,4 +62,73 @@ public void testToString() {
55
62
assertEquals ("BulkShardRequest [" + shardId + "] containing [" + count + "] requests blocking until refresh" , r .toString ());
56
63
assertEquals ("requests[" + count + "], index[" + index + "][0], refresh[WAIT_UNTIL]" , r .getDescription ());
57
64
}
65
+
66
+ public void testBulkShardRequestSerialization () throws IOException {
67
+ final String index = randomSimpleString (random (), 10 );
68
+ final int count = between (2 , 100 );
69
+ final ShardId shardId = new ShardId (index , "ignored" , 0 );
70
+ final RefreshPolicy refreshPolicy = randomFrom (RefreshPolicy .values ());
71
+ final BulkShardRequest expected = new BulkShardRequest (shardId , refreshPolicy , generateBulkItemRequests (count ));
72
+
73
+ final BytesStreamOutput out = new BytesStreamOutput ();
74
+
75
+ expected .writeTo (out );
76
+
77
+ final BulkShardRequest actual = new BulkShardRequest (out .bytes ().streamInput ());
78
+
79
+ assertEquals (expected .getParentTask ().getId (), actual .getParentTask ().getId ());
80
+ assertEquals (expected .getParentTask ().getNodeId (), actual .getParentTask ().getNodeId ());
81
+
82
+ assertEquals (expected .shardId (), actual .shardId ());
83
+ assertEquals (expected .waitForActiveShards (), actual .waitForActiveShards ());
84
+ assertEquals (expected .timeout (), actual .timeout ());
85
+ assertEquals (expected .index (), actual .index ());
86
+ assertEquals (expected .routedBasedOnClusterVersion (), actual .routedBasedOnClusterVersion ());
87
+
88
+ assertEquals (expected .getRefreshPolicy (), actual .getRefreshPolicy ());
89
+
90
+ assertEquals (expected .items ().length , actual .items ().length );
91
+ for (int i = 0 ; i < count ; ++i ) {
92
+ final BulkItemRequest expectedItem = expected .items ()[i ];
93
+ final BulkItemRequest actualItem = actual .items ()[i ];
94
+ if (null == expectedItem ) {
95
+ assertNull (actualItem );
96
+ continue ;
97
+ }
98
+ assertEquals (expectedItem .id (), actualItem .id ());
99
+ assertEquals (expectedItem .request ().id (), actualItem .request ().id ());
100
+ assertEquals (expectedItem .request ().index (), actualItem .request ().index ());
101
+ assertEquals (expectedItem .request ().opType (), actualItem .request ().opType ());
102
+ }
103
+ }
104
+
105
+ private BulkItemRequest [] generateBulkItemRequests (final int count ) {
106
+ final BulkItemRequest [] items = new BulkItemRequest [count ];
107
+ final int nullIdx = randomIntBetween (0 , count - 1 );
108
+ for (int i = 0 ; i < count ; i ++) {
109
+ if (i == nullIdx ) {
110
+ items [i ] = null ;
111
+ continue ;
112
+ }
113
+ final DocWriteRequest <?> request ;
114
+ switch (randomFrom (DocWriteRequest .OpType .values ())) {
115
+ case INDEX :
116
+ request = new IndexRequest ("index" ).id ("id_" + i );
117
+ break ;
118
+ case CREATE :
119
+ request = new IndexRequest ("index" ).id ("id_" + i ).create (true );
120
+ break ;
121
+ case UPDATE :
122
+ request = new UpdateRequest ("index" , "id_" + i );
123
+ break ;
124
+ case DELETE :
125
+ request = new DeleteRequest ("index" , "id_" + i );
126
+ break ;
127
+ default :
128
+ throw new AssertionError ("unknown type" );
129
+ }
130
+ items [i ] = new BulkItemRequest (i , request );
131
+ }
132
+ return items ;
133
+ }
58
134
}
0 commit comments