@@ -52,6 +52,7 @@ public class MaxWalReferencedIT extends ConfigurableMacBase {
52
52
private static final Logger log = LoggerFactory .getLogger (MaxWalReferencedIT .class );
53
53
54
54
final int WAL_MAX_REFERENCED = 3 ;
55
+ final int hdfsMinBlockSize = 1048576 ;
55
56
56
57
@ Override
57
58
protected Duration defaultTimeout () {
@@ -60,10 +61,8 @@ protected Duration defaultTimeout() {
60
61
61
62
@ Override
62
63
protected void configure (MiniAccumuloConfigImpl cfg , Configuration hadoopCoreSite ) {
63
- final String hdfsMinBlockSize = "1048576" ;
64
-
65
64
// Set a small WAL size so we roll frequently
66
- cfg .setProperty (Property .TSERV_WAL_MAX_SIZE , hdfsMinBlockSize );
65
+ cfg .setProperty (Property .TSERV_WAL_MAX_SIZE , Integer . toString ( hdfsMinBlockSize ) );
67
66
// Set the max number of WALs that can be referenced
68
67
cfg .setProperty (Property .TSERV_WAL_MAX_REFERENCED , Integer .toString (WAL_MAX_REFERENCED ));
69
68
cfg .setNumTservers (1 );
@@ -89,16 +88,13 @@ public void testWALMaxReferenced() throws Exception {
89
88
final int rowsPerIteration = 30000 ;
90
89
AtomicInteger iteration = new AtomicInteger (0 );
91
90
Wait .waitFor (() -> {
92
- int startRow = iteration .get () * rowsPerIteration ;
93
- int endRow = (iteration .get () + 1 ) * rowsPerIteration ;
94
91
95
92
// Write data that should fill or partially fill the WAL
96
- writeData (client , tableName , startRow , endRow );
93
+ writeData (client , tableName );
97
94
98
95
// Check the current number of WALs in use
99
96
long walCount = getWalCount (getServerContext ());
100
- log .info ("After iteration {}, wrote rows [{}..{}), WAL count is {}" , iteration , startRow ,
101
- endRow , walCount );
97
+ log .info ("After iteration {}, WAL count is {}" , iteration , walCount );
102
98
iteration .getAndIncrement ();
103
99
104
100
if (walCount > WAL_MAX_REFERENCED ) {
@@ -108,21 +104,25 @@ public void testWALMaxReferenced() throws Exception {
108
104
} else {
109
105
return false ;
110
106
}
111
- }, 60000 , 250 , "Expected to see WAL count exceed " + WAL_MAX_REFERENCED );
107
+ }, 60000 , 10 , "Expected to see WAL count exceed " + WAL_MAX_REFERENCED );
112
108
113
109
// wait for minor compactions to reduce the WAL count
114
110
Wait .waitFor (() -> getWalCount (getServerContext ()) <= WAL_MAX_REFERENCED , 30000 , 1000 ,
115
111
"WAL count never dropped within 30 seconds" );
116
112
}
117
113
}
118
114
119
- private void writeData (AccumuloClient client , String table , int startRow , int endRow )
120
- throws Exception {
115
+ /**
116
+ * Writes data to a single tablet until the total written data size exceeds 2 * TSERV_WAL_MAX_SIZE
117
+ */
118
+ private void writeData (AccumuloClient client , String table ) throws Exception {
121
119
try (BatchWriter bw = client .createBatchWriter (table , new BatchWriterConfig ())) {
122
- for (int r = startRow ; r < endRow ; r ++) {
123
- Mutation m = new Mutation (String .format ("row_%07d" , r ));
124
- m .put ("cf" , "cq" , String .format ("val_%d" , r ));
120
+ long totalWritten = 0 ;
121
+ while (totalWritten < 2 * hdfsMinBlockSize ) {
122
+ Mutation m = new Mutation ("target_row" );
123
+ m .put ("cf" , "cq" , "value" );
125
124
bw .addMutation (m );
125
+ totalWritten += m .estimatedMemoryUsed ();
126
126
}
127
127
}
128
128
}
0 commit comments