Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream logs from TaskRuns and PipelineRuns #36

Merged
merged 5 commits into from
Sep 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ install-tekton:
kubectl apply --filename https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml

coverage:
mvn cobertura:cobertura
mvn cobertura:cobertura -q

build:
mvn clean install -DskipTests
mvn clean install -DskipTests -q

e2e:
kubectl create -f $(JENKINS_DEPLOYMENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public static GlobalPluginConfiguration get() {

private synchronized void configChange() {
logger.info("Tekton Client Plugin processing a newly supplied configuration");
TektonUtils.shutdownTektonClient();
TektonUtils.shutdownKubeClients();
try {
TektonUtils.initializeTektonClient(this.server);
TektonUtils.initializeKubeClients(this.server);
} catch (KubernetesClientException e){
Throwable exceptionOrCause = (e.getCause() != null) ? e.getCause() : e;
logger.log(SEVERE, "Failed to configure Tekton Client Plugin: " + exceptionOrCause);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.waveywaves.jenkins.plugins.tekton.client;

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.tekton.client.DefaultTektonClient;
import io.fabric8.tekton.client.TektonClient;

Expand All @@ -18,6 +20,7 @@ public class TektonUtils {
private static final Logger logger = Logger.getLogger(TektonUtils.class.getName());

private static TektonClient tektonClient;
private static KubernetesClient kubernetesClient;

public enum TektonResourceType {
task,
Expand All @@ -28,20 +31,25 @@ public enum TektonResourceType {
}


public synchronized static void initializeTektonClient(String serverUrl) {
public synchronized static void initializeKubeClients(String serverUrl) {
if (serverUrl != null && !serverUrl.isEmpty()) {
logger.info("ServerUrl has been passed to Tekton Client ");
}
tektonClient = new DefaultTektonClient();
kubernetesClient = new DefaultKubernetesClient();
String namespace = tektonClient.getNamespace();
logger.info("Running in namespace "+namespace);
}

public synchronized static void shutdownTektonClient() {
public synchronized static void shutdownKubeClients() {
if (tektonClient != null) {
tektonClient.close();
tektonClient = null;
}
if (kubernetesClient != null) {
kubernetesClient.close();
kubernetesClient = null;
}
}

public static List<TektonResourceType> getKindFromInputStream(InputStream inputStream, String inputType) {
Expand Down Expand Up @@ -104,4 +112,8 @@ public static InputStream urlToByteArrayStream(URL url) {
public synchronized static TektonClient getTektonClient(){
return tektonClient;
}

public synchronized static KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

public abstract class BaseStep extends Builder implements SimpleBuildStep {
protected transient Client tektonClient;
protected transient Client kubernetesClient;

protected MixedOperation<TaskRun, TaskRunList, DoneableTaskRun, Resource<TaskRun, DoneableTaskRun>>
taskRunClient;
Expand All @@ -32,6 +33,10 @@ public enum InputType {
Interactive
}

public void setKubernetesClient(Client kc) {
this.kubernetesClient = kc;
}

public void setTektonClient(Client tc) {
this.tektonClient = tc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@
import hudson.tasks.Builder;
import hudson.util.FormValidation;
import hudson.util.ListBoxModel;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.knative.internal.pkg.apis.Condition;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.tekton.client.TektonClient;
import io.fabric8.tekton.pipeline.v1beta1.*;
import io.fabric8.tekton.resource.v1alpha1.DoneablePipelineResource;
import io.fabric8.tekton.resource.v1alpha1.PipelineResource;
import io.fabric8.tekton.resource.v1alpha1.PipelineResourceList;
import org.kohsuke.stapler.DataBoundConstructor;
import org.kohsuke.stapler.QueryParameter;
import org.waveywaves.jenkins.plugins.tekton.client.TektonUtils;
Expand All @@ -25,26 +23,29 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.io.PrintStream;
import java.net.URL;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

import org.waveywaves.jenkins.plugins.tekton.client.logwatch.PipelineRunLogWatch;
import org.waveywaves.jenkins.plugins.tekton.client.logwatch.TaskRunLogWatch;

public class CreateStep extends BaseStep {
private static final Logger logger = Logger.getLogger(CreateStep.class.getName());

private String input;
private String inputType;
private final String input;
private final String inputType;
private PrintStream consoleLogger;

@DataBoundConstructor
public CreateStep(String input, String inputType) {
super();
this.inputType = inputType;
this.input = input;

setKubernetesClient(TektonUtils.getKubernetesClient());
setTektonClient(TektonUtils.getTektonClient());
}

Expand Down Expand Up @@ -81,6 +82,8 @@ public String createTaskRun(InputStream inputStream) {
TaskRun taskrun = taskRunClient.load(inputStream).get();
taskrun = taskRunClient.create(taskrun);
resourceName = taskrun.getMetadata().getName();

streamTaskRunLogsToConsole(taskrun);
return resourceName;
}

Expand Down Expand Up @@ -117,6 +120,8 @@ public String createPipelineRun(InputStream inputStream) {
PipelineRun pipelineRun = pipelineRunClient.load(inputStream).get();
pipelineRun = pipelineRunClient.create(pipelineRun);
resourceName = pipelineRun.getMetadata().getName();

streamPipelineRunLogsToConsole(pipelineRun);
return resourceName;
}

Expand All @@ -132,8 +137,38 @@ public String createPipelineResource(InputStream inputStream) {
return resourceName;
}

public void streamTaskRunLogsToConsole(TaskRun taskRun) {
synchronized (consoleLogger) {
KubernetesClient kc = (KubernetesClient) kubernetesClient;
Thread logWatchTask = null;
try {
TaskRunLogWatch logWatch = new TaskRunLogWatch(kc, taskRun, consoleLogger);
logWatchTask = new Thread(logWatch);
logWatchTask.start();
logWatchTask.join();
} catch (Exception e) {
logger.warning("Exception occurred "+e.toString());
}
}
}

public void streamPipelineRunLogsToConsole(PipelineRun pipelineRun) {
KubernetesClient kc = (KubernetesClient) kubernetesClient;
TektonClient tc = (TektonClient) tektonClient;
Thread logWatchTask;
try {
PipelineRunLogWatch logWatch = new PipelineRunLogWatch(kc, tc, pipelineRun, consoleLogger);
logWatchTask = new Thread(logWatch);
logWatchTask.start();
logWatchTask.join();
} catch (Exception e) {
logger.warning("Exception occurred "+e.toString());
}
}

@Override
public void perform(@Nonnull Run<?, ?> run, @Nonnull FilePath workspace, @Nonnull Launcher launcher, @Nonnull TaskListener listener) throws InterruptedException, IOException {
consoleLogger = listener.getLogger();
runCreate();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.waveywaves.jenkins.plugins.tekton.client.build.create.mock;

import io.fabric8.tekton.pipeline.v1beta1.PipelineRun;
import io.fabric8.tekton.pipeline.v1beta1.TaskRun;
import org.waveywaves.jenkins.plugins.tekton.client.TektonUtils;
import org.waveywaves.jenkins.plugins.tekton.client.build.create.CreateStep;

Expand Down Expand Up @@ -34,4 +36,14 @@ public String createPipelineRun(InputStream inputStream) {
public String createPipelineResource(InputStream inputStream) {
return TektonUtils.TektonResourceType.pipelineresource.toString();
}

@Override
public void streamTaskRunLogsToConsole(TaskRun taskRun) {
return;
}

@Override
public void streamPipelineRunLogsToConsole(PipelineRun pipelineRun) {
return;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.waveywaves.jenkins.plugins.tekton.client.logwatch;

import io.fabric8.knative.internal.pkg.apis.Condition;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.tekton.client.TektonClient;
import io.fabric8.tekton.pipeline.v1beta1.*;
import net.sf.ezmorph.array.BooleanObjectArrayMorpher;
import org.waveywaves.jenkins.plugins.tekton.client.TektonUtils.TektonResourceType;

import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

public class PipelineRunLogWatch implements Runnable {
private static final Logger logger = Logger.getLogger(PipelineRunLogWatch.class.getName());

private KubernetesClient kubernetesClient;
private TektonClient tektonClient;
private PipelineRun pipelineRun;
OutputStream consoleLogger;

ConcurrentHashMap<String, TaskRun> taskRunsOnWatch = new ConcurrentHashMap<String, TaskRun>();
ConcurrentHashMap<String, Boolean> taskRunsWatchDone = new ConcurrentHashMap<String, Boolean>();

private final String pipelineTaskLabelName = "tekton.dev/pipelineTask";
private final String pipelineRunLabelName = "tekton.dev/pipelineRun";

public PipelineRunLogWatch(KubernetesClient kubernetesClient, TektonClient tektonClient, PipelineRun pipelineRun, OutputStream consoleLogger) {
this.kubernetesClient = kubernetesClient;
this.tektonClient = tektonClient;
this.pipelineRun = pipelineRun;
this.consoleLogger = consoleLogger;
}

@Override
public void run() {
String pipelineRunName = pipelineRun.getMetadata().getName();
String pipelineRunUid = pipelineRun.getMetadata().getUid();
List<PipelineTask> pipelineTasks = pipelineRun.getSpec().getPipelineSpec().getTasks();

for (PipelineTask pt: pipelineTasks){
ListOptions lo = new ListOptions();
lo.setLabelSelector(String.format("%s=%s",pipelineTaskLabelName, pt.getName()));
lo.setLabelSelector(String.format("%s=%s", pipelineRunLabelName, pipelineRunName));

List<TaskRun> taskRunList = tektonClient.v1beta1().taskRuns().list(lo).getItems();
for (TaskRun tr: taskRunList) {
List<OwnerReference> ownerReferences = tr.getMetadata().getOwnerReferences();
for (OwnerReference or : ownerReferences) {
if (or.getUid().equals(pipelineRunUid)){
logger.info(String.format("Streaming logs for TaskRun %s owned by PipelineRun %s", tr.getMetadata().getName(), pipelineRunName));
TaskRunLogWatch logWatch = new TaskRunLogWatch(kubernetesClient, tr, consoleLogger);
Thread logWatchTask = new Thread(logWatch);
logWatchTask.start();
try {
logWatchTask.join();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}
}

}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.waveywaves.jenkins.plugins.tekton.client.logwatch;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.DoneablePod;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.tekton.pipeline.v1beta1.TaskRun;
import org.waveywaves.jenkins.plugins.tekton.client.TektonUtils;
import org.waveywaves.jenkins.plugins.tekton.client.TektonUtils.TektonResourceType;
import org.waveywaves.jenkins.plugins.tekton.client.build.create.CreateStep;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.logging.Logger;

public class TaskRunLogWatch implements Runnable{
private static final Logger logger = Logger.getLogger(TaskRunLogWatch.class.getName());

private KubernetesClient kubernetesClient;
private TaskRun taskRun;
OutputStream consoleLogger;

public TaskRunLogWatch(KubernetesClient kubernetesClient, TaskRun taskRun, OutputStream consoleLogger) {
this.kubernetesClient = kubernetesClient;
this.taskRun = taskRun;
this.consoleLogger = consoleLogger;
}

@Override
public void run() {
List<Pod> pods = kubernetesClient.pods().list().getItems();
Pod taskRunPod = null;
String podName = "";
for (Pod pod : pods) {
List<OwnerReference> ownerReferences = pod.getMetadata().getOwnerReferences();
if (ownerReferences != null && ownerReferences.size() > 0) {
for (OwnerReference or : ownerReferences) {
String orKind = or.getKind();
String orName = or.getName();
if (orKind.toLowerCase().equals(TektonResourceType.taskrun.toString())
&& orName.equals(taskRun.getMetadata().getName())){
podName = pod.getMetadata().getName();
taskRunPod = pod;
}
}
}
}

if (!podName.isEmpty() && taskRunPod != null){
Predicate<Pod> succeededState = i -> (i.getStatus().getPhase().equals("Succeeded"));
PodResource<Pod, DoneablePod> pr = kubernetesClient.pods().inNamespace(taskRunPod.getMetadata().getNamespace()).withName(podName);
try {
pr.waitUntilCondition(succeededState,60, TimeUnit.MINUTES);
} catch ( InterruptedException e) {
logger.warning("Interrupted Exception Occurred");
}
List<String> taskRunContainerNames = new ArrayList<String>();
for (Container c : taskRunPod.getSpec().getContainers()) {
taskRunContainerNames.add(c.getName());
}
for (String i : taskRunContainerNames) {
pr.inContainer(i).watchLog(this.consoleLogger);
}
}
}
}
Loading