Skip to content

Commit ffc031d

Browse files
committed
Close scanner in AmpleImple.getExternalCompactionFinalStates()
1 parent 7f8f120 commit ffc031d

File tree

3 files changed

+16
-13
lines changed

3 files changed

+16
-13
lines changed

server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
320320

321321
scanner.setRange(ExternalCompactionSection.getRange());
322322
int pLen = ExternalCompactionSection.getRowPrefix().length();
323-
return scanner.stream()
323+
return scanner.stream().onClose(scanner::close)
324324
.map(e -> ExternalCompactionFinalState.fromJson(
325325
ExternalCompactionId.of(e.getKey().getRowData().toString().substring(pLen)),
326326
e.getValue().toString()));

server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,8 @@ private void processPending() {
214214
}
215215

216216
private void notifyTservers() {
217-
try {
218-
Iterator<ExternalCompactionFinalState> finalStates =
219-
context.getAmple().getExternalCompactionFinalStates().iterator();
217+
try (var finalStatesStream = context.getAmple().getExternalCompactionFinalStates()) {
218+
Iterator<ExternalCompactionFinalState> finalStates = finalStatesStream.iterator();
220219
while (finalStates.hasNext()) {
221220
ExternalCompactionFinalState state = finalStates.next();
222221
LOG.debug("Found external compaction in final state: {}, queueing for tserver notification",

server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java

+13-9
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.accumulo.core.conf.Property;
3232
import org.apache.accumulo.core.dataImpl.KeyExtent;
3333
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
34+
import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
3435
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
3536
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
3637
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
@@ -106,15 +107,18 @@ private void detectDeadCompactions() {
106107
});
107108

108109
// Determine which compactions are currently committing and remove those
109-
context.getAmple().getExternalCompactionFinalStates()
110-
.map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
111-
if (tabletCompactions.remove(ecid) != null) {
112-
log.trace("Removed compaction {} that is committing", ecid);
113-
}
114-
if (this.deadCompactions.remove(ecid) != null) {
115-
log.trace("Removed {} from the dead compaction map, it's committing", ecid);
116-
}
117-
});
110+
try (
111+
var externalCompactionFinalStates = context.getAmple().getExternalCompactionFinalStates()) {
112+
externalCompactionFinalStates.map(ExternalCompactionFinalState::getExternalCompactionId)
113+
.forEach(ecid -> {
114+
if (tabletCompactions.remove(ecid) != null) {
115+
log.trace("Removed compaction {} that is committing", ecid);
116+
}
117+
if (this.deadCompactions.remove(ecid) != null) {
118+
log.trace("Removed {} from the dead compaction map, it's committing", ecid);
119+
}
120+
});
121+
}
118122

119123
tabletCompactions.forEach((ecid, extent) -> {
120124
var count = this.deadCompactions.merge(ecid, 1L, Long::sum);

0 commit comments

Comments
 (0)