Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --format flag to ec-admin running for structured output (CSV/JSON) #5332

Open
wants to merge 9 commits into
base: 2.1
Choose a base branch
from
127 changes: 100 additions & 27 deletions server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@
*/
package org.apache.accumulo.server.util;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionMap;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.singletons.SingletonManager.Mode;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.compaction.RunningCompaction;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
Expand All @@ -41,6 +45,9 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.auto.service.AutoService;
import com.google.common.net.HostAndPort;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

Expand All @@ -62,6 +69,10 @@ static class RunningCommand {
@Parameter(names = {"-d", "--details"},
description = "display details about the running compactions")
boolean details = false;

@Parameter(names = {"-f", "--format"},
description = "output format: plain (default), csv, json")
String format = "plain";
}

@Parameters(commandDescription = "list all compactors in zookeeper")
Expand Down Expand Up @@ -116,7 +127,7 @@ public void execute(final String[] args) {
} else if (cl.getParsedCommand().equals("cancel")) {
cancelCompaction(context, cancelOps.ecid);
} else if (cl.getParsedCommand().equals("running")) {
runningCompactions(context, runningOpts.details);
runningCompactions(context, runningOpts.details, runningOpts.format);
} else {
log.error("Unknown command {}", cl.getParsedCommand());
cl.usage();
Expand Down Expand Up @@ -153,43 +164,105 @@ private void listCompactorsByQueue(ServerContext context) {
}
}

private void runningCompactions(ServerContext context, boolean details) {
private void runningCompactions(ServerContext context, boolean details, String format) {
CompactionCoordinatorService.Client coordinatorClient = null;
TExternalCompactionList running;
TExternalCompactionMap running;
try {
coordinatorClient = getCoordinatorClient(context);
running = coordinatorClient.getRunningCompactions(TraceUtil.traceInfo(), context.rpcCreds());
if (running == null) {
if (running == null || running.getCompactions() == null
|| running.getCompactions().isEmpty()) {
System.out.println("No running compactions found.");
return;
}
var ecidMap = running.getCompactions();
if (ecidMap == null) {
System.out.println("No running compactions found.");
return;

// Use StringBuilder for CSV
StringBuilder csvOutput = new StringBuilder();
List<Map<String,Object>> jsonOutput = new ArrayList<>();

if ("csv".equalsIgnoreCase(format)) {
csvOutput.append(
"ECID,Compactor,Kind,Queue,TableId,Status,LastUpdate,Duration,NumFiles,Progress\n");
}
ecidMap.forEach((ecid, ec) -> {
if (ec != null) {
var runningCompaction = new RunningCompaction(ec);
var addr = runningCompaction.getCompactorAddress();
var kind = runningCompaction.getJob().kind;
var queue = runningCompaction.getQueueName();
var ke = KeyExtent.fromThrift(runningCompaction.getJob().extent);
System.out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, queue, ke.tableId());

for (var entry : running.getCompactions().entrySet()) {
String ecid = entry.getKey();
var ec = entry.getValue();
if (ec == null) {
continue;
}

var runningCompaction = new RunningCompaction(ec);
var addr = runningCompaction.getCompactorAddress();
var kind = runningCompaction.getJob().kind;
var group = runningCompaction.getGroupName();
var ke = KeyExtent.fromThrift(runningCompaction.getJob().extent);
String tableId = ke.tableId().canonical();

String status = "";
long lastUpdate = 0, duration = 0;
int numFiles = 0;
double progress = 0.0;

if (details) {
var runningCompactionInfo = new RunningCompactionInfo(ec);
status = runningCompactionInfo.status;
lastUpdate = runningCompactionInfo.lastUpdate;
duration = runningCompactionInfo.duration;
numFiles = runningCompactionInfo.numFiles;
progress = runningCompactionInfo.progress;
}

// Handle plain output
if ("plain".equalsIgnoreCase(format)) {
System.out.format("%s %s %s %s TableId: %s\n", ecid, addr, kind, group, tableId);
if (details) {
var runningCompactionInfo = new RunningCompactionInfo(ec);
var status = runningCompactionInfo.status;
var last = runningCompactionInfo.lastUpdate;
var duration = runningCompactionInfo.duration;
var numFiles = runningCompactionInfo.numFiles;
var progress = runningCompactionInfo.progress;
System.out.format(" %s Last Update: %dms Duration: %dms Files: %d Progress: %.2f%%\n",
status, last, duration, numFiles, progress);
status, lastUpdate, duration, numFiles, progress);
}
}
});

// Handle CSV output
if ("csv".equalsIgnoreCase(format)) {
csvOutput.append(String.format("%s,%s,%s,%s,%s,%s,%d,%d,%d,%.2f\n", ecid, addr, kind,
group, tableId, status, lastUpdate, duration, numFiles, progress));
}

// Handle JSON output
if ("json".equalsIgnoreCase(format)) {
Map<String,Object> jsonEntry = new LinkedHashMap<String,Object>();
jsonEntry.put("ecid", ecid);
jsonEntry.put("compactor", addr);
jsonEntry.put("kind", kind);
jsonEntry.put("queue", group);
jsonEntry.put("tableId", tableId);
if (details) {
jsonEntry.put("status", status);
jsonEntry.put("lastUpdate", lastUpdate);
jsonEntry.put("duration", duration);
jsonEntry.put("numFiles", numFiles);
jsonEntry.put("progress", progress);
}
jsonOutput.add(jsonEntry);
}
}

// Print CSV
if ("csv".equalsIgnoreCase(format)) {
System.out.print(csvOutput.toString());
}

// Print JSON
if ("json".equalsIgnoreCase(format)) {
try {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(jsonOutput));
} catch (Exception e) {
log.error("Error generating JSON output", e);
}
}
} catch (Exception e) {
throw new RuntimeException("Unable to get running compactions.", e);
throw new IllegalStateException("Unable to get running compactions.", e);
} finally {
ThriftUtil.returnClient(coordinatorClient, context);
}
Expand Down
Loading