8
8
9
9
package org .opensearch .remotemigration ;
10
10
11
+ import org .opensearch .action .DocWriteResponse ;
11
12
import org .opensearch .action .admin .cluster .repositories .get .GetRepositoriesRequest ;
12
13
import org .opensearch .action .admin .cluster .repositories .get .GetRepositoriesResponse ;
14
+ import org .opensearch .action .bulk .BulkRequest ;
15
+ import org .opensearch .action .bulk .BulkResponse ;
16
+ import org .opensearch .action .delete .DeleteResponse ;
17
+ import org .opensearch .action .index .IndexRequest ;
18
+ import org .opensearch .action .index .IndexResponse ;
13
19
import org .opensearch .cluster .metadata .RepositoryMetadata ;
20
+ import org .opensearch .common .UUIDs ;
14
21
import org .opensearch .common .settings .Settings ;
15
22
import org .opensearch .common .util .FeatureFlags ;
16
23
import org .opensearch .repositories .fs .ReloadableFsRepository ;
17
24
import org .opensearch .test .OpenSearchIntegTestCase ;
18
25
19
26
import java .nio .file .Path ;
27
+ import java .util .List ;
20
28
import java .util .concurrent .ExecutionException ;
29
+ import java .util .concurrent .atomic .AtomicBoolean ;
30
+ import java .util .concurrent .atomic .AtomicLong ;
21
31
32
+ import static org .opensearch .node .remotestore .RemoteStoreNodeService .MIGRATION_DIRECTION_SETTING ;
33
+ import static org .opensearch .node .remotestore .RemoteStoreNodeService .REMOTE_STORE_COMPATIBILITY_MODE_SETTING ;
22
34
import static org .opensearch .repositories .fs .ReloadableFsRepository .REPOSITORIES_FAILRATE_SETTING ;
23
35
import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertAcked ;
24
36
@@ -28,8 +40,16 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
28
40
29
41
protected Path segmentRepoPath ;
30
42
protected Path translogRepoPath ;
31
-
32
43
boolean addRemote = false ;
44
+ Settings extraSettings = Settings .EMPTY ;
45
+
46
+ private final List <String > documentKeys = List .of (
47
+ randomAlphaOfLength (5 ),
48
+ randomAlphaOfLength (5 ),
49
+ randomAlphaOfLength (5 ),
50
+ randomAlphaOfLength (5 ),
51
+ randomAlphaOfLength (5 )
52
+ );
33
53
34
54
protected Settings nodeSettings (int nodeOrdinal ) {
35
55
if (segmentRepoPath == null || translogRepoPath == null ) {
@@ -40,6 +60,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
40
60
logger .info ("Adding remote store node" );
41
61
return Settings .builder ()
42
62
.put (super .nodeSettings (nodeOrdinal ))
63
+ .put (extraSettings )
43
64
.put (remoteStoreClusterSettings (REPOSITORY_NAME , segmentRepoPath , REPOSITORY_2_NAME , translogRepoPath ))
44
65
.build ();
45
66
} else {
@@ -64,4 +85,76 @@ protected void setFailRate(String repoName, int value) throws ExecutionException
64
85
client ().admin ().cluster ().preparePutRepository (repoName ).setType (ReloadableFsRepository .TYPE ).setSettings (settings ).get ()
65
86
);
66
87
}
88
+
89
+ public void initDocRepToRemoteMigration () {
90
+ assertTrue (
91
+ internalCluster ().client ()
92
+ .admin ()
93
+ .cluster ()
94
+ .prepareUpdateSettings ()
95
+ .setPersistentSettings (
96
+ Settings .builder ()
97
+ .put (REMOTE_STORE_COMPATIBILITY_MODE_SETTING .getKey (), "mixed" )
98
+ .put (MIGRATION_DIRECTION_SETTING .getKey (), "remote_store" )
99
+ )
100
+ .get ()
101
+ .isAcknowledged ()
102
+ );
103
+ }
104
+
105
+ public BulkResponse indexBulk (String indexName , int numDocs ) {
106
+ BulkRequest bulkRequest = new BulkRequest ();
107
+ for (int i = 0 ; i < numDocs ; i ++) {
108
+ final IndexRequest request = client ().prepareIndex (indexName )
109
+ .setId (UUIDs .randomBase64UUID ())
110
+ .setSource (documentKeys .get (randomIntBetween (0 , documentKeys .size () - 1 )), randomAlphaOfLength (5 ))
111
+ .request ();
112
+ bulkRequest .add (request );
113
+ }
114
+ return client ().bulk (bulkRequest ).actionGet ();
115
+ }
116
+
117
+ private void indexSingleDoc (String indexName ) {
118
+ IndexResponse indexResponse = client ().prepareIndex (indexName ).setId ("id" ).setSource ("field" , "value" ).get ();
119
+ assertEquals (DocWriteResponse .Result .CREATED , indexResponse .getResult ());
120
+ DeleteResponse deleteResponse = client ().prepareDelete (indexName , "id" ).get ();
121
+ assertEquals (DocWriteResponse .Result .DELETED , deleteResponse .getResult ());
122
+ client ().prepareIndex (indexName ).setSource ("auto" , true ).get ();
123
+ }
124
+
125
+ public class AsyncIndexingService {
126
+ private String indexName ;
127
+ private AtomicLong indexedDocs = new AtomicLong (0 );
128
+ private AtomicBoolean finished = new AtomicBoolean ();
129
+ private Thread indexingThread ;
130
+
131
+ AsyncIndexingService (String indexName ) {
132
+ this .indexName = indexName ;
133
+ }
134
+
135
+ public void startIndexing () {
136
+ indexingThread = getIndexingThread ();
137
+ indexingThread .start ();
138
+ }
139
+
140
+ public void stopIndexing () throws InterruptedException {
141
+ finished .set (true );
142
+ indexingThread .join ();
143
+ }
144
+
145
+ public long getIndexedDocs () {
146
+ return indexedDocs .get ();
147
+ }
148
+
149
+ private Thread getIndexingThread () {
150
+ return new Thread (() -> {
151
+ while (finished .get () == false ) {
152
+ indexSingleDoc (indexName );
153
+ long currentDocCount = indexedDocs .incrementAndGet ();
154
+ logger .info ("Completed ingestion of {} docs" , currentDocCount );
155
+
156
+ }
157
+ });
158
+ }
159
+ }
67
160
}
0 commit comments