18
18
*/
19
19
package org .apache .accumulo .test .functional ;
20
20
21
+ import static org .apache .accumulo .test .TestIngest .generateRow ;
21
22
import static org .apache .accumulo .test .compaction .ExternalCompactionTestUtils .countTablets ;
23
+ import static org .junit .Assert .assertFalse ;
24
+ import static org .junit .jupiter .api .Assertions .assertEquals ;
22
25
import static org .junit .jupiter .api .Assertions .assertThrows ;
23
26
import static org .junit .jupiter .api .Assertions .assertTrue ;
24
27
34
37
35
38
import org .apache .accumulo .core .client .Accumulo ;
36
39
import org .apache .accumulo .core .client .AccumuloClient ;
40
+ import org .apache .accumulo .core .client .BatchWriter ;
37
41
import org .apache .accumulo .core .client .admin .CompactionConfig ;
38
42
import org .apache .accumulo .core .client .admin .NewTableConfiguration ;
39
43
import org .apache .accumulo .core .client .admin .TabletAvailability ;
40
44
import org .apache .accumulo .core .client .admin .TabletMergeability ;
41
45
import org .apache .accumulo .core .clientImpl .TabletMergeabilityUtil ;
42
46
import org .apache .accumulo .core .conf .Property ;
47
+ import org .apache .accumulo .core .data .Mutation ;
43
48
import org .apache .accumulo .core .data .Range ;
44
49
import org .apache .accumulo .core .data .TableId ;
50
+ import org .apache .accumulo .core .data .Value ;
45
51
import org .apache .accumulo .core .dataImpl .KeyExtent ;
46
52
import org .apache .accumulo .core .metadata .schema .TabletMetadata ;
47
53
import org .apache .accumulo .core .security .Authorizations ;
53
59
import org .apache .accumulo .test .VerifyIngest ;
54
60
import org .apache .accumulo .test .VerifyIngest .VerifyParams ;
55
61
import org .apache .accumulo .test .util .Wait ;
62
+ import org .apache .commons .lang3 .StringUtils ;
56
63
import org .apache .hadoop .conf .Configuration ;
57
64
import org .apache .hadoop .io .Text ;
58
65
import org .junit .jupiter .api .AfterAll ;
@@ -150,6 +157,70 @@ public void testMergeabilityMultipleRanges() throws Exception {
150
157
}
151
158
}
152
159
160
+ @ Test
161
+ public void testMergeabilityThresholdMultipleRanges () throws Exception {
162
+ String tableName = getUniqueNames (1 )[0 ];
163
+ try (AccumuloClient c = Accumulo .newClient ().from (getClientProps ()).build ()) {
164
+ Map <String ,String > props = new HashMap <>();
165
+ props .put (Property .TABLE_SPLIT_THRESHOLD .getKey (), "32K" );
166
+ props .put (Property .TABLE_MAX_MERGEABILITY_THRESHOLD .getKey (), ".5" );
167
+ c .tableOperations ().create (tableName , new NewTableConfiguration ()
168
+ .withInitialTabletAvailability (TabletAvailability .HOSTED ).setProperties (props ));
169
+ var tableId = TableId .of (c .tableOperations ().tableIdMap ().get (tableName ));
170
+
171
+ SortedMap <Text ,TabletMergeability > splits = new TreeMap <>();
172
+ // Create new tablets that won't merge automatically
173
+ for (int i = 10000 ; i <= 90000 ; i += 10000 ) {
174
+ splits .put (row (i ), TabletMergeability .never ());
175
+ }
176
+
177
+ c .tableOperations ().putSplits (tableName , splits );
178
+ // Verify we now have 10 tablets
179
+ // [row_0000010000, row_0000020000, row_0000030000, row_0000040000, row_0000050000,
180
+ // row_0000060000, row_0000070000, row_0000080000, row_0000090000, default]
181
+ Wait .waitFor (() -> countTablets (getCluster ().getServerContext (), tableName , tm -> true ) == 10 ,
182
+ 5000 , 500 );
183
+
184
+ // Insert rows into each tablet with different numbers of rows
185
+ // Tablets with end rows row_0000020000 - row_0000040000, row_0000060000 - row_0000080000,
186
+ // default will have 1000 rows
187
+ // Tablets with end rows row_0000010000, row_0000050000, row_0000090000 will have 5000 rows
188
+ try (BatchWriter bw = c .createBatchWriter (tableName )) {
189
+ final var value = StringUtils .repeat ("a" , 1024 );
190
+ for (int i = 0 ; i < 100000 ; i += 10000 ) {
191
+ var rows = 1000 ;
192
+ if (i % 40000 == 0 ) {
193
+ rows = 5000 ;
194
+ }
195
+ for (int j = 0 ; j < rows ; j ++) {
196
+ Mutation m = new Mutation (row (i + j ));
197
+ m .put (new Text ("cf1" ), new Text ("cq1" ), new Value (value ));
198
+ bw .addMutation (m );
199
+ }
200
+ }
201
+ }
202
+ c .tableOperations ().flush (tableName , null , null , true );
203
+
204
+ // Set all 10 tablets to be auto-mergeable
205
+ for (int i = 10000 ; i <= 90000 ; i += 10000 ) {
206
+ splits .put (row (i ), TabletMergeability .always ());
207
+ }
208
+ c .tableOperations ().putSplits (tableName , splits );
209
+
210
+ // With the mergeability threshold set to 50% of 32KB we should be able to merge together
211
+ // the tablets with 1000 rows, but not 5000 rows. This should produce the following
212
+ // 6 tablets after merger.
213
+ Wait .waitFor (() -> hasExactTablets (getCluster ().getServerContext (), tableId ,
214
+ Set .of (new KeyExtent (tableId , row (10000 ), null ),
215
+ new KeyExtent (tableId , row (40000 ), row (10000 )),
216
+ new KeyExtent (tableId , row (50000 ), row (40000 )),
217
+ new KeyExtent (tableId , row (80000 ), row (50000 )),
218
+ new KeyExtent (tableId , row (90000 ), row (80000 )),
219
+ new KeyExtent (tableId , null , row (90000 )))),
220
+ 10000 , 200 );
221
+ }
222
+ }
223
+
153
224
@ Test
154
225
public void testSplitAndMergeAll () throws Exception {
155
226
String tableName = getUniqueNames (1 )[0 ];
@@ -215,6 +286,9 @@ public void testMergeabilityThreshold() throws Exception {
215
286
assertThrows (IllegalStateException .class ,
216
287
() -> Wait .waitFor (() -> hasExactTablets (getCluster ().getServerContext (), tableId ,
217
288
Set .of (new KeyExtent (tableId , null , null ))), 5000 , 500 ));
289
+ // Make sure we failed because of exact tablets and not a different IllegalStateException
290
+ assertFalse (hasExactTablets (getCluster ().getServerContext (), tableId ,
291
+ Set .of (new KeyExtent (tableId , null , null ))));
218
292
219
293
// With a 10% threshold we should be able to merge
220
294
c .tableOperations ().setProperty (tableName , Property .TABLE_MAX_MERGEABILITY_THRESHOLD .getKey (),
@@ -255,6 +329,60 @@ public void testMergeAfter() throws Exception {
255
329
}
256
330
}
257
331
332
+ @ Test
333
+ public void testMergeabilityMaxFiles () throws Exception {
334
+ String tableName = getUniqueNames (1 )[0 ];
335
+ try (AccumuloClient c = Accumulo .newClient ().from (getClientProps ()).build ()) {
336
+ Map <String ,String > props = new HashMap <>();
337
+ // disable compactions and set a low merge file max
338
+ props .put (Property .TABLE_MAJC_RATIO .getKey (), "9999" );
339
+ props .put (Property .TABLE_MERGE_FILE_MAX .getKey (), "3" );
340
+ c .tableOperations ().create (tableName , new NewTableConfiguration ().setProperties (props )
341
+ .withInitialTabletAvailability (TabletAvailability .HOSTED ));
342
+ var tableId = TableId .of (c .tableOperations ().tableIdMap ().get (tableName ));
343
+
344
+ // Create new tablets that won't merge automatically
345
+ SortedMap <Text ,TabletMergeability > splits = new TreeMap <>();
346
+ for (int i = 500 ; i < 5000 ; i += 500 ) {
347
+ splits .put (row (i ), TabletMergeability .never ());
348
+ }
349
+ c .tableOperations ().putSplits (tableName , splits );
350
+
351
+ // Verify we now have 10 tablets
352
+ Wait .waitFor (() -> countTablets (getCluster ().getServerContext (), tableName , tm -> true ) == 10 ,
353
+ 5000 , 500 );
354
+
355
+ // Ingest data so tablet will split, each tablet will have several files because
356
+ // of the flush setting
357
+ VerifyParams params = new VerifyParams (getClientProps (), tableName , 5_000 );
358
+ params .startRow = 0 ;
359
+ params .flushAfterRows = 100 ;
360
+ TestIngest .ingest (c , params );
361
+ VerifyIngest .verifyIngest (c , params );
362
+
363
+ // Mark all tablets as mergeable
364
+ for (int i = 500 ; i < 5000 ; i += 500 ) {
365
+ splits .put (row (i ), TabletMergeability .always ());
366
+ }
367
+ c .tableOperations ().putSplits (tableName , splits );
368
+
369
+ // Should not merge as we set max file count to only 3 and there are more files than that
370
+ // per tablet, so make sure it throws IllegalStateException
371
+ assertThrows (IllegalStateException .class ,
372
+ () -> Wait .waitFor (() -> hasExactTablets (getCluster ().getServerContext (), tableId ,
373
+ Set .of (new KeyExtent (tableId , null , null ))), 5000 , 500 ));
374
+ // Make sure tablets is still 10, not merged
375
+ assertEquals (10 , countTablets (getCluster ().getServerContext (), tableName , tm -> true ));
376
+
377
+ // Set max merge file count back to default of 10k
378
+ c .tableOperations ().setProperty (tableName , Property .TABLE_MERGE_FILE_MAX .getKey (), "10000" );
379
+
380
+ // Should merge back to 1 tablet
381
+ Wait .waitFor (() -> hasExactTablets (getCluster ().getServerContext (), tableId ,
382
+ Set .of (new KeyExtent (tableId , null , null ))), 10000 , 200 );
383
+ }
384
+ }
385
+
258
386
private static boolean hasExactTablets (ServerContext ctx , TableId tableId ,
259
387
Set <KeyExtent > expected ) {
260
388
try (var tabletsMetadata = ctx .getAmple ().readTablets ().forTable (tableId ).build ()) {
@@ -271,4 +399,8 @@ private static boolean hasExactTablets(ServerContext ctx, TableId tableId,
271
399
return expectedTablets .isEmpty ();
272
400
}
273
401
}
402
+
403
+ private static Text row (int row ) {
404
+ return generateRow (row , 0 );
405
+ }
274
406
}
0 commit comments