18
18
*/
19
19
package org .apache .accumulo .test .functional ;
20
20
21
+ import static org .apache .accumulo .core .metrics .Metric .LOW_MEMORY ;
21
22
import static org .apache .accumulo .core .metrics .Metric .MINC_PAUSED ;
22
23
import static org .apache .accumulo .test .compaction .ExternalCompactionTestUtils .getActiveCompactions ;
24
+ import static org .apache .accumulo .test .util .Wait .waitFor ;
23
25
import static org .junit .jupiter .api .Assertions .assertEquals ;
24
26
import static org .junit .jupiter .api .Assertions .assertNull ;
25
27
import static org .junit .jupiter .api .Assertions .assertTrue ;
26
28
27
29
import java .util .List ;
28
30
import java .util .Map ;
31
+ import java .util .concurrent .atomic .AtomicInteger ;
29
32
import java .util .concurrent .atomic .AtomicReference ;
30
33
import java .util .concurrent .atomic .DoubleAdder ;
31
34
32
35
import org .apache .accumulo .core .client .Accumulo ;
33
36
import org .apache .accumulo .core .client .AccumuloClient ;
34
37
import org .apache .accumulo .core .client .Scanner ;
35
38
import org .apache .accumulo .core .client .admin .TableOperations ;
39
+ import org .apache .accumulo .core .client .admin .servers .ServerId ;
36
40
import org .apache .accumulo .core .conf .Property ;
41
+ import org .apache .accumulo .core .metrics .MetricsInfo ;
37
42
import org .apache .accumulo .harness .MiniClusterConfigurationCallback ;
38
43
import org .apache .accumulo .harness .SharedMiniClusterBase ;
39
44
import org .apache .accumulo .minicluster .MemoryUnit ;
@@ -72,6 +77,7 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS
72
77
}
73
78
}
74
79
80
+ private static final AtomicInteger LOW_MEM_DETECTED = new AtomicInteger (0 );
75
81
private static final DoubleAdder MINC_PAUSED_COUNT = new DoubleAdder ();
76
82
private static TestStatsDSink sink ;
77
83
private static Thread metricConsumer ;
@@ -91,6 +97,12 @@ public static void start() throws Exception {
91
97
if (MINC_PAUSED .getName ().equals (metric .getName ())) {
92
98
double val = Double .parseDouble (metric .getValue ());
93
99
MINC_PAUSED_COUNT .add (val );
100
+ } else if (metric .getName ().equals (LOW_MEMORY .getName ())) {
101
+ String process = metric .getTags ().get (MetricsInfo .PROCESS_NAME_TAG_KEY );
102
+ if (process != null && process .contains (ServerId .Type .TABLET_SERVER .name ())) {
103
+ int val = Integer .parseInt (metric .getValue ());
104
+ LOW_MEM_DETECTED .set (val );
105
+ }
94
106
}
95
107
}
96
108
}
@@ -140,7 +152,9 @@ public void testMinCPauses() throws Exception {
140
152
141
153
try (Scanner scanner = client .createScanner (table )) {
142
154
155
+ waitFor (() -> 0 == LOW_MEM_DETECTED .get ());
143
156
MemoryStarvedScanIT .consumeServerMemory (scanner );
157
+ waitFor (() -> 1 == LOW_MEM_DETECTED .get ());
144
158
145
159
int paused = MINC_PAUSED_COUNT .intValue ();
146
160
assertEquals (0 , paused );
@@ -151,13 +165,14 @@ public void testMinCPauses() throws Exception {
151
165
Thread .sleep (1000 );
152
166
paused = MINC_PAUSED_COUNT .intValue ();
153
167
}
168
+ assertTrue (getActiveCompactions (client .instanceOperations ()).stream ()
169
+ .anyMatch (ac -> ac .getPausedCount () > 0 ));
154
170
155
171
MemoryStarvedScanIT .freeServerMemory (client );
156
172
ingestThread .interrupt ();
157
173
ingestThread .join ();
158
174
assertNull (error .get ());
159
- assertTrue (getActiveCompactions (client .instanceOperations ()).stream ()
160
- .anyMatch (ac -> ac .getPausedCount () > 0 ));
175
+ waitFor (() -> 0 == LOW_MEM_DETECTED .get ());
161
176
}
162
177
}
163
178
}
0 commit comments