Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --format flag to ec-admin running for structured output (CSV/JSON) #5332

Open
wants to merge 9 commits into
base: 2.1
Choose a base branch
from
145 changes: 116 additions & 29 deletions server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@
*/
package org.apache.accumulo.server.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
Expand All @@ -41,6 +49,8 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.auto.service.AutoService;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

Expand All @@ -62,6 +72,10 @@ static class RunningCommand {
@Parameter(names = {"-d", "--details"},
description = "display details about the running compactions")
boolean details = false;

@Parameter(names = {"-f", "--format"},
description = "output format: plain (default), csv, json")
String format = "plain";
}

@Parameters(commandDescription = "list all compactors in zookeeper")
Expand Down Expand Up @@ -116,7 +130,7 @@ public void execute(final String[] args) {
} else if (cl.getParsedCommand().equals("cancel")) {
cancelCompaction(context, cancelOps.ecid);
} else if (cl.getParsedCommand().equals("running")) {
runningCompactions(context, runningOpts.details);
runningCompactions(context, runningOpts.details, runningOpts.format);
} else {
log.error("Unknown command {}", cl.getParsedCommand());
cl.usage();
Expand Down Expand Up @@ -153,43 +167,116 @@ private void listCompactorsByQueue(ServerContext context) {
}
}

private void runningCompactions(ServerContext context, boolean details) {
private void runningCompactions(ServerContext context, boolean details, String format) {
CompactionCoordinatorService.Client coordinatorClient = null;
TExternalCompactionList running;
Map<String,TExternalCompaction> runningCompactionsMap = new HashMap<>();

// Default to "plain" format if null or empty
if (format == null || format.trim().isEmpty()) {
format = "plain";
} else {
// Validate format
Set<String> validFormats = Set.of("plain", "csv", "json");
if (!validFormats.contains(format.toLowerCase())) {
throw new IllegalArgumentException(
"Invalid format: " + format + ". Expected: plain, csv, or json.");
}
}

try {
coordinatorClient = getCoordinatorClient(context);
running = coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
if (running == null) {

// Fetch running compactions as a list and convert to a map
TExternalCompactionList running =
coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());

if (running == null || running.getCompactions().isEmpty()) {
System.out.println("No running compactions found.");
return;
}
var ecidMap = running.getCompactions();
if (ecidMap == null) {
System.out.println("No running compactions found.");
return;

for (Map.Entry<String,TExternalCompaction> entry : running.getCompactions().entrySet()) {
runningCompactionsMap.put(entry.getKey(), entry.getValue());
}
ecidMap.forEach((ecid, ec) -> {
if (ec != null) {
var runningCompaction = new RunningCompaction(ec);
var addr = runningCompaction.getCompactorAddress();
var kind = runningCompaction.getJob().kind;
var queue = runningCompaction.getQueueName();
var ke = KeyExtent.fromThrift(runningCompaction.getJob().extent);
System.out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, queue, ke.tableId());
if (details) {
var runningCompactionInfo = new RunningCompactionInfo(ec);
var status = runningCompactionInfo.status;
var last = runningCompactionInfo.lastUpdate;
var duration = runningCompactionInfo.duration;
var numFiles = runningCompactionInfo.numFiles;
var progress = runningCompactionInfo.progress;
System.out.format(" %s Last Update: %dms Duration: %dms Files: %d Progress: %.2f%%\n",
status, last, duration, numFiles, progress);
}

List<Map<String,Object>> jsonOutput = new ArrayList<>();

if ("csv".equalsIgnoreCase(format)) {
System.out.println(
"ECID,Compactor,Kind,Queue,TableId,Status,LastUpdate,Duration,NumFiles,Progress");
}

for (Map.Entry<String,TExternalCompaction> entry : runningCompactionsMap.entrySet()) {
TExternalCompaction ec = entry.getValue();
if (ec == null) {
continue;
}
});

var runningCompaction = new RunningCompaction(ec);
String ecid = runningCompaction.getJob().getExternalCompactionId();
var addr = runningCompaction.getCompactorAddress();
var kind = runningCompaction.getJob().kind;
var queueName = runningCompaction.getQueueName();
var ke = KeyExtent.fromThrift(runningCompaction.getJob().extent);
String tableId = ke.tableId().canonical();

String status = "";
long lastUpdate = 0, duration = 0;
int numFiles = 0;
double progress = 0.0;

if (details) {
var runningCompactionInfo = new RunningCompactionInfo(ec);
status = runningCompactionInfo.status;
lastUpdate = runningCompactionInfo.lastUpdate;
duration = runningCompactionInfo.duration;
numFiles = runningCompactionInfo.numFiles;
progress = runningCompactionInfo.progress;
}

switch (format.toLowerCase()) {
case "plain":
System.out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, queueName, tableId);
if (details) {
System.out.format(
" %s Last Update: %dms Duration: %dms Files: %d Progress: %.2f%%\n", status,
lastUpdate, duration, numFiles, progress);
}
break;
case "csv":
System.out.printf("%s,%s,%s,%s,%s,%s,%d,%d,%d,%.2f\n", ecid, addr, kind, queueName,
tableId, status, lastUpdate, duration, numFiles, progress);
break;
case "json":
Map<String,Object> jsonEntry = new LinkedHashMap<>();
jsonEntry.put("ecid", ecid);
jsonEntry.put("compactor", addr);
jsonEntry.put("kind", kind);
jsonEntry.put("queue", queueName);
jsonEntry.put("tableId", tableId);
if (details) {
jsonEntry.put("status", status);
jsonEntry.put("lastUpdate", lastUpdate);
jsonEntry.put("duration", duration);
jsonEntry.put("numFiles", numFiles);
jsonEntry.put("progress", progress);
}
jsonOutput.add(jsonEntry);
break;
}
}

if ("json".equalsIgnoreCase(format)) {
try {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(jsonOutput));
} catch (Exception e) {
log.error("Error generating JSON output", e);
}
}

} catch (Exception e) {
throw new RuntimeException("Unable to get running compactions.", e);
throw new IllegalStateException("Unable to get running compactions.", e);
} finally {
ThriftUtil.returnClient(coordinatorClient, context);
}
Expand Down