Skip to content

Commit a089f86

Browse files
Replace iterator with stream to close resources in GarbageCollectWriteAheadLogs (apache#4207)
* make sure collect method can only be called once per object * make sure the underlying resources are closed after they are needed --------- Co-authored-by: Keith Turner <kturner@apache.org>
1 parent 252b1e2 commit a089f86

File tree

2 files changed

+126
-77
lines changed

2 files changed

+126
-77
lines changed

server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java

+52-45
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.util.Map.Entry;
3535
import java.util.Set;
3636
import java.util.UUID;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.stream.Stream;
3739

3840
import org.apache.accumulo.core.gc.thrift.GCStatus;
3941
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
@@ -42,6 +44,7 @@
4244
import org.apache.accumulo.core.metadata.TabletState;
4345
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
4446
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
47+
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
4548
import org.apache.accumulo.core.tabletserver.log.LogEntry;
4649
import org.apache.accumulo.core.trace.TraceUtil;
4750
import org.apache.accumulo.core.util.Pair;
@@ -57,7 +60,8 @@
5760
import org.slf4j.LoggerFactory;
5861

5962
import com.google.common.annotations.VisibleForTesting;
60-
import com.google.common.collect.Iterators;
63+
import com.google.common.base.Preconditions;
64+
import com.google.common.collect.Streams;
6165

6266
import io.opentelemetry.api.trace.Span;
6367
import io.opentelemetry.context.Scope;
@@ -69,7 +73,7 @@ public class GarbageCollectWriteAheadLogs {
6973
private final VolumeManager fs;
7074
private final LiveTServerSet liveServers;
7175
private final WalStateManager walMarker;
72-
private final Iterable<TabletMetadata> store;
76+
private final AtomicBoolean hasCollected;
7377

7478
/**
7579
* Creates a new GC WAL object.
@@ -82,34 +86,32 @@ public class GarbageCollectWriteAheadLogs {
8286
this.context = context;
8387
this.fs = fs;
8488
this.liveServers = liveServers;
85-
this.walMarker = new WalStateManager(context);
86-
this.store = () -> Iterators.concat(
87-
context.getAmple().readTablets().forLevel(DataLevel.ROOT).filter(new HasWalsFilter())
88-
.fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build().iterator(),
89-
context.getAmple().readTablets().forLevel(DataLevel.METADATA).filter(new HasWalsFilter())
90-
.fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build().iterator(),
91-
context.getAmple().readTablets().forLevel(DataLevel.USER).filter(new HasWalsFilter())
92-
.fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build().iterator());
89+
this.walMarker = createWalStateManager(context);
90+
this.hasCollected = new AtomicBoolean(false);
9391
}
9492

95-
/**
96-
* Creates a new GC WAL object. Meant for testing -- allows mocked objects.
97-
*
98-
* @param context the collection server's context
99-
* @param fs volume manager to use
100-
* @param liveTServerSet a started LiveTServerSet instance
101-
*/
10293
@VisibleForTesting
103-
GarbageCollectWriteAheadLogs(ServerContext context, VolumeManager fs,
104-
LiveTServerSet liveTServerSet, WalStateManager walMarker, Iterable<TabletMetadata> store) {
105-
this.context = context;
106-
this.fs = fs;
107-
this.liveServers = liveTServerSet;
108-
this.walMarker = walMarker;
109-
this.store = store;
94+
WalStateManager createWalStateManager(ServerContext context) {
95+
return new WalStateManager(context);
96+
}
97+
98+
@VisibleForTesting
99+
Stream<TabletMetadata> createStore() {
100+
TabletsMetadata root = context.getAmple().readTablets().forLevel(DataLevel.ROOT)
101+
.filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
102+
TabletsMetadata metadata = context.getAmple().readTablets().forLevel(DataLevel.METADATA)
103+
.filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
104+
TabletsMetadata user = context.getAmple().readTablets().forLevel(DataLevel.USER)
105+
.filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
106+
return Streams.concat(root.stream(), metadata.stream(), user.stream()).onClose(() -> {
107+
root.close();
108+
metadata.close();
109+
user.close();
110+
});
110111
}
111112

112113
public void collect(GCStatus status) {
114+
Preconditions.checkState(!hasCollected.get(), "Can only call collect once per object");
113115
try {
114116
long count;
115117
long fileScanStop;
@@ -197,7 +199,7 @@ public void collect(GCStatus status) {
197199
} finally {
198200
span5.end();
199201
}
200-
202+
hasCollected.set(true);
201203
} catch (Exception e) {
202204
log.error("exception occurred while garbage collecting write ahead logs", e);
203205
} finally {
@@ -279,28 +281,32 @@ private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUI
279281
}
280282

281283
// remove any entries if there's a log reference (recovery hasn't finished)
282-
for (TabletMetadata tabletMetadata : store) {
283-
// Tablet is still assigned to a dead server. Manager has moved markers and reassigned it
284-
// Easiest to just ignore all the WALs for the dead server.
285-
if (TabletState.compute(tabletMetadata, liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
286-
Set<UUID> idsToIgnore = candidates.remove(tabletMetadata.getLocation().getServerInstance());
287-
if (idsToIgnore != null) {
288-
result.keySet().removeAll(idsToIgnore);
289-
recoveryLogs.keySet().removeAll(idsToIgnore);
284+
try (Stream<TabletMetadata> store = createStore()) {
285+
store.forEach(tabletMetadata -> {
286+
// Tablet is still assigned to a dead server. Manager has moved markers and reassigned it
287+
// Easiest to just ignore all the WALs for the dead server.
288+
if (TabletState.compute(tabletMetadata, liveServers)
289+
== TabletState.ASSIGNED_TO_DEAD_SERVER) {
290+
Set<UUID> idsToIgnore =
291+
candidates.remove(tabletMetadata.getLocation().getServerInstance());
292+
if (idsToIgnore != null) {
293+
result.keySet().removeAll(idsToIgnore);
294+
recoveryLogs.keySet().removeAll(idsToIgnore);
295+
}
290296
}
291-
}
292-
// Tablet is being recovered and has WAL references, remove all the WALs for the dead server
293-
// that made the WALs.
294-
for (LogEntry wal : tabletMetadata.getLogs()) {
295-
UUID walUUID = wal.getUniqueID();
296-
TServerInstance dead = result.get(walUUID);
297-
// There's a reference to a log file, so skip that server's logs
298-
Set<UUID> idsToIgnore = candidates.remove(dead);
299-
if (idsToIgnore != null) {
300-
result.keySet().removeAll(idsToIgnore);
301-
recoveryLogs.keySet().removeAll(idsToIgnore);
297+
// Tablet is being recovered and has WAL references, remove all the WALs for the dead server
298+
// that made the WALs.
299+
for (LogEntry wal : tabletMetadata.getLogs()) {
300+
UUID walUUID = wal.getUniqueID();
301+
TServerInstance dead = result.get(walUUID);
302+
// There's a reference to a log file, so skip that server's logs
303+
Set<UUID> idsToIgnore = candidates.remove(dead);
304+
if (idsToIgnore != null) {
305+
result.keySet().removeAll(idsToIgnore);
306+
recoveryLogs.keySet().removeAll(idsToIgnore);
307+
}
302308
}
303-
}
309+
});
304310
}
305311

306312
// Remove OPEN and CLOSED logs for live servers: they are still in use
@@ -369,4 +375,5 @@ protected Map<UUID,Path> getSortedWALogs() throws IOException {
369375
}
370376
return result;
371377
}
378+
372379
}

server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java

+74-32
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
2222
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
2323
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
24+
import static org.junit.jupiter.api.Assertions.assertThrows;
2425

2526
import java.util.Collections;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.UUID;
30+
import java.util.stream.Stream;
2931

3032
import org.apache.accumulo.core.client.admin.TabletAvailability;
3133
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -74,10 +76,8 @@ public class GarbageCollectWriteAheadLogsTest {
7476
}
7577
}
7678

77-
private final Iterable<TabletMetadata> tabletOnServer1List =
78-
Collections.singletonList(tabletAssignedToServer1);
79-
private final Iterable<TabletMetadata> tabletOnServer2List =
80-
Collections.singletonList(tabletAssignedToServer2);
79+
private final Stream<TabletMetadata> tabletOnServer1List = Stream.of(tabletAssignedToServer1);
80+
private final Stream<TabletMetadata> tabletOnServer2List = Stream.of(tabletAssignedToServer2);
8181

8282
@Test
8383
public void testRemoveUnusedLog() throws Exception {
@@ -99,14 +99,26 @@ public void testRemoveUnusedLog() throws Exception {
9999
marker.removeWalMarker(server1, id);
100100
EasyMock.expectLastCall().once();
101101
EasyMock.replay(context, fs, marker, tserverSet);
102-
GarbageCollectWriteAheadLogs gc =
103-
new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) {
104-
@Override
105-
protected Map<UUID,Path> getSortedWALogs() {
106-
return Collections.emptyMap();
107-
}
108-
};
102+
GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) {
103+
@Override
104+
protected Map<UUID,Path> getSortedWALogs() {
105+
return Collections.emptyMap();
106+
}
107+
108+
@Override
109+
WalStateManager createWalStateManager(ServerContext serverContext) {
110+
return marker;
111+
}
112+
113+
@Override
114+
Stream<TabletMetadata> createStore() {
115+
return tabletOnServer1List;
116+
}
117+
};
109118
gc.collect(status);
119+
assertThrows(IllegalStateException.class, () -> gc.collect(status),
120+
"Should only be able to call collect once");
121+
110122
EasyMock.verify(context, fs, marker, tserverSet);
111123
}
112124

@@ -126,14 +138,24 @@ public void testKeepClosedLog() throws Exception {
126138
EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
127139
EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path));
128140
EasyMock.replay(context, marker, tserverSet, fs);
129-
GarbageCollectWriteAheadLogs gc =
130-
new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) {
131-
@Override
132-
protected Map<UUID,Path> getSortedWALogs() {
133-
return Collections.emptyMap();
134-
}
135-
};
141+
GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) {
142+
@Override
143+
protected Map<UUID,Path> getSortedWALogs() {
144+
return Collections.emptyMap();
145+
}
146+
147+
@Override
148+
WalStateManager createWalStateManager(ServerContext serverContext) {
149+
return marker;
150+
}
151+
152+
@Override
153+
Stream<TabletMetadata> createStore() {
154+
return tabletOnServer1List;
155+
}
156+
};
136157
gc.collect(status);
158+
137159
EasyMock.verify(context, marker, tserverSet, fs);
138160
}
139161

@@ -160,14 +182,24 @@ public void deleteUnreferenceLogOnDeadServer() throws Exception {
160182
marker.forget(server2);
161183
EasyMock.expectLastCall().once();
162184
EasyMock.replay(context, fs, marker, tserverSet);
163-
GarbageCollectWriteAheadLogs gc =
164-
new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer1List) {
165-
@Override
166-
protected Map<UUID,Path> getSortedWALogs() {
167-
return Collections.emptyMap();
168-
}
169-
};
185+
GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) {
186+
@Override
187+
protected Map<UUID,Path> getSortedWALogs() {
188+
return Collections.emptyMap();
189+
}
190+
191+
@Override
192+
WalStateManager createWalStateManager(ServerContext serverContext) {
193+
return marker;
194+
}
195+
196+
@Override
197+
Stream<TabletMetadata> createStore() {
198+
return tabletOnServer1List;
199+
}
200+
};
170201
gc.collect(status);
202+
171203
EasyMock.verify(context, fs, marker, tserverSet);
172204
}
173205

@@ -188,14 +220,24 @@ public void ignoreReferenceLogOnDeadServer() throws Exception {
188220
EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
189221

190222
EasyMock.replay(context, fs, marker, tserverSet);
191-
GarbageCollectWriteAheadLogs gc =
192-
new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker, tabletOnServer2List) {
193-
@Override
194-
protected Map<UUID,Path> getSortedWALogs() {
195-
return Collections.emptyMap();
196-
}
197-
};
223+
GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet) {
224+
@Override
225+
protected Map<UUID,Path> getSortedWALogs() {
226+
return Collections.emptyMap();
227+
}
228+
229+
@Override
230+
WalStateManager createWalStateManager(ServerContext serverContext) {
231+
return marker;
232+
}
233+
234+
@Override
235+
Stream<TabletMetadata> createStore() {
236+
return tabletOnServer2List;
237+
}
238+
};
198239
gc.collect(status);
240+
199241
EasyMock.verify(context, fs, marker, tserverSet);
200242
}
201243

0 commit comments

Comments
 (0)