From eaa6b02e8a2aec369f727c29a15a7fc44ebcd12a Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Thu, 29 Sep 2016 16:13:06 -0400 Subject: [PATCH 01/16] [JENKINS-38381] Prototype API to receive asynchronous notifications of process output or exit code. --- pom.xml | 14 +- .../durabletask/BourneShellScript.java | 18 +-- .../plugins/durabletask/Controller.java | 13 ++ .../plugins/durabletask/DurableTask.java | 5 +- .../durabletask/FileMonitoringTask.java | 143 ++++++++++++++++-- .../plugins/durabletask/Handler.java | 67 ++++++++ .../executors/OnceRetentionStrategy.java | 6 +- .../durabletask/BourneShellScriptTest.java | 65 +++++++- 8 files changed, 289 insertions(+), 42 deletions(-) create mode 100644 src/main/java/org/jenkinsci/plugins/durabletask/Handler.java diff --git a/pom.xml b/pom.xml index c5d8ea4d..ed9423df 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,8 @@ org.jenkins-ci.plugins plugin - 2.7 + 2.15 + durable-task 1.13-SNAPSHOT @@ -14,26 +15,25 @@ MIT License - http://www.opensource.org/licenses/mit-license.php + https://opensource.org/licenses/MIT - 1.609.1 - 6 - false + 1.642.3 + 1.120 repo.jenkins-ci.org - http://repo.jenkins-ci.org/public/ + https://repo.jenkins-ci.org/public/ repo.jenkins-ci.org - http://repo.jenkins-ci.org/public/ + https://repo.jenkins-ci.org/public/ diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java b/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java index bed07841..e2f1f8cc 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java @@ -179,25 +179,17 @@ private synchronized int pid(FilePath ws) throws IOException, InterruptedExcepti return pid; } - @Override public Integer exitStatus(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { - Integer status = super.exitStatus(workspace, launcher); - if (status != null) { - return status; - } + @Override protected Integer specialExitStatus(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { int _pid = pid(workspace); if (_pid > 0 && !ProcessLiveness.isAlive(workspace.getChannel(), _pid, launcher)) { - // it looks like the process has disappeared. one last check to make sure it's not a result of a race condition, - // then if we still don't have the exit code, use fake exit code to distinguish from 0 (success) and 1+ (observed failure) + // it looks like the process has disappeared; use fake exit code to distinguish from 0 (success) and 1+ (observed failure) // TODO would be better to have exitStatus accept a TaskListener so we could print an informative message - status = super.exitStatus(workspace, launcher); - if (status == null) { - status = -1; - } - return status; + return -1; } else if (_pid == 0 && /* compatibility */ startTime > 0 && System.currentTimeMillis() - startTime > 1000 * LAUNCH_FAILURE_TIMEOUT) { return -2; // apparently never started + } else { + return null; } - return null; } @Override public String getDiagnostics(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java b/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java index f24fc809..c8a3c67b 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java @@ -42,6 +42,19 @@ */ public abstract class Controller implements Serializable { + /** + * Begins watching the process asynchronously, so that the master may receive notification when output is available or the process has exited. + * This should be called as soon as the process is launched, and thereafter whenever reconnecting to the agent. + * You should not call {@link #writeLog} in this case; you do not need to call {@link #exitStatus(FilePath, Launcher)} frequently, + * though it is advisable to still call it occasionally to verify that the process is still running. + * @param workspace the workspace in use + * @param handler a remotable callback + * @throws UnsupportedOperationException when this mode is not available, so you must fall back to polling {@link #writeLog} and {@link #exitStatus(FilePath, Launcher)} + */ + public void watch(@Nonnull FilePath workspace, @Nonnull Handler handler) throws IOException, InterruptedException, UnsupportedOperationException { + throw new UnsupportedOperationException("Asynchronous mode is not implemented in " + getClass().getName()); + } + /** * Obtains any new task log output. * Could use a serializable field to keep track of how much output has been previously written. diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java index d86b5bb9..9142fdc6 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java @@ -53,8 +53,9 @@ public abstract class DurableTask extends AbstractDescribableImpl i public abstract Controller launch(EnvVars env, FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException; /** - * Requests that standard output of the task be captured rather than streamed to {@link Controller#writeLog}. - * If so, you may call {@link Controller#getOutput}. + * Requests that standard output of the task be captured rather than streamed. + * If you use {@link Controller#watch}, standard output will not be sent to {@link Handler#output}; it will be included in {@link Handler#exited} instead. + * Otherwise (using polling mode), standard output will not be sent to {@link Controller#writeLog}; call {@link Controller#getOutput} to collect. * Standard error should still be streamed to the log. * Should be called prior to {@link #launch} to take effect. * @throws UnsupportedOperationException if this implementation does not support that mode diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 163ca833..0f021a21 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -30,19 +30,31 @@ import hudson.Util; import hudson.model.TaskListener; import hudson.remoting.Channel; +import hudson.remoting.DaemonThreadFactory; +import hudson.remoting.NamingThreadFactory; import hudson.remoting.RemoteOutputStream; import hudson.remoting.VirtualChannel; import hudson.slaves.WorkspaceList; +import hudson.util.LogTaskListener; import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.io.RandomAccessFile; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.CheckForNull; import jenkins.MasterToSlaveFileCallable; import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.CountingInputStream; /** * A task which forks some external command and then waits for log and status files to be updated/created. @@ -79,6 +91,10 @@ protected FileMonitoringController doLaunch(FilePath workspace, Launcher launche throw new AbstractMethodError("override either doLaunch or launchWithCookie"); } + /** + * Tails a log file and watches for an exit status file. + * Must be remotable so that {@link #watch} can transfer the implementation. + */ protected static class FileMonitoringController extends Controller { /** Absolute path of {@link #controlDir(FilePath)}. */ @@ -91,6 +107,7 @@ protected static class FileMonitoringController extends Controller { /** * Byte offset in the file that has been reported thus far. + * Only used if {@link #writeLog(FilePath, OutputStream)} is used; not used for {@link #watch}. */ private long lastLocation; @@ -130,7 +147,6 @@ private static class WriteLog extends MasterToSlaveFileCallable { if (toRead > Integer.MAX_VALUE) { // >2Gb of output at once is unlikely throw new IOException("large reads not yet implemented"); } - // TODO is this efficient for large amounts of output? Would it be better to stream data, or return a byte[] from the callable? byte[] buf = new byte[(int) toRead]; raf.readFully(buf); sink.write(buf); @@ -144,22 +160,42 @@ private static class WriteLog extends MasterToSlaveFileCallable { } } - // TODO would be more efficient to allow API to consolidate writeLog with exitStatus (save an RPC call) @Override public Integer exitStatus(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { FilePath status = getResultFile(workspace); if (status.exists()) { - try { - return Integer.parseInt(status.readToString().trim()); - } catch (NumberFormatException x) { - throw new IOException("corrupted content in " + status + ": " + x, x); - } + return readStatus(status); } else { - return null; + Integer code = specialExitStatus(workspace, launcher); + if (code != null) { + // recheck normal file to defend against race conditions + if (status.exists()) { + return readStatus(status); + } + // Make sure that an exitStatus with a decorated launcher will ultimately result in Handler.exited being called + // and the task idled, even if the result file was never created normally: + status.write(Integer.toString(code), null); + } + return code; + } + } + + private int readStatus(FilePath status) throws IOException, InterruptedException { + try { + return Integer.parseInt(status.readToString().trim()); + } catch (NumberFormatException x) { + throw new IOException("corrupted content in " + status + ": " + x, x); } } + /** + * A way to provide specialized exit statuses other than watching {@link #getResultFile}. + * @return a possible exit status, or null for the default behavior + */ + protected @CheckForNull Integer specialExitStatus(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { + return null; + } + @Override public byte[] getOutput(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { - // TODO could perhaps be more efficient for large files to send a MasterToSlaveFileCallable return IOUtils.toByteArray(getOutputFile(workspace).read()); } @@ -229,7 +265,96 @@ public FilePath getOutputFile(FilePath workspace) throws IOException, Interrupte } } + @Override public void watch(FilePath workspace, Handler handler) throws IOException, InterruptedException, ClassCastException { + workspace.actAsync(new StartWatching(this, handler)); + } + + /** + * File in which a last-read position is stored if {@link #watch} is used. + */ + public FilePath getLastLocationFile(FilePath workspace) throws IOException, InterruptedException { + return controlDir(workspace).child("last-location.txt"); + } + + private static final long serialVersionUID = 1L; + } + + private static ScheduledExecutorService watchService; + private synchronized static ScheduledExecutorService watchService() { + if (watchService == null) { + watchService = new /*ErrorLogging*/ScheduledThreadPoolExecutor(5, new NamingThreadFactory(new DaemonThreadFactory(), "FileMonitoringTask watcher")); + } + return watchService; + } + + private static class StartWatching extends MasterToSlaveFileCallable { + private static final long serialVersionUID = 1L; + + private final FileMonitoringController controller; + private final Handler handler; + + StartWatching(FileMonitoringController controller, Handler handler) { + this.controller = controller; + this.handler = handler; + } + + @Override public Void invoke(File workspace, VirtualChannel channel) throws IOException, InterruptedException { + watchService().submit(new Watcher(controller, new FilePath(workspace), handler)); + return null; + } + + } + + private static class Watcher implements Runnable { + + // note that LOGGER here is going to the agent log, not master log + private static final Launcher localLauncher = new Launcher.LocalLauncher(new LogTaskListener(LOGGER, Level.INFO)); + + private final FileMonitoringController controller; + private final FilePath workspace; + private final Handler handler; + + Watcher(FileMonitoringController controller, FilePath workspace, Handler handler) { + this.controller = controller; + this.workspace = workspace; + this.handler = handler; + } + + @Override public void run() { + try { + Integer exitStatus = controller.exitStatus(workspace, localLauncher); // check before collecting output, in case the process is just now finishing + long lastLocation = 0; + FilePath lastLocationFile = controller.getLastLocationFile(workspace); + if (lastLocationFile.exists()) { + lastLocation = Long.parseLong(lastLocationFile.readToString()); + } + FilePath logFile = controller.getLogFile(workspace); + long len = logFile.length(); + if (len > lastLocation) { + assert !logFile.isRemote(); + try (FileChannel ch = FileChannel.open(Paths.get(logFile.getRemote()), StandardOpenOption.READ)) { + CountingInputStream cis = new CountingInputStream(Channels.newInputStream(ch.position(lastLocation))); + handler.output(cis); + lastLocationFile.write(Long.toString(lastLocation + cis.getByteCount()), null); + } + } + if (exitStatus != null) { + byte[] output; + if (controller.getOutputFile(workspace).exists()) { + output = controller.getOutput(workspace, localLauncher); + } else { + output = null; + } + handler.exited(exitStatus, output); + } else { + watchService().schedule(this, 100, TimeUnit.MILLISECONDS); // TODO consider an adaptive timeout as in DurableTaskStep.Execution in polling mode + } + } catch (Exception x) { + LOGGER.log(Level.WARNING, "giving up on watching " + controller.controlDir, x); + } + } + } } diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/Handler.java b/src/main/java/org/jenkinsci/plugins/durabletask/Handler.java new file mode 100644 index 00000000..0abf6f89 --- /dev/null +++ b/src/main/java/org/jenkinsci/plugins/durabletask/Handler.java @@ -0,0 +1,67 @@ +/* + * The MIT License + * + * Copyright 2016 CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package org.jenkinsci.plugins.durabletask; + +import hudson.FilePath; +import hudson.Launcher; +import hudson.remoting.VirtualChannel; +import java.io.InputStream; +import java.io.Serializable; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * A remote handler which may be sent to an agent and handle process output and results. + * If it needs to communicate with the master, you may use {@link VirtualChannel#export}. + * @see Controller#watch + */ +public abstract class Handler implements Serializable { + + /** + * Notification that new process output is available. + *

Should only be called when at least one byte is available. + * Whatever bytes are actually read will not be offered on the next call, if there is one; there is no need to close the stream. + *

There is no guarantee that output is offered in the form of complete lines of text, + * though in the typical case of line-oriented output it is likely that it will end in a newline. + *

Buffering is the responsibility of the caller, and {@link InputStream#markSupported} may be false. + * @param stream a way to read process output which has not already been handled + * @throws Exception if anything goes wrong, this watch is deactivated + */ + public abstract void output(@Nonnull InputStream stream) throws Exception; + + /** + * Notification that the process has exited or vanished. + * {@link #output} should have been called with any final uncollected output. + *

Any metadata associated with the process may be deleted after this call completes, rendering subsequent {@link Controller} calls unsatisfiable. + *

Note that unlike {@link Controller#exitStatus(FilePath, Launcher)}, no specialized {@link Launcher} is available on the agent, + * so if there are specialized techniques for determining process liveness they will not be considered here; + * you still need to occasionally poll for an exit status from the master. + * @param code the exit code, if known (0 conventionally represents success); may be negative for anomalous conditions such as a missing process + * @param output standard output captured, if {@link DurableTask#captureOutput} was called; else null + * @throws Exception if anything goes wrong, this watch is deactivated + */ + public abstract void exited(int code, @Nullable byte[] output) throws Exception; + +} diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/executors/OnceRetentionStrategy.java b/src/main/java/org/jenkinsci/plugins/durabletask/executors/OnceRetentionStrategy.java index fff0cd55..6ab114e5 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/executors/OnceRetentionStrategy.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/executors/OnceRetentionStrategy.java @@ -37,7 +37,6 @@ import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; -import jenkins.security.NotReallyRoleSensitiveCallable; /** * Retention strategy that allows a cloud slave to run only a single build before disconnecting. @@ -119,8 +118,8 @@ private void done(final AbstractCloudComputer c) { Computer.threadPoolForRemoting.submit(new Runnable() { @Override public void run() { - Queue.withLock(new NotReallyRoleSensitiveCallable() { - @Override public Void call() { + Queue.withLock(new Runnable() { + @Override public void run() { try { AbstractCloudSlave node = c.getNode(); if (node != null) { @@ -137,7 +136,6 @@ public void run() { terminating = false; } } - return null; } }); } diff --git a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java index b7022560..3538eff6 100644 --- a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java +++ b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java @@ -27,25 +27,31 @@ import hudson.EnvVars; import hudson.FilePath; import hudson.Launcher; +import hudson.model.Slave; +import hudson.remoting.VirtualChannel; import hudson.util.StreamTaskListener; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Rule; -import org.junit.Test; -import org.jvnet.hudson.test.JenkinsRule; - import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.InputStream; import java.util.Collections; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.commons.io.IOUtils; import static org.hamcrest.Matchers.containsString; +import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; import org.jvnet.hudson.test.Issue; +import org.jvnet.hudson.test.JenkinsRule; public class BourneShellScriptTest extends Assert { @Rule public JenkinsRule j = new JenkinsRule(); - @Before public void unix() { + @BeforeClass public static void unix() { Assume.assumeTrue("This test is only for Unix", File.pathSeparatorChar==':'); } @@ -138,4 +144,49 @@ public void smokeTest() throws Exception { c.cleanup(ws); } + @Issue("JENKINS-3838") + @Test public void watch() throws Exception { + Slave s = j.createOnlineSlave(); + ws = s.getWorkspaceRoot(); + launcher = s.createLauncher(listener); + DurableTask task = new BourneShellScript("set +x; for x in 1 2 3 4 5; do echo $x; sleep 1; done"); + Controller c = task.launch(new EnvVars(), ws, launcher, listener); + BlockingQueue status = new LinkedBlockingQueue<>(); + BlockingQueue output = new LinkedBlockingQueue<>(); + BlockingQueue lines = new LinkedBlockingQueue<>(); + c.watch(ws, new MockHandler(s.getChannel(), status, output, lines)); + assertEquals("+ set +x", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("", output.take()); + assertEquals("[1, 2, 3, 4, 5]", lines.toString()); + task = new BourneShellScript("echo result"); + task.captureOutput(); + c = task.launch(new EnvVars(), ws, launcher, listener); + status = new LinkedBlockingQueue<>(); + output = new LinkedBlockingQueue<>(); + lines = new LinkedBlockingQueue<>(); + c.watch(ws, new MockHandler(s.getChannel(), status, output, lines)); + assertEquals(0, status.take().intValue()); + assertEquals("result\n", output.take()); + assertEquals("[+ echo result]", lines.toString()); + } + private static class MockHandler extends Handler { + private final BlockingQueue status; + private final BlockingQueue output; + private final BlockingQueue lines; + @SuppressWarnings("unchecked") + MockHandler(VirtualChannel channel, BlockingQueue status, BlockingQueue output, BlockingQueue lines) { + this.status = channel.export(BlockingQueue.class, status); + this.output = channel.export(BlockingQueue.class, output); + this.lines = channel.export(BlockingQueue.class, lines); + } + @Override public void output(InputStream stream) throws Exception { + lines.addAll(IOUtils.readLines(stream)); + } + @Override public void exited(int code, byte[] data) throws Exception { + status.add(code); + output.add(data != null ? new String(data) : ""); + } + } + } From 7ab9e49c1cb7935f783f24e9189089419dd4d5e6 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Fri, 30 Sep 2016 16:49:02 -0400 Subject: [PATCH 02/16] In the case of WithContainerStep, do not even try to have ShellController call ProcessLiveness. It is guaranteed to not find the process, which would cause an immediate -1 return value. Instead limit the watcher to the simple result file, and check for special statuses only if exitStatus is being called from the master. --- .../durabletask/FileMonitoringTask.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 0f021a21..ddb42ef9 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -35,7 +35,6 @@ import hudson.remoting.RemoteOutputStream; import hudson.remoting.VirtualChannel; import hudson.slaves.WorkspaceList; -import hudson.util.LogTaskListener; import java.io.File; import java.io.IOException; import java.io.OutputStream; @@ -179,6 +178,16 @@ private static class WriteLog extends MasterToSlaveFileCallable { } } + /** Like {@link #exitStatus} but when we cannot rely on a {@link Launcher}. */ + Integer _exitStatus(FilePath workspace) throws IOException, InterruptedException { + FilePath status = getResultFile(workspace); + if (status.exists()) { + return readStatus(status); + } else { + return null; + } + } + private int readStatus(FilePath status) throws IOException, InterruptedException { try { return Integer.parseInt(status.readToString().trim()); @@ -196,6 +205,11 @@ private int readStatus(FilePath status) throws IOException, InterruptedException } @Override public byte[] getOutput(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { + return _getOutput(workspace); + } + + /** Like {@link #getOutput} but when we cannot rely on a {@link Launcher}. */ + byte[] _getOutput(FilePath workspace) throws IOException, InterruptedException { return IOUtils.toByteArray(getOutputFile(workspace).read()); } @@ -308,9 +322,6 @@ private static class StartWatching extends MasterToSlaveFileCallable { private static class Watcher implements Runnable { - // note that LOGGER here is going to the agent log, not master log - private static final Launcher localLauncher = new Launcher.LocalLauncher(new LogTaskListener(LOGGER, Level.INFO)); - private final FileMonitoringController controller; private final FilePath workspace; private final Handler handler; @@ -323,7 +334,7 @@ private static class Watcher implements Runnable { @Override public void run() { try { - Integer exitStatus = controller.exitStatus(workspace, localLauncher); // check before collecting output, in case the process is just now finishing + Integer exitStatus = controller._exitStatus(workspace); // check before collecting output, in case the process is just now finishing long lastLocation = 0; FilePath lastLocationFile = controller.getLastLocationFile(workspace); if (lastLocationFile.exists()) { @@ -342,7 +353,7 @@ private static class Watcher implements Runnable { if (exitStatus != null) { byte[] output; if (controller.getOutputFile(workspace).exists()) { - output = controller.getOutput(workspace, localLauncher); + output = controller._getOutput(workspace); } else { output = null; } @@ -351,6 +362,7 @@ private static class Watcher implements Runnable { watchService().schedule(this, 100, TimeUnit.MILLISECONDS); // TODO consider an adaptive timeout as in DurableTaskStep.Execution in polling mode } } catch (Exception x) { + // note that LOGGER here is going to the agent log, not master log LOGGER.log(Level.WARNING, "giving up on watching " + controller.controlDir, x); } } From 642d7be8626c52a71e917785862d4ff9f96dcf74 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Mon, 3 Oct 2016 17:31:44 -0400 Subject: [PATCH 03/16] 2.16 parent --- pom.xml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index ed9423df..d145ec5d 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.jenkins-ci.plugins plugin - 2.15 + 2.16 durable-task @@ -21,7 +21,6 @@ 1.642.3 - 1.120 From 51a6b8d454f6c8e3c237ed7473428de8f945c40b Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Mon, 3 Oct 2016 18:03:22 -0400 Subject: [PATCH 04/16] Watcher is on the hook to call cleanup. --- src/main/java/org/jenkinsci/plugins/durabletask/Controller.java | 2 +- .../org/jenkinsci/plugins/durabletask/FileMonitoringTask.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java b/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java index c8a3c67b..002e9cc0 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java @@ -45,7 +45,7 @@ public abstract class Controller implements Serializable { /** * Begins watching the process asynchronously, so that the master may receive notification when output is available or the process has exited. * This should be called as soon as the process is launched, and thereafter whenever reconnecting to the agent. - * You should not call {@link #writeLog} in this case; you do not need to call {@link #exitStatus(FilePath, Launcher)} frequently, + * You should not call {@link #writeLog} or {@link #cleanup} in this case; you do not need to call {@link #exitStatus(FilePath, Launcher)} frequently, * though it is advisable to still call it occasionally to verify that the process is still running. * @param workspace the workspace in use * @param handler a remotable callback diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index ddb42ef9..2fce7327 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -358,6 +358,7 @@ private static class Watcher implements Runnable { output = null; } handler.exited(exitStatus, output); + controller.cleanup(workspace); } else { watchService().schedule(this, 100, TimeUnit.MILLISECONDS); // TODO consider an adaptive timeout as in DurableTaskStep.Execution in polling mode } From 0dc09602ad4831a19cc47640bf7289b5c017b801 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Tue, 4 Oct 2016 11:20:16 -0400 Subject: [PATCH 05/16] A bit more logging. --- .../org/jenkinsci/plugins/durabletask/FileMonitoringTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 2fce7327..59d256a1 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -281,6 +281,7 @@ public FilePath getOutputFile(FilePath workspace) throws IOException, Interrupte @Override public void watch(FilePath workspace, Handler handler) throws IOException, InterruptedException, ClassCastException { workspace.actAsync(new StartWatching(this, handler)); + LOGGER.log(Level.FINE, "started asynchronous watch in {0}", controlDir); } /** From 89fd3196c442c6dcddf3001633bd66b0b90bb426 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Tue, 4 Oct 2016 14:03:59 -0400 Subject: [PATCH 06/16] Mistyped issue number. --- .../jenkinsci/plugins/durabletask/BourneShellScriptTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java index 3538eff6..a6306e64 100644 --- a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java +++ b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java @@ -144,7 +144,7 @@ public void smokeTest() throws Exception { c.cleanup(ws); } - @Issue("JENKINS-3838") + @Issue("JENKINS-38381") @Test public void watch() throws Exception { Slave s = j.createOnlineSlave(); ws = s.getWorkspaceRoot(); From d0e9f762e5b8a44fe55381f78e3373f448710014 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Wed, 5 Oct 2016 15:56:04 -0400 Subject: [PATCH 07/16] Recode process output into UTF-8. --- .../plugins/durabletask/FileMonitoringTask.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 59d256a1..1e71ab59 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -24,6 +24,7 @@ package org.jenkinsci.plugins.durabletask; +import com.google.common.base.Charsets; import hudson.EnvVars; import hudson.FilePath; import hudson.Launcher; @@ -37,10 +38,13 @@ import hudson.slaves.WorkspaceList; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; import java.io.RandomAccessFile; import java.nio.channels.Channels; import java.nio.channels.FileChannel; +import java.nio.charset.Charset; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Collections; @@ -54,6 +58,7 @@ import jenkins.MasterToSlaveFileCallable; import org.apache.commons.io.IOUtils; import org.apache.commons.io.input.CountingInputStream; +import org.apache.commons.io.input.ReaderInputStream; /** * A task which forks some external command and then waits for log and status files to be updated/created. @@ -346,7 +351,15 @@ private static class Watcher implements Runnable { if (len > lastLocation) { assert !logFile.isRemote(); try (FileChannel ch = FileChannel.open(Paths.get(logFile.getRemote()), StandardOpenOption.READ)) { - CountingInputStream cis = new CountingInputStream(Channels.newInputStream(ch.position(lastLocation))); + InputStream locallyEncodedStream = Channels.newInputStream(ch.position(lastLocation)); + InputStream utf8EncodedStream; + Charset nativeEncoding = Charset.defaultCharset(); + if (nativeEncoding.equals(Charsets.UTF_8)) { + utf8EncodedStream = locallyEncodedStream; + } else { + utf8EncodedStream = new ReaderInputStream(new InputStreamReader(locallyEncodedStream, nativeEncoding), Charsets.UTF_8); + } + CountingInputStream cis = new CountingInputStream(utf8EncodedStream); handler.output(cis); lastLocationFile.write(Long.toString(lastLocation + cis.getByteCount()), null); } From 3f58d6cdedbe4d4b4c648803349d812feb69491b Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Wed, 5 Oct 2016 18:22:18 -0400 Subject: [PATCH 08/16] Switching to StandardCharsets rather than using the Guava version. --- .../jenkinsci/plugins/durabletask/FileMonitoringTask.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 1e71ab59..bfa04d38 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -24,7 +24,6 @@ package org.jenkinsci.plugins.durabletask; -import com.google.common.base.Charsets; import hudson.EnvVars; import hudson.FilePath; import hudson.Launcher; @@ -45,6 +44,7 @@ import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Collections; @@ -354,10 +354,10 @@ private static class Watcher implements Runnable { InputStream locallyEncodedStream = Channels.newInputStream(ch.position(lastLocation)); InputStream utf8EncodedStream; Charset nativeEncoding = Charset.defaultCharset(); - if (nativeEncoding.equals(Charsets.UTF_8)) { + if (nativeEncoding.equals(StandardCharsets.UTF_8)) { utf8EncodedStream = locallyEncodedStream; } else { - utf8EncodedStream = new ReaderInputStream(new InputStreamReader(locallyEncodedStream, nativeEncoding), Charsets.UTF_8); + utf8EncodedStream = new ReaderInputStream(new InputStreamReader(locallyEncodedStream, nativeEncoding), StandardCharsets.UTF_8); } CountingInputStream cis = new CountingInputStream(utf8EncodedStream); handler.output(cis); From 8f11c53b13f8cdf7a486aa7cb604fbc4ce38a78b Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Wed, 7 Dec 2016 15:15:23 -0500 Subject: [PATCH 09/16] Updated parent to pick up jitpack.io support. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d145ec5d..b7a0526d 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.jenkins-ci.plugins plugin - 2.16 + 2.19 durable-task From 532ac57d81031719593474b88aa30c51dbcb33fe Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Fri, 9 Feb 2018 11:45:59 -0500 Subject: [PATCH 10/16] Comments. --- .../plugins/durabletask/FileMonitoringTask.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 9ddad373..6c02bbab 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -182,6 +182,9 @@ private static class WriteLog extends MasterToSlaveFileCallable { return exitStatus(workspace, listener); } + /** + * Like {@link #exitStatus(FilePath, Launcher, TaskListener)} but not requesting a {@link Launcher}, which would not be available in {@link #watch} mode anyway. + */ protected Integer exitStatus(FilePath workspace, TaskListener listener) throws IOException, InterruptedException { FilePath status = getResultFile(workspace); if (status.exists()) { @@ -199,6 +202,9 @@ protected Integer exitStatus(FilePath workspace, TaskListener listener) throws I return getOutput(workspace); } + /** + * Like {@link #getOutput(FilePath, Launcher)} but not requesting a {@link Launcher}, which would not be available in {@link #watch} mode anyway. + */ protected byte[] getOutput(FilePath workspace) throws IOException, InterruptedException { try (InputStream is = getOutputFile(workspace).read()) { return IOUtils.toByteArray(is); @@ -207,6 +213,7 @@ protected byte[] getOutput(FilePath workspace) throws IOException, InterruptedEx @Override public final void stop(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { launcher.kill(Collections.singletonMap(COOKIE, cookieFor(workspace))); + // TODO after 10s, if the control dir still exists, write a flag file and have the Watcher shut down (interrupting any ongoing handler.output call if possible) } @Override public void cleanup(FilePath workspace) throws IOException, InterruptedException { @@ -366,7 +373,9 @@ private static class Watcher implements Runnable { handler.exited(exitStatus, output); controller.cleanup(workspace); } else { - watchService().schedule(this, 100, TimeUnit.MILLISECONDS); // TODO consider an adaptive timeout as in DurableTaskStep.Execution in polling mode + // Could use an adaptive timeout as in DurableTaskStep.Execution in polling mode, + // though less relevant here since there is no network overhead to the check. + watchService().schedule(this, 100, TimeUnit.MILLISECONDS); } } catch (Exception x) { // note that LOGGER here is going to the agent log, not master log From b945cf22b5b594516447372b34a3ebc448fd6690 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Fri, 9 Feb 2018 12:37:31 -0500 Subject: [PATCH 11/16] [JENKINS-38381] Controller.watch API. --- .../durabletask/BourneShellScript.java | 4 +- .../plugins/durabletask/Controller.java | 18 ++- .../plugins/durabletask/DurableTask.java | 5 +- .../durabletask/FileMonitoringTask.java | 128 +++++++++++++++++- .../plugins/durabletask/Handler.java | 67 +++++++++ .../durabletask/BourneShellScriptTest.java | 51 +++++++ 6 files changed, 264 insertions(+), 9 deletions(-) create mode 100644 src/main/java/org/jenkinsci/plugins/durabletask/Handler.java diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java b/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java index e93b7be3..c563c77d 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java @@ -207,8 +207,8 @@ private FilePath pidFile(FilePath ws) throws IOException, InterruptedException { return controlDir(ws).child("pid"); } - @Override public Integer exitStatus(FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException { - Integer status = super.exitStatus(workspace, launcher, listener); + @Override protected Integer exitStatus(FilePath workspace, TaskListener listener) throws IOException, InterruptedException { + Integer status = super.exitStatus(workspace, listener); if (status != null) { LOGGER.log(Level.FINE, "found exit code {0} in {1}", new Object[] {status, controlDir}); return status; diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java b/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java index e110bc49..223cb4a4 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java @@ -43,6 +43,20 @@ */ public abstract class Controller implements Serializable { + /** + * Begins watching the process asynchronously, so that the master may receive notification when output is available or the process has exited. + * This should be called as soon as the process is launched, and thereafter whenever reconnecting to the agent. + * You should not call {@link #writeLog} or {@link #cleanup} in this case; you do not need to call {@link #exitStatus(FilePath, Launcher)} frequently, + * though it is advisable to still call it occasionally to verify that the process is still running. + * @param workspace the workspace in use + * @param handler a remotable callback + * @param listener a remotable destination for messages + * @throws UnsupportedOperationException when this mode is not available, so you must fall back to polling {@link #writeLog} and {@link #exitStatus(FilePath, Launcher)} + */ + public void watch(@Nonnull FilePath workspace, @Nonnull Handler handler, @Nonnull TaskListener listener) throws IOException, InterruptedException, UnsupportedOperationException { + throw new UnsupportedOperationException("Asynchronous mode is not implemented in " + getClass().getName()); + } + /** * Obtains any new task log output. * Could use a serializable field to keep track of how much output has been previously written. @@ -55,7 +69,7 @@ public abstract class Controller implements Serializable { /** * Checks whether the task has finished. * @param workspace the workspace in use - * @param launcher a way to start processes + * @param launcher a way to start processes (currently unused) * @param logger a way to report special messages * @return an exit code (zero is successful), or null if the task appears to still be running */ @@ -86,7 +100,7 @@ public abstract class Controller implements Serializable { * Intended for use after {@link #exitStatus(FilePath, Launcher)} has returned a non-null status. * The result is undefined if {@link DurableTask#captureOutput} was not called before launch; generally an {@link IOException} will result. * @param workspace the workspace in use - * @param launcher a way to start processes + * @param launcher a way to start processes (currently unused) * @return the output of the process as raw bytes (may be empty but not null) */ public @Nonnull byte[] getOutput(@Nonnull FilePath workspace, @Nonnull Launcher launcher) throws IOException, InterruptedException { diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java index d86b5bb9..9142fdc6 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java @@ -53,8 +53,9 @@ public abstract class DurableTask extends AbstractDescribableImpl i public abstract Controller launch(EnvVars env, FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException; /** - * Requests that standard output of the task be captured rather than streamed to {@link Controller#writeLog}. - * If so, you may call {@link Controller#getOutput}. + * Requests that standard output of the task be captured rather than streamed. + * If you use {@link Controller#watch}, standard output will not be sent to {@link Handler#output}; it will be included in {@link Handler#exited} instead. + * Otherwise (using polling mode), standard output will not be sent to {@link Controller#writeLog}; call {@link Controller#getOutput} to collect. * Standard error should still be streamed to the log. * Should be called prior to {@link #launch} to take effect. * @throws UnsupportedOperationException if this implementation does not support that mode diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 7ef7d3d7..1b281fea 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -30,6 +30,8 @@ import hudson.Util; import hudson.model.TaskListener; import hudson.remoting.Channel; +import hudson.remoting.DaemonThreadFactory; +import hudson.remoting.NamingThreadFactory; import hudson.remoting.RemoteOutputStream; import hudson.remoting.VirtualChannel; import hudson.slaves.WorkspaceList; @@ -39,15 +41,23 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.RandomAccessFile; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.io.StringWriter; import java.util.Collections; import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import jenkins.MasterToSlaveFileCallable; import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.CountingInputStream; /** * A task which forks some external command and then waits for log and status files to be updated/created. @@ -95,6 +105,10 @@ protected static Map escape(EnvVars envVars) { return m; } + /** + * Tails a log file and watches for an exit status file. + * Must be remotable so that {@link #watch} can transfer the implementation. + */ protected static class FileMonitoringController extends Controller { /** Absolute path of {@link #controlDir(FilePath)}. */ @@ -107,6 +121,7 @@ protected static class FileMonitoringController extends Controller { /** * Byte offset in the file that has been reported thus far. + * Only used if {@link #writeLog(FilePath, OutputStream)} is used; not used for {@link #watch}. */ private long lastLocation; @@ -146,7 +161,6 @@ private static class WriteLog extends MasterToSlaveFileCallable { if (toRead > Integer.MAX_VALUE) { // >2Gb of output at once is unlikely throw new IOException("large reads not yet implemented"); } - // TODO is this efficient for large amounts of output? Would it be better to stream data, or return a byte[] from the callable? byte[] buf = new byte[(int) toRead]; raf.readFully(buf); sink.write(buf); @@ -160,8 +174,14 @@ private static class WriteLog extends MasterToSlaveFileCallable { } } - // TODO would be more efficient to allow API to consolidate writeLog with exitStatus (save an RPC call) @Override public Integer exitStatus(FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException { + return exitStatus(workspace, listener); + } + + /** + * Like {@link #exitStatus(FilePath, Launcher, TaskListener)} but not requesting a {@link Launcher}, which would not be available in {@link #watch} mode anyway. + */ + protected Integer exitStatus(FilePath workspace, TaskListener listener) throws IOException, InterruptedException { FilePath status = getResultFile(workspace); if (status.exists()) { try { @@ -175,7 +195,13 @@ private static class WriteLog extends MasterToSlaveFileCallable { } @Override public byte[] getOutput(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { - // TODO could perhaps be more efficient for large files to send a MasterToSlaveFileCallable + return getOutput(workspace); + } + + /** + * Like {@link #getOutput(FilePath, Launcher)} but not requesting a {@link Launcher}, which would not be available in {@link #watch} mode anyway. + */ + protected byte[] getOutput(FilePath workspace) throws IOException, InterruptedException { try (InputStream is = getOutputFile(workspace).read()) { return IOUtils.toByteArray(is); } @@ -183,6 +209,7 @@ private static class WriteLog extends MasterToSlaveFileCallable { @Override public final void stop(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { launcher.kill(Collections.singletonMap(COOKIE, cookieFor(workspace))); + // TODO after 10s, if the control dir still exists, write a flag file and have the Watcher shut down (interrupting any ongoing handler.output call if possible) } @Override public void cleanup(FilePath workspace) throws IOException, InterruptedException { @@ -248,7 +275,102 @@ public FilePath getOutputFile(FilePath workspace) throws IOException, Interrupte } } + @Override public void watch(FilePath workspace, Handler handler, TaskListener listener) throws IOException, InterruptedException, ClassCastException { + workspace.actAsync(new StartWatching(this, handler, listener)); + LOGGER.log(Level.FINE, "started asynchronous watch in {0}", controlDir); + } + + /** + * File in which a last-read position is stored if {@link #watch} is used. + */ + public FilePath getLastLocationFile(FilePath workspace) throws IOException, InterruptedException { + return controlDir(workspace).child("last-location.txt"); + } + + private static final long serialVersionUID = 1L; + } + + private static ScheduledExecutorService watchService; + private synchronized static ScheduledExecutorService watchService() { + if (watchService == null) { + watchService = new /*ErrorLogging*/ScheduledThreadPoolExecutor(5, new NamingThreadFactory(new DaemonThreadFactory(), "FileMonitoringTask watcher")); + } + return watchService; + } + + private static class StartWatching extends MasterToSlaveFileCallable { + private static final long serialVersionUID = 1L; + + private final FileMonitoringController controller; + private final Handler handler; + private final TaskListener listener; + + StartWatching(FileMonitoringController controller, Handler handler, TaskListener listener) { + this.controller = controller; + this.handler = handler; + this.listener = listener; + } + + @Override public Void invoke(File workspace, VirtualChannel channel) throws IOException, InterruptedException { + watchService().submit(new Watcher(controller, new FilePath(workspace), handler, listener)); + return null; + } + + } + + private static class Watcher implements Runnable { + + private final FileMonitoringController controller; + private final FilePath workspace; + private final Handler handler; + private final TaskListener listener; + + Watcher(FileMonitoringController controller, FilePath workspace, Handler handler, TaskListener listener) { + this.controller = controller; + this.workspace = workspace; + this.handler = handler; + this.listener = listener; + } + + @Override public void run() { + try { + Integer exitStatus = controller.exitStatus(workspace, listener); // check before collecting output, in case the process is just now finishing + long lastLocation = 0; + FilePath lastLocationFile = controller.getLastLocationFile(workspace); + if (lastLocationFile.exists()) { + lastLocation = Long.parseLong(lastLocationFile.readToString()); + } + FilePath logFile = controller.getLogFile(workspace); + long len = logFile.length(); + if (len > lastLocation) { + assert !logFile.isRemote(); + try (FileChannel ch = FileChannel.open(Paths.get(logFile.getRemote()), StandardOpenOption.READ)) { + CountingInputStream cis = new CountingInputStream(Channels.newInputStream(ch.position(lastLocation))); + handler.output(cis); + lastLocationFile.write(Long.toString(lastLocation + cis.getByteCount()), null); + } + } + if (exitStatus != null) { + byte[] output; + if (controller.getOutputFile(workspace).exists()) { + output = controller.getOutput(workspace); + } else { + output = null; + } + handler.exited(exitStatus, output); + controller.cleanup(workspace); + } else { + // Could use an adaptive timeout as in DurableTaskStep.Execution in polling mode, + // though less relevant here since there is no network overhead to the check. + watchService().schedule(this, 100, TimeUnit.MILLISECONDS); + } + } catch (Exception x) { + // note that LOGGER here is going to the agent log, not master log + LOGGER.log(Level.WARNING, "giving up on watching " + controller.controlDir, x); + } + } + } } diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/Handler.java b/src/main/java/org/jenkinsci/plugins/durabletask/Handler.java new file mode 100644 index 00000000..0abf6f89 --- /dev/null +++ b/src/main/java/org/jenkinsci/plugins/durabletask/Handler.java @@ -0,0 +1,67 @@ +/* + * The MIT License + * + * Copyright 2016 CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package org.jenkinsci.plugins.durabletask; + +import hudson.FilePath; +import hudson.Launcher; +import hudson.remoting.VirtualChannel; +import java.io.InputStream; +import java.io.Serializable; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * A remote handler which may be sent to an agent and handle process output and results. + * If it needs to communicate with the master, you may use {@link VirtualChannel#export}. + * @see Controller#watch + */ +public abstract class Handler implements Serializable { + + /** + * Notification that new process output is available. + *

Should only be called when at least one byte is available. + * Whatever bytes are actually read will not be offered on the next call, if there is one; there is no need to close the stream. + *

There is no guarantee that output is offered in the form of complete lines of text, + * though in the typical case of line-oriented output it is likely that it will end in a newline. + *

Buffering is the responsibility of the caller, and {@link InputStream#markSupported} may be false. + * @param stream a way to read process output which has not already been handled + * @throws Exception if anything goes wrong, this watch is deactivated + */ + public abstract void output(@Nonnull InputStream stream) throws Exception; + + /** + * Notification that the process has exited or vanished. + * {@link #output} should have been called with any final uncollected output. + *

Any metadata associated with the process may be deleted after this call completes, rendering subsequent {@link Controller} calls unsatisfiable. + *

Note that unlike {@link Controller#exitStatus(FilePath, Launcher)}, no specialized {@link Launcher} is available on the agent, + * so if there are specialized techniques for determining process liveness they will not be considered here; + * you still need to occasionally poll for an exit status from the master. + * @param code the exit code, if known (0 conventionally represents success); may be negative for anomalous conditions such as a missing process + * @param output standard output captured, if {@link DurableTask#captureOutput} was called; else null + * @throws Exception if anything goes wrong, this watch is deactivated + */ + public abstract void exited(int code, @Nullable byte[] output) throws Exception; + +} diff --git a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java index 78ff543a..1cfc2a73 100644 --- a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java +++ b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java @@ -27,15 +27,21 @@ import hudson.EnvVars; import hudson.FilePath; import hudson.Launcher; +import hudson.model.Slave; import hudson.plugins.sshslaves.SSHLauncher; +import hudson.remoting.VirtualChannel; import hudson.slaves.DumbSlave; import hudson.slaves.OfflineCause; import hudson.util.StreamTaskListener; import hudson.util.VersionNumber; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.InputStream; import java.util.Collections; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; +import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.TeeOutputStream; import static org.hamcrest.Matchers.*; import org.jenkinsci.test.acceptance.docker.Docker; @@ -168,6 +174,51 @@ public void smokeTest() throws Exception { c.cleanup(ws); } + @Issue("JENKINS-38381") + @Test public void watch() throws Exception { + Slave s = j.createOnlineSlave(); + ws = s.getWorkspaceRoot(); + launcher = s.createLauncher(listener); + DurableTask task = new BourneShellScript("set +x; for x in 1 2 3 4 5; do echo $x; sleep 1; done"); + Controller c = task.launch(new EnvVars(), ws, launcher, listener); + BlockingQueue status = new LinkedBlockingQueue<>(); + BlockingQueue output = new LinkedBlockingQueue<>(); + BlockingQueue lines = new LinkedBlockingQueue<>(); + c.watch(ws, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals("+ set +x", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("", output.take()); + assertEquals("[1, 2, 3, 4, 5]", lines.toString()); + task = new BourneShellScript("echo result"); + task.captureOutput(); + c = task.launch(new EnvVars(), ws, launcher, listener); + status = new LinkedBlockingQueue<>(); + output = new LinkedBlockingQueue<>(); + lines = new LinkedBlockingQueue<>(); + c.watch(ws, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals(0, status.take().intValue()); + assertEquals("result\n", output.take()); + assertEquals("[+ echo result]", lines.toString()); + } + private static class MockHandler extends Handler { + private final BlockingQueue status; + private final BlockingQueue output; + private final BlockingQueue lines; + @SuppressWarnings("unchecked") + MockHandler(VirtualChannel channel, BlockingQueue status, BlockingQueue output, BlockingQueue lines) { + this.status = channel.export(BlockingQueue.class, status); + this.output = channel.export(BlockingQueue.class, output); + this.lines = channel.export(BlockingQueue.class, lines); + } + @Override public void output(InputStream stream) throws Exception { + lines.addAll(IOUtils.readLines(stream)); + } + @Override public void exited(int code, byte[] data) throws Exception { + status.add(code); + output.add(data != null ? new String(data) : ""); + } + } + @Issue("JENKINS-40734") @Test public void envWithShellChar() throws Exception { Controller c = new BourneShellScript("echo \"value=$MYNEWVAR\"").launch(new EnvVars("MYNEWVAR", "foo$$bar"), ws, launcher, listener); From e703cc829c8e95fc0cd8ef577e98a5749871d888 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Mon, 12 Feb 2018 17:16:17 -0500 Subject: [PATCH 12/16] [JENKINS-31096] Define API for gathering command output in a local encoding. --- .../plugins/durabletask/Controller.java | 4 + .../plugins/durabletask/DurableTask.java | 21 +++ .../durabletask/FileMonitoringTask.java | 78 ++++++++-- .../durabletask/BourneShellScriptTest.java | 140 ++++++++++++++++++ 4 files changed, 233 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java b/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java index e110bc49..9fbc01ca 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java @@ -49,6 +49,8 @@ public abstract class Controller implements Serializable { * @param workspace the workspace in use * @param sink where to send new log output * @return true if something was written and the controller should be resaved, false if everything is idle + * @see DurableTask#charset + * @see DurableTask#defaultCharset */ public abstract boolean writeLog(FilePath workspace, OutputStream sink) throws IOException, InterruptedException; @@ -88,6 +90,8 @@ public abstract class Controller implements Serializable { * @param workspace the workspace in use * @param launcher a way to start processes * @return the output of the process as raw bytes (may be empty but not null) + * @see DurableTask#charset + * @see DurableTask#defaultCharset */ public @Nonnull byte[] getOutput(@Nonnull FilePath workspace, @Nonnull Launcher launcher) throws IOException, InterruptedException { throw new IOException("Did not implement getOutput in " + getClass().getName()); diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java index d86b5bb9..22711cec 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java @@ -31,6 +31,8 @@ import hudson.model.AbstractDescribableImpl; import hudson.model.TaskListener; import java.io.IOException; +import java.nio.charset.Charset; +import javax.annotation.Nonnull; /** * A task which may be run asynchronously on a build node and withstand disconnection of the slave agent. @@ -63,4 +65,23 @@ public void captureOutput() throws UnsupportedOperationException { throw new UnsupportedOperationException("Capturing of output is not implemented in " + getClass().getName()); } + /** + * Requests that a specified charset be used to transcode process output. + * The encoding of {@link Controller#writeLog} and {@link Controller#getOutput} is then presumed to be UTF-8. + * If not called, no translation is performed. + * @param cs the character set in which process output is expected to be + */ + public void charset(@Nonnull Charset cs) { + // by default, ignore + } + + /** + * Requests that the node’s system charset be used to transcode process output. + * The encoding of {@link Controller#writeLog} and {@link Controller#getOutput} is then presumed to be UTF-8. + * If not called, no translation is performed. + */ + public void defaultCharset() { + // by default, ignore + } + } diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 7ef7d3d7..480db070 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -36,18 +36,24 @@ import hudson.util.StreamTaskListener; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.io.RandomAccessFile; import java.io.StringWriter; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.charset.Charset; +import java.nio.charset.CodingErrorAction; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Map; import java.util.TreeMap; import java.util.UUID; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; import jenkins.MasterToSlaveFileCallable; -import org.apache.commons.io.IOUtils; +import org.apache.commons.io.FileUtils; /** * A task which forks some external command and then waits for log and status files to be updated/created. @@ -58,12 +64,19 @@ public abstract class FileMonitoringTask extends DurableTask { private static final String COOKIE = "JENKINS_SERVER_COOKIE"; + /** + * Charset name to use for transcoding, or the empty string for node system default, or null for no transcoding. + */ + private @CheckForNull String charset; + private static String cookieFor(FilePath workspace) { return "durable-" + Util.getDigestOf(workspace.getRemote()); } @Override public final Controller launch(EnvVars env, FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException { - return launchWithCookie(workspace, launcher, listener, env, COOKIE, cookieFor(workspace)); + FileMonitoringController controller = launchWithCookie(workspace, launcher, listener, env, COOKIE, cookieFor(workspace)); + controller.charset = charset; + return controller; } protected FileMonitoringController launchWithCookie(FilePath workspace, Launcher launcher, TaskListener listener, EnvVars envVars, String cookieVariable, String cookieValue) throws IOException, InterruptedException { @@ -71,6 +84,14 @@ protected FileMonitoringController launchWithCookie(FilePath workspace, Launcher return doLaunch(workspace, launcher, listener, envVars); } + @Override public final void charset(Charset cs) { + charset = cs.name(); + } + + @Override public final void defaultCharset() { + charset = ""; + } + /** * Should start a process which sends output to {@linkplain FileMonitoringController#getLogFile(FilePath) log file} * in the workspace and finally writes its exit code to {@linkplain FileMonitoringController#getResultFile(FilePath) result file}. @@ -110,6 +131,9 @@ protected static class FileMonitoringController extends Controller { */ private long lastLocation; + /** @see FileMonitoringTask#charset */ + private @CheckForNull String charset; + protected FileMonitoringController(FilePath ws) throws IOException, InterruptedException { // can't keep ws reference because Controller is expected to be serializable ws.mkdirs(); @@ -120,7 +144,7 @@ protected FileMonitoringController(FilePath ws) throws IOException, InterruptedE @Override public final boolean writeLog(FilePath workspace, OutputStream sink) throws IOException, InterruptedException { FilePath log = getLogFile(workspace); - Long newLocation = log.act(new WriteLog(lastLocation, new RemoteOutputStream(sink))); + Long newLocation = log.act(new WriteLog(lastLocation, new RemoteOutputStream(sink), charset)); if (newLocation != null) { LOGGER.log(Level.FINE, "copied {0} bytes from {1}", new Object[] {newLocation - lastLocation, log}); lastLocation = newLocation; @@ -132,9 +156,11 @@ protected FileMonitoringController(FilePath ws) throws IOException, InterruptedE private static class WriteLog extends MasterToSlaveFileCallable { private final long lastLocation; private final OutputStream sink; - WriteLog(long lastLocation, OutputStream sink) { + private final @CheckForNull String charset; + WriteLog(long lastLocation, OutputStream sink, String charset) { this.lastLocation = lastLocation; this.sink = sink; + this.charset = charset; } @Override public Long invoke(File f, VirtualChannel channel) throws IOException, InterruptedException { long len = f.length(); @@ -149,7 +175,12 @@ private static class WriteLog extends MasterToSlaveFileCallable { // TODO is this efficient for large amounts of output? Would it be better to stream data, or return a byte[] from the callable? byte[] buf = new byte[(int) toRead]; raf.readFully(buf); - sink.write(buf); + ByteBuffer transcoded = maybeTranscode(buf, charset); + if (transcoded == null) { + sink.write(buf); + } else { + Channels.newChannel(sink).write(transcoded); + } } finally { raf.close(); } @@ -174,13 +205,40 @@ private static class WriteLog extends MasterToSlaveFileCallable { } } - @Override public byte[] getOutput(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { - // TODO could perhaps be more efficient for large files to send a MasterToSlaveFileCallable - try (InputStream is = getOutputFile(workspace).read()) { - return IOUtils.toByteArray(is); + /** + * Transcode process output to UTF-8 if necessary. + * @param data output presumed to be in local encoding + * @param charset a particular encoding name, or the empty string for the system default encoding, or null to skip transcoding + * @return a newly allocate buffer of UTF-8 encoded data ({@link CodingErrorAction#REPLACE} is used), + * or null if not performing transcoding because it was not requested or the data was already thought to be in UTF-8 + */ + private static @CheckForNull ByteBuffer maybeTranscode(@Nonnull byte[] data, @CheckForNull String charset) { + if (charset == null) { // no transcoding requested, do raw copy and YMMV + return null; + } else { + Charset cs = charset.isEmpty() ? Charset.defaultCharset() : Charset.forName(charset); + if (cs.equals(StandardCharsets.UTF_8)) { // transcoding unnecessary as output was already UTF-8 + return null; + } else { // decode output in specified charset and reëncode in UTF-8 + return StandardCharsets.UTF_8.encode(cs.decode(ByteBuffer.wrap(data))); + } } } + @Override public byte[] getOutput(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { + return getOutputFile(workspace).act(new MasterToSlaveFileCallable() { + @Override public byte[] invoke(File f, VirtualChannel channel) throws IOException, InterruptedException { + byte[] buf = FileUtils.readFileToByteArray(f); + ByteBuffer transcoded = maybeTranscode(buf, charset); + if (transcoded == null) { + return buf; + } else { + return transcoded.array(); + } + } + }); + } + @Override public final void stop(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { launcher.kill(Collections.singletonMap(COOKIE, cookieFor(workspace))); } diff --git a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java index 78ff543a..4ae04e56 100644 --- a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java +++ b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java @@ -34,8 +34,11 @@ import hudson.util.VersionNumber; import java.io.ByteArrayOutputStream; import java.io.File; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.logging.Level; +import jenkins.security.MasterToSlaveCallable; import org.apache.commons.io.output.TeeOutputStream; import static org.hamcrest.Matchers.*; import org.jenkinsci.test.acceptance.docker.Docker; @@ -236,4 +239,141 @@ private void runOnDocker(DumbSlave s) throws Exception { runOnDocker(new DumbSlave("docker", "/home/jenkins/agent", new SimpleCommandLauncher("docker run -i --rm --name agent --init jenkinsci/slave:3.7-1 java -jar /usr/share/jenkins/slave.jar"))); } + @Issue("JENKINS-31096") + @Test public void encoding() throws Exception { + JavaContainer container = dockerUbuntu.get(); + DumbSlave s = new DumbSlave("docker", "/home/test", new SSHLauncher(container.ipBound(22), container.port(22), "test", "test", "", "-Dfile.encoding=ISO-8859-1")); + j.jenkins.addNode(s); + j.waitOnline(s); + assertEquals("ISO-8859-1", s.getChannel().call(new DetectCharset())); + FilePath dockerWS = s.getWorkspaceRoot(); + dockerWS.child("latin").write("¡Ole!", "ISO-8859-1"); + dockerWS.child("eastern").write("Čau!", "ISO-8859-2"); + dockerWS.child("mixed").write("¡Čau → there!", "UTF-8"); + Launcher dockerLauncher = s.createLauncher(listener); + // control: no transcoding + Controller c = new BourneShellScript("cat latin").launch(new EnvVars(), dockerWS, dockerLauncher, listener); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + Thread.sleep(100); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + c.writeLog(dockerWS, baos); + assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + assertThat(baos.toString("ISO-8859-1"), containsString("¡Ole!")); + c.cleanup(dockerWS); + // and with output capture: + BourneShellScript dt = new BourneShellScript("cat latin"); + dt.captureOutput(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + Thread.sleep(100); + } + c.writeLog(dockerWS, System.err); + assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + assertThat(new String(c.getOutput(dockerWS, launcher), "ISO-8859-1"), containsString("¡Ole!")); + c.cleanup(dockerWS); + // test: specify particular charset (UTF-8) + dt = new BourneShellScript("cat mixed"); + dt.charset(StandardCharsets.UTF_8); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + Thread.sleep(100); + } + baos = new ByteArrayOutputStream(); + c.writeLog(dockerWS, baos); + assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + assertThat(baos.toString("UTF-8"), containsString("¡Čau → there!")); + c.cleanup(dockerWS); + // and with output capture: + dt = new BourneShellScript("cat mixed"); + dt.charset(StandardCharsets.UTF_8); + dt.captureOutput(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + Thread.sleep(100); + } + c.writeLog(dockerWS, System.err); + assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + assertThat(new String(c.getOutput(dockerWS, launcher), "UTF-8"), containsString("¡Čau → there!")); + c.cleanup(dockerWS); + // test: specify particular charset (unrelated) + dt = new BourneShellScript("cat eastern"); + dt.charset(Charset.forName("ISO-8859-2")); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + Thread.sleep(100); + } + baos = new ByteArrayOutputStream(); + c.writeLog(dockerWS, baos); + assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + assertThat(baos.toString("UTF-8"), containsString("Čau!")); + c.cleanup(dockerWS); + // and with output capture: + dt = new BourneShellScript("cat eastern"); + dt.charset(Charset.forName("ISO-8859-2")); + dt.captureOutput(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + Thread.sleep(100); + } + c.writeLog(dockerWS, System.err); + assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + assertThat(new String(c.getOutput(dockerWS, launcher), "UTF-8"), containsString("Čau!")); + c.cleanup(dockerWS); + // test: specify agent default charset + dt = new BourneShellScript("cat latin"); + dt.defaultCharset(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + Thread.sleep(100); + } + baos = new ByteArrayOutputStream(); + c.writeLog(dockerWS, baos); + assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + assertThat(baos.toString("UTF-8"), containsString("¡Ole!")); + c.cleanup(dockerWS); + // and with output capture: + dt = new BourneShellScript("cat latin"); + dt.defaultCharset(); + dt.captureOutput(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + Thread.sleep(100); + } + c.writeLog(dockerWS, System.err); + assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + assertThat(new String(c.getOutput(dockerWS, launcher), "UTF-8"), containsString("¡Ole!")); + c.cleanup(dockerWS); + // test: inappropriate charset, some replacement characters + dt = new BourneShellScript("cat mixed"); + dt.charset(StandardCharsets.US_ASCII); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + Thread.sleep(100); + } + baos = new ByteArrayOutputStream(); + c.writeLog(dockerWS, baos); + assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + assertThat(baos.toString("UTF-8"), containsString("����au ��� there!")); + c.cleanup(dockerWS); + // and with output capture: + dt = new BourneShellScript("cat mixed"); + dt.charset(StandardCharsets.US_ASCII); + dt.captureOutput(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + Thread.sleep(100); + } + c.writeLog(dockerWS, System.err); + assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + assertThat(new String(c.getOutput(dockerWS, launcher), "UTF-8"), containsString("����au ��� there!")); + c.cleanup(dockerWS); + s.toComputer().disconnect(new OfflineCause.UserCause(null, null)); + } + private static class DetectCharset extends MasterToSlaveCallable { + @Override public String call() throws RuntimeException { + return Charset.defaultCharset().name(); + } + } + } From 27f957f44db644bb8e565528415fb93c01fda670 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Tue, 13 Feb 2018 09:51:25 -0500 Subject: [PATCH 13/16] Reminder to pick up https://github.com/jenkinsci/jenkins/pull/3272. --- .../org/jenkinsci/plugins/durabletask/FileMonitoringTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 1b281fea..16f3cd07 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -293,6 +293,7 @@ public FilePath getLastLocationFile(FilePath workspace) throws IOException, Inte private static ScheduledExecutorService watchService; private synchronized static ScheduledExecutorService watchService() { if (watchService == null) { + // TODO 2.105+ use ClassLoaderSanityThreadFactory watchService = new /*ErrorLogging*/ScheduledThreadPoolExecutor(5, new NamingThreadFactory(new DaemonThreadFactory(), "FileMonitoringTask watcher")); } return watchService; From 5d7ea7fbb04bcfa8c31b2558f3ab32efdcbfaf37 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Tue, 13 Feb 2018 12:39:06 -0500 Subject: [PATCH 14/16] Integrating transcoding into watches. --- .../durabletask/FileMonitoringTask.java | 22 ++++++- .../durabletask/BourneShellScriptTest.java | 57 ++++++++++++++++++- 2 files changed, 73 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index a0a112d6..0ba4a849 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -38,6 +38,8 @@ import hudson.util.StreamTaskListener; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; import java.io.RandomAccessFile; import java.io.StringWriter; @@ -63,6 +65,7 @@ import jenkins.MasterToSlaveFileCallable; import org.apache.commons.io.FileUtils; import org.apache.commons.io.input.CountingInputStream; +import org.apache.commons.io.input.ReaderInputStream; /** * A task which forks some external command and then waits for log and status files to be updated/created. @@ -253,14 +256,23 @@ protected byte[] getOutput(FilePath workspace) throws IOException, InterruptedEx * or null if not performing transcoding because it was not requested or the data was already thought to be in UTF-8 */ private static @CheckForNull ByteBuffer maybeTranscode(@Nonnull byte[] data, @CheckForNull String charset) { - if (charset == null) { // no transcoding requested, do raw copy and YMMV + Charset cs = transcodingCharset(charset); + if (cs == null) { + return null; + } else { + return StandardCharsets.UTF_8.encode(cs.decode(ByteBuffer.wrap(data))); + } + } + + private static @CheckForNull Charset transcodingCharset(@CheckForNull String charset) { + if (charset == null) { return null; } else { Charset cs = charset.isEmpty() ? Charset.defaultCharset() : Charset.forName(charset); if (cs.equals(StandardCharsets.UTF_8)) { // transcoding unnecessary as output was already UTF-8 return null; } else { // decode output in specified charset and reëncode in UTF-8 - return StandardCharsets.UTF_8.encode(cs.decode(ByteBuffer.wrap(data))); + return cs; } } } @@ -384,12 +396,14 @@ private static class Watcher implements Runnable { private final FilePath workspace; private final Handler handler; private final TaskListener listener; + private final @CheckForNull Charset cs; Watcher(FileMonitoringController controller, FilePath workspace, Handler handler, TaskListener listener) { this.controller = controller; this.workspace = workspace; this.handler = handler; this.listener = listener; + cs = FileMonitoringController.transcodingCharset(controller.charset); } @Override public void run() { @@ -405,7 +419,9 @@ private static class Watcher implements Runnable { if (len > lastLocation) { assert !logFile.isRemote(); try (FileChannel ch = FileChannel.open(Paths.get(logFile.getRemote()), StandardOpenOption.READ)) { - CountingInputStream cis = new CountingInputStream(Channels.newInputStream(ch.position(lastLocation))); + InputStream locallyEncodedStream = Channels.newInputStream(ch.position(lastLocation)); + InputStream utf8EncodedStream = cs == null ? locallyEncodedStream : new ReaderInputStream(new InputStreamReader(locallyEncodedStream, cs), StandardCharsets.UTF_8); + CountingInputStream cis = new CountingInputStream(utf8EncodedStream); handler.output(cis); lastLocationFile.write(Long.toString(lastLocation + cis.getByteCount()), null); } diff --git a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java index 88a4c2e0..97384bbc 100644 --- a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java +++ b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java @@ -214,11 +214,11 @@ private static class MockHandler extends Handler { this.lines = channel.export(BlockingQueue.class, lines); } @Override public void output(InputStream stream) throws Exception { - lines.addAll(IOUtils.readLines(stream)); + lines.addAll(IOUtils.readLines(stream, StandardCharsets.UTF_8)); } @Override public void exited(int code, byte[] data) throws Exception { status.add(code); - output.add(data != null ? new String(data) : ""); + output.add(data != null ? new String(data, StandardCharsets.UTF_8) : ""); } } @@ -290,7 +290,7 @@ private void runOnDocker(DumbSlave s) throws Exception { runOnDocker(new DumbSlave("docker", "/home/jenkins/agent", new SimpleCommandLauncher("docker run -i --rm --name agent --init jenkinsci/slave:3.7-1 java -jar /usr/share/jenkins/slave.jar"))); } - @Issue("JENKINS-31096") + @Issue({"JENKINS-31096", "JENKINS-38381"}) @Test public void encoding() throws Exception { JavaContainer container = dockerUbuntu.get(); DumbSlave s = new DumbSlave("docker", "/home/test", new SSHLauncher(container.ipBound(22), container.port(22), "test", "test", "", "-Dfile.encoding=ISO-8859-1")); @@ -419,6 +419,57 @@ private void runOnDocker(DumbSlave s) throws Exception { assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); assertThat(new String(c.getOutput(dockerWS, launcher), "UTF-8"), containsString("����au ��� there!")); c.cleanup(dockerWS); + // test: using watch with particular charset + dt = new BourneShellScript("cat mixed"); + dt.charset(StandardCharsets.UTF_8); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + BlockingQueue status = new LinkedBlockingQueue<>(); + BlockingQueue output = new LinkedBlockingQueue<>(); + BlockingQueue lines = new LinkedBlockingQueue<>(); + c.watch(dockerWS, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals("+ cat mixed", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("", output.take()); + assertEquals("[¡Čau → there!]", lines.toString()); + // and with output capture: + dt = new BourneShellScript("cat mixed"); + dt.charset(StandardCharsets.UTF_8); + dt.captureOutput(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + status = new LinkedBlockingQueue<>(); + output = new LinkedBlockingQueue<>(); + lines = new LinkedBlockingQueue<>(); + c.watch(dockerWS, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals("+ cat mixed", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("¡Čau → there!", output.take()); + assertEquals("[]", lines.toString()); + // with unrelated charset: + dt = new BourneShellScript("cat eastern"); + dt.charset(Charset.forName("ISO-8859-2")); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + status = new LinkedBlockingQueue<>(); + output = new LinkedBlockingQueue<>(); + lines = new LinkedBlockingQueue<>(); + c.watch(dockerWS, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals("+ cat eastern", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("", output.take()); + assertEquals("[Čau!]", lines.toString()); + // and with output capture: + dt = new BourneShellScript("cat eastern"); + dt.charset(Charset.forName("ISO-8859-2")); + dt.captureOutput(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + status = new LinkedBlockingQueue<>(); + output = new LinkedBlockingQueue<>(); + lines = new LinkedBlockingQueue<>(); + c.watch(dockerWS, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals("+ cat eastern", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("Čau!", output.take()./* TODO */trim()); + assertEquals("[]", lines.toString()); + // TODO agent default; mojibake s.toComputer().disconnect(new OfflineCause.UserCause(null, null)); } private static class DetectCharset extends MasterToSlaveCallable { From cc53b1838554ba96a62efc1e0e78fdeb02673c14 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Tue, 13 Feb 2018 12:50:01 -0500 Subject: [PATCH 15/16] ByteBuffer.array is not what I wanted. --- .../plugins/durabletask/FileMonitoringTask.java | 6 ++++-- .../plugins/durabletask/BourneShellScriptTest.java | 10 +++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 480db070..477e99a6 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -209,7 +209,7 @@ private static class WriteLog extends MasterToSlaveFileCallable { * Transcode process output to UTF-8 if necessary. * @param data output presumed to be in local encoding * @param charset a particular encoding name, or the empty string for the system default encoding, or null to skip transcoding - * @return a newly allocate buffer of UTF-8 encoded data ({@link CodingErrorAction#REPLACE} is used), + * @return a buffer of UTF-8 encoded data ({@link CodingErrorAction#REPLACE} is used), * or null if not performing transcoding because it was not requested or the data was already thought to be in UTF-8 */ private static @CheckForNull ByteBuffer maybeTranscode(@Nonnull byte[] data, @CheckForNull String charset) { @@ -233,7 +233,9 @@ private static class WriteLog extends MasterToSlaveFileCallable { if (transcoded == null) { return buf; } else { - return transcoded.array(); + byte[] buf2 = new byte[transcoded.remaining()]; + transcoded.get(buf2); + return buf2; } } }); diff --git a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java index 4ae04e56..f43ea9d6 100644 --- a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java +++ b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java @@ -270,7 +270,7 @@ private void runOnDocker(DumbSlave s) throws Exception { } c.writeLog(dockerWS, System.err); assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); - assertThat(new String(c.getOutput(dockerWS, launcher), "ISO-8859-1"), containsString("¡Ole!")); + assertEquals("¡Ole!", new String(c.getOutput(dockerWS, launcher), "ISO-8859-1")); c.cleanup(dockerWS); // test: specify particular charset (UTF-8) dt = new BourneShellScript("cat mixed"); @@ -294,7 +294,7 @@ private void runOnDocker(DumbSlave s) throws Exception { } c.writeLog(dockerWS, System.err); assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); - assertThat(new String(c.getOutput(dockerWS, launcher), "UTF-8"), containsString("¡Čau → there!")); + assertEquals("¡Čau → there!", new String(c.getOutput(dockerWS, launcher), "UTF-8")); c.cleanup(dockerWS); // test: specify particular charset (unrelated) dt = new BourneShellScript("cat eastern"); @@ -318,7 +318,7 @@ private void runOnDocker(DumbSlave s) throws Exception { } c.writeLog(dockerWS, System.err); assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); - assertThat(new String(c.getOutput(dockerWS, launcher), "UTF-8"), containsString("Čau!")); + assertEquals("Čau!", new String(c.getOutput(dockerWS, launcher), "UTF-8")); c.cleanup(dockerWS); // test: specify agent default charset dt = new BourneShellScript("cat latin"); @@ -342,7 +342,7 @@ private void runOnDocker(DumbSlave s) throws Exception { } c.writeLog(dockerWS, System.err); assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); - assertThat(new String(c.getOutput(dockerWS, launcher), "UTF-8"), containsString("¡Ole!")); + assertEquals("¡Ole!", new String(c.getOutput(dockerWS, launcher), "UTF-8")); c.cleanup(dockerWS); // test: inappropriate charset, some replacement characters dt = new BourneShellScript("cat mixed"); @@ -366,7 +366,7 @@ private void runOnDocker(DumbSlave s) throws Exception { } c.writeLog(dockerWS, System.err); assertEquals(0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); - assertThat(new String(c.getOutput(dockerWS, launcher), "UTF-8"), containsString("����au ��� there!")); + assertEquals("����au ��� there!", new String(c.getOutput(dockerWS, launcher), "UTF-8")); c.cleanup(dockerWS); s.toComputer().disconnect(new OfflineCause.UserCause(null, null)); } From deb425d17c5f2fe4f346ca6d064cbaf9380827e6 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Tue, 13 Feb 2018 13:00:30 -0500 Subject: [PATCH 16/16] Completing test coverage. --- .../durabletask/BourneShellScriptTest.java | 51 ++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java index df9cd1db..caa2bf28 100644 --- a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java +++ b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java @@ -469,7 +469,56 @@ private void runOnDocker(DumbSlave s) throws Exception { assertEquals(0, status.take().intValue()); assertEquals("Čau!", output.take()); assertEquals("[]", lines.toString()); - // TODO agent default; mojibake + // with agent default charset: + dt = new BourneShellScript("cat latin"); + dt.defaultCharset(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + status = new LinkedBlockingQueue<>(); + output = new LinkedBlockingQueue<>(); + lines = new LinkedBlockingQueue<>(); + c.watch(dockerWS, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals("+ cat latin", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("", output.take()); + assertEquals("[¡Ole!]", lines.toString()); + // and with output capture: + dt = new BourneShellScript("cat latin"); + dt.defaultCharset(); + dt.captureOutput(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + status = new LinkedBlockingQueue<>(); + output = new LinkedBlockingQueue<>(); + lines = new LinkedBlockingQueue<>(); + c.watch(dockerWS, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals("+ cat latin", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("¡Ole!", output.take()); + assertEquals("[]", lines.toString()); + // and mojibake: + dt = new BourneShellScript("cat mixed"); + dt.charset(StandardCharsets.US_ASCII); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + status = new LinkedBlockingQueue<>(); + output = new LinkedBlockingQueue<>(); + lines = new LinkedBlockingQueue<>(); + c.watch(dockerWS, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals("+ cat mixed", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("", output.take()); + assertEquals("[����au ��� there!]", lines.toString()); + // and with output capture: + dt = new BourneShellScript("cat mixed"); + dt.charset(StandardCharsets.US_ASCII); + dt.captureOutput(); + c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); + status = new LinkedBlockingQueue<>(); + output = new LinkedBlockingQueue<>(); + lines = new LinkedBlockingQueue<>(); + c.watch(dockerWS, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals("+ cat mixed", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("����au ��� there!", output.take()); + assertEquals("[]", lines.toString()); s.toComputer().disconnect(new OfflineCause.UserCause(null, null)); } private static class DetectCharset extends MasterToSlaveCallable {