From 582d797913d050c37c824e928eebc60f62120d85 Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 09:51:02 +0100 Subject: [PATCH 01/11] wip: PoC with docker-tc --- framework/cmd/cmd2/main.go | 60 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 framework/cmd/cmd2/main.go diff --git a/framework/cmd/cmd2/main.go b/framework/cmd/cmd2/main.go new file mode 100644 index 000000000..d12634540 --- /dev/null +++ b/framework/cmd/cmd2/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "context" + "log" + "os" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" + "github.com/smartcontractkit/chainlink-testing-framework/framework" + "github.com/testcontainers/testcontainers-go" +) + +func main() { + ctx := context.Background() + + _ = os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true") + // Define the container request + req := testcontainers.ContainerRequest{ + Image: "lukaszlach/docker-tc", + Name: "dtc", + AutoRemove: false, + CapAdd: []string{"NET_ADMIN"}, + HostConfigModifier: func(h *container.HostConfig) { + h.Privileged = true + h.NetworkMode = "host" + h.Mounts = []mount.Mount{ + { + Type: "bind", + Source: "/var/run/docker.sock", + Target: "/var/run/docker.sock", + ReadOnly: true, + }, + { + Type: "bind", + Source: "/var/docker-tc", + Target: "/var/docker-tc", + }, + } + }, + } + + // Create the container + _, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + log.Fatalf("Failed to start container: %s", err) + } + time.Sleep(15 * time.Second) + if _, err := framework.ExecCmd("docker exec dtc curl -d delay=8000ms localhost:4080/blockchain-node-2baf2"); err != nil { + panic(err) + } + time.Sleep(30 * time.Second) + if _, err := framework.ExecCmd(`docker exec dtc curl -X DELETE localhost:4080/blockchain-node-2baf2`); err != nil { + panic(err) + } +} From 2530889b5d1319e997127d4ecbc2894b38a82ff1 Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 14:02:46 +0100 Subject: [PATCH 02/11] wip: docker-tc --- framework/chaos/chaos.go | 97 +++++++++++++++++++++++++++++++++++ framework/chaos/chaos_test.go | 74 ++++++++++++++++++++++++++ framework/cmd/cmd2/main.go | 60 ---------------------- 3 files changed, 171 insertions(+), 60 deletions(-) create mode 100644 framework/chaos/chaos_test.go delete mode 100644 framework/cmd/cmd2/main.go diff --git a/framework/chaos/chaos.go b/framework/chaos/chaos.go index a26a0d0ba..71364a444 100644 --- a/framework/chaos/chaos.go +++ b/framework/chaos/chaos.go @@ -3,6 +3,8 @@ package chaos import ( "context" "fmt" + "os" + "slices" "strings" "time" @@ -10,10 +12,105 @@ import ( "github.com/docker/docker/api/types/mount" "github.com/google/uuid" "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" "github.com/smartcontractkit/chainlink-testing-framework/framework" ) +const ( + // Docker and docker-tc commands + CmdPause = "pause" + CmdDelay = "delay" + CmdLoss = "loss" + CmdDuplicate = "duplicate" + CmdCorrupt = "corrupt" +) + +const ( + // dockerTCContainerName default "docker-tc" container name + dockerTCContainerName = "dtc" + // dockerTCInternalSvc docker-tc internal service name + dockerTCInternalSvc = "localhost:4080" +) + +var ( + defaultCURLCMD = fmt.Sprintf("docker exec %s curl", dockerTCContainerName) + tcCommands = []string{CmdDelay, CmdLoss, CmdCorrupt, CmdDuplicate} +) + +// DockerChaos is a chaos generator for Docker +type DockerChaos struct { + Experiments map[string]string +} + +// NewDockerChaos creates a new "docker-tc" instance or reuses existing one +func NewDockerChaos(ctx context.Context) (*DockerChaos, error) { + framework.L.Info(). + Str("Container", dockerTCContainerName). + Msg("Starting new docker-tc container") + + _ = os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true") + req := testcontainers.ContainerRequest{ + Image: "lukaszlach/docker-tc", + Name: dockerTCContainerName, + CapAdd: []string{"NET_ADMIN"}, + WaitingFor: wait.ForLog("Starting Docker Traffic Control"), + HostConfigModifier: func(h *container.HostConfig) { + h.Privileged = true + h.NetworkMode = "host" + h.Mounts = []mount.Mount{ + { + Type: "bind", + Source: "/var/run/docker.sock", + Target: "/var/run/docker.sock", + ReadOnly: true, + }, + } + }, + } + _, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + Reuse: true, + }) + if err != nil { + return nil, fmt.Errorf("failed to start docker-tc container: %w", err) + } + return &DockerChaos{ + Experiments: make(map[string]string, 0), + }, nil +} + +// RemoveAll removes all the experiments +func (m *DockerChaos) RemoveAll() error { + for exName, exCmd := range m.Experiments { + if _, err := framework.ExecCmd(exCmd); err != nil { + return fmt.Errorf("failed to remove chaos experiment: name: %s, command:%s, err: %w", exName, exCmd, err) + } + } + return nil +} + +// Chaos executes either Docker or "docker-tc" commands +func (m *DockerChaos) Chaos(containerName string, cmd, val string) error { + if slices.Contains(tcCommands, cmd) { + m.Experiments[containerName] = fmt.Sprintf("%s -X DELETE %s/%s", defaultCURLCMD, dockerTCInternalSvc, containerName) + if _, err := framework.ExecCmd(fmt.Sprintf("%s -d %s=%s %s/%s", defaultCURLCMD, cmd, val, dockerTCInternalSvc, containerName)); err != nil { + return err + } + } else { + m.Experiments[containerName] = fmt.Sprintf("docker unpause %s", containerName) + if _, err := framework.ExecCmd(fmt.Sprintf("docker pause %s", containerName)); err != nil { + return err + } + } + return nil +} + +// DEPRECATED: Since Pumba has outdated Docker dependencies it may not work without additional +// setting to allow using Docker client which is out of client<>server compatibility range. +// Use NewDockerChaos for pause and network experiments! +// // ExecPumba executes Pumba (https://github.com/alexei-led/pumba) command // since handling various docker race conditions is hard and there is no easy API for that // for now you can provide time to wait until chaos is applied diff --git a/framework/chaos/chaos_test.go b/framework/chaos/chaos_test.go new file mode 100644 index 000000000..3abe873a5 --- /dev/null +++ b/framework/chaos/chaos_test.go @@ -0,0 +1,74 @@ +package chaos_test + +import ( + "testing" + "time" + + "github.com/smartcontractkit/chainlink-testing-framework/framework/chaos" + "github.com/smartcontractkit/chainlink-testing-framework/framework/rpc" + "github.com/stretchr/testify/require" +) + +func TestChaos(t *testing.T) { + c, err := rpc.StartAnvil([]string{"--balance", "1", "--block-time", "5"}) + require.NoError(t, err) + + i, err := c.Inspect(t.Context()) + require.NoError(t, err) + + dtc, err := chaos.NewDockerChaos(t.Context()) + require.NoError(t, err) + + tests := []struct { + name string + containerName string + cmd string + value string + wantErr bool + }{ + { + name: "pause container", + containerName: i.Name, + cmd: chaos.CmdPause, + }, + { + name: "delay container", + containerName: i.Name, + cmd: chaos.CmdDelay, + value: "8000ms", + }, + { + name: "loss container", + containerName: i.Name, + cmd: chaos.CmdLoss, + value: "50%", + }, + { + name: "corrupt traffic in container", + containerName: i.Name, + cmd: chaos.CmdCorrupt, + value: "10%", + }, + { + name: "duplicate traffic in container", + containerName: i.Name, + cmd: chaos.CmdDuplicate, + value: "50%", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err = dtc.Chaos(tc.containerName, tc.cmd, tc.value) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + time.Sleep(10 * time.Second) + + err = dtc.RemoveAll() + require.NoError(t, err) + }) + } +} diff --git a/framework/cmd/cmd2/main.go b/framework/cmd/cmd2/main.go deleted file mode 100644 index d12634540..000000000 --- a/framework/cmd/cmd2/main.go +++ /dev/null @@ -1,60 +0,0 @@ -package main - -import ( - "context" - "log" - "os" - "time" - - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/mount" - "github.com/smartcontractkit/chainlink-testing-framework/framework" - "github.com/testcontainers/testcontainers-go" -) - -func main() { - ctx := context.Background() - - _ = os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true") - // Define the container request - req := testcontainers.ContainerRequest{ - Image: "lukaszlach/docker-tc", - Name: "dtc", - AutoRemove: false, - CapAdd: []string{"NET_ADMIN"}, - HostConfigModifier: func(h *container.HostConfig) { - h.Privileged = true - h.NetworkMode = "host" - h.Mounts = []mount.Mount{ - { - Type: "bind", - Source: "/var/run/docker.sock", - Target: "/var/run/docker.sock", - ReadOnly: true, - }, - { - Type: "bind", - Source: "/var/docker-tc", - Target: "/var/docker-tc", - }, - } - }, - } - - // Create the container - _, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - }) - if err != nil { - log.Fatalf("Failed to start container: %s", err) - } - time.Sleep(15 * time.Second) - if _, err := framework.ExecCmd("docker exec dtc curl -d delay=8000ms localhost:4080/blockchain-node-2baf2"); err != nil { - panic(err) - } - time.Sleep(30 * time.Second) - if _, err := framework.ExecCmd(`docker exec dtc curl -X DELETE localhost:4080/blockchain-node-2baf2`); err != nil { - panic(err) - } -} From 492ac4f65b232578eff335efb6a675113d52550b Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 14:33:11 +0100 Subject: [PATCH 03/11] finalize --- framework/chaos/chaos.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/framework/chaos/chaos.go b/framework/chaos/chaos.go index 71364a444..216ce2dfc 100644 --- a/framework/chaos/chaos.go +++ b/framework/chaos/chaos.go @@ -3,7 +3,6 @@ package chaos import ( "context" "fmt" - "os" "slices" "strings" "time" @@ -48,8 +47,6 @@ func NewDockerChaos(ctx context.Context) (*DockerChaos, error) { framework.L.Info(). Str("Container", dockerTCContainerName). Msg("Starting new docker-tc container") - - _ = os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true") req := testcontainers.ContainerRequest{ Image: "lukaszlach/docker-tc", Name: dockerTCContainerName, @@ -88,11 +85,15 @@ func (m *DockerChaos) RemoveAll() error { return fmt.Errorf("failed to remove chaos experiment: name: %s, command:%s, err: %w", exName, exCmd, err) } } + m.Experiments = make(map[string]string) return nil } // Chaos executes either Docker or "docker-tc" commands func (m *DockerChaos) Chaos(containerName string, cmd, val string) error { + if _, ok := m.Experiments[containerName]; ok { + return fmt.Errorf("chaos is already applied, only a single chaos can be applied to a container, call RemoveAll first") + } if slices.Contains(tcCommands, cmd) { m.Experiments[containerName] = fmt.Sprintf("%s -X DELETE %s/%s", defaultCURLCMD, dockerTCInternalSvc, containerName) if _, err := framework.ExecCmd(fmt.Sprintf("%s -d %s=%s %s/%s", defaultCURLCMD, cmd, val, dockerTCInternalSvc, containerName)); err != nil { From 0418ef61ad20f6381cf01808678f8950988cbbb9 Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 14:33:56 +0100 Subject: [PATCH 04/11] changeset --- framework/.changeset/v0.14.7.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 framework/.changeset/v0.14.7.md diff --git a/framework/.changeset/v0.14.7.md b/framework/.changeset/v0.14.7.md new file mode 100644 index 000000000..60e63a252 --- /dev/null +++ b/framework/.changeset/v0.14.7.md @@ -0,0 +1 @@ +- Deprecate Pumba, use docker-tc for chaos testing \ No newline at end of file From 2f59b8c308c4778b3c579a55ad6ccf3586dd1a0c Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 14:37:58 +0100 Subject: [PATCH 05/11] fix lint --- framework/chaos/chaos_test.go | 3 ++- framework/components/fake/fake.go | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/framework/chaos/chaos_test.go b/framework/chaos/chaos_test.go index 3abe873a5..bb0666e0c 100644 --- a/framework/chaos/chaos_test.go +++ b/framework/chaos/chaos_test.go @@ -4,9 +4,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-testing-framework/framework/chaos" "github.com/smartcontractkit/chainlink-testing-framework/framework/rpc" - "github.com/stretchr/testify/require" ) func TestChaos(t *testing.T) { diff --git a/framework/components/fake/fake.go b/framework/components/fake/fake.go index bac4ebc02..86940e8f7 100644 --- a/framework/components/fake/fake.go +++ b/framework/components/fake/fake.go @@ -14,7 +14,6 @@ const ( DefaultFakeServicePort = 9111 ) - var ( Service *gin.Engine validMethod = regexp.MustCompile("GET|POST|PATCH|PUT|DELETE") From 889e0c30e933bbc876f2e4523eeeced46df78ddc Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 14:38:24 +0100 Subject: [PATCH 06/11] turn on the test --- framework/chaos/chaos_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/chaos/chaos_test.go b/framework/chaos/chaos_test.go index bb0666e0c..49c8c2cf7 100644 --- a/framework/chaos/chaos_test.go +++ b/framework/chaos/chaos_test.go @@ -10,7 +10,7 @@ import ( "github.com/smartcontractkit/chainlink-testing-framework/framework/rpc" ) -func TestChaos(t *testing.T) { +func TestSmokeChaos(t *testing.T) { c, err := rpc.StartAnvil([]string{"--balance", "1", "--block-time", "5"}) require.NoError(t, err) From 0ab8adecc07749a1018127b9d18670cd386a0035 Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 14:45:16 +0100 Subject: [PATCH 07/11] cleanup --- framework/.changeset/v0.14.7.md | 1 - 1 file changed, 1 deletion(-) delete mode 100644 framework/.changeset/v0.14.7.md diff --git a/framework/.changeset/v0.14.7.md b/framework/.changeset/v0.14.7.md deleted file mode 100644 index 60e63a252..000000000 --- a/framework/.changeset/v0.14.7.md +++ /dev/null @@ -1 +0,0 @@ -- Deprecate Pumba, use docker-tc for chaos testing \ No newline at end of file From 4a618f26a6a4780e271dd957a1b2bea1c5dca735 Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 14:46:17 +0100 Subject: [PATCH 08/11] new changeset --- framework/.changeset/v0.14.8.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 framework/.changeset/v0.14.8.md diff --git a/framework/.changeset/v0.14.8.md b/framework/.changeset/v0.14.8.md new file mode 100644 index 000000000..779a0913d --- /dev/null +++ b/framework/.changeset/v0.14.8.md @@ -0,0 +1 @@ +- Replace Pumba with Docker-TC to avoid Docker API compatibility issues From c32211ab8aab2e4c007244de98d2b9f52d2cb211 Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 16:05:55 +0100 Subject: [PATCH 09/11] finalize --- framework/chaos/chaos.go | 39 ++++++++++++++++++++++++++++------- framework/chaos/chaos_test.go | 13 ++++++------ framework/rpc/rpc.go | 2 ++ 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/framework/chaos/chaos.go b/framework/chaos/chaos.go index 216ce2dfc..1a4969bfe 100644 --- a/framework/chaos/chaos.go +++ b/framework/chaos/chaos.go @@ -16,6 +16,13 @@ import ( "github.com/smartcontractkit/chainlink-testing-framework/framework" ) +/* + * A simple wrapper for "docker-tc" chaos actions. + * One small caveat is that in order for chaos to work your containers should be on a network so + * interfaces like 'vetha57f116' are created inside them. + * Works for any network, by default we are testing containers on "ctf" network. + */ + const ( // Docker and docker-tc commands CmdPause = "pause" @@ -80,9 +87,13 @@ func NewDockerChaos(ctx context.Context) (*DockerChaos, error) { // RemoveAll removes all the experiments func (m *DockerChaos) RemoveAll() error { - for exName, exCmd := range m.Experiments { - if _, err := framework.ExecCmd(exCmd); err != nil { - return fmt.Errorf("failed to remove chaos experiment: name: %s, command:%s, err: %w", exName, exCmd, err) + for containerName, experimentCmd := range m.Experiments { + framework.L.Info(). + Str("Container", containerName). + Str("Cmd", experimentCmd). + Msg("Removing chaos for container") + if _, err := framework.ExecCmd(experimentCmd); err != nil { + return fmt.Errorf("failed to remove chaos experiment: name: %s, command:%s, err: %w", containerName, experimentCmd, err) } } m.Experiments = make(map[string]string) @@ -94,16 +105,30 @@ func (m *DockerChaos) Chaos(containerName string, cmd, val string) error { if _, ok := m.Experiments[containerName]; ok { return fmt.Errorf("chaos is already applied, only a single chaos can be applied to a container, call RemoveAll first") } + // tc commands if slices.Contains(tcCommands, cmd) { m.Experiments[containerName] = fmt.Sprintf("%s -X DELETE %s/%s", defaultCURLCMD, dockerTCInternalSvc, containerName) - if _, err := framework.ExecCmd(fmt.Sprintf("%s -d %s=%s %s/%s", defaultCURLCMD, cmd, val, dockerTCInternalSvc, containerName)); err != nil { + out, err := framework.ExecCmd(fmt.Sprintf("%s -d %s=%s %s/%s", defaultCURLCMD, cmd, val, dockerTCInternalSvc, containerName)) + if err != nil { return err } - } else { - m.Experiments[containerName] = fmt.Sprintf("docker unpause %s", containerName) - if _, err := framework.ExecCmd(fmt.Sprintf("docker pause %s", containerName)); err != nil { + if err := verifyTCOutput(string(out)); err != nil { return err } + return nil + } + // docker commands + m.Experiments[containerName] = fmt.Sprintf("docker unpause %s", containerName) + _, err := framework.ExecCmd(fmt.Sprintf("docker pause %s", containerName)) + if err != nil { + return err + } + return nil +} + +func verifyTCOutput(out string) error { + if !strings.Contains(out, "Controlling traffic") { + return fmt.Errorf("experiment failed to apply, set debug logs, export CTF_LOG_LEVEL=debug. Your container also must be on a network, 'ctf' or any other, won't work with default 'bridge'") } return nil } diff --git a/framework/chaos/chaos_test.go b/framework/chaos/chaos_test.go index 49c8c2cf7..cfae591a1 100644 --- a/framework/chaos/chaos_test.go +++ b/framework/chaos/chaos_test.go @@ -16,6 +16,7 @@ func TestSmokeChaos(t *testing.T) { i, err := c.Inspect(t.Context()) require.NoError(t, err) + containerName := i.Name[1:] dtc, err := chaos.NewDockerChaos(t.Context()) require.NoError(t, err) @@ -29,30 +30,30 @@ func TestSmokeChaos(t *testing.T) { }{ { name: "pause container", - containerName: i.Name, + containerName: containerName, cmd: chaos.CmdPause, }, { name: "delay container", - containerName: i.Name, + containerName: containerName, cmd: chaos.CmdDelay, value: "8000ms", }, { name: "loss container", - containerName: i.Name, + containerName: containerName, cmd: chaos.CmdLoss, value: "50%", }, { name: "corrupt traffic in container", - containerName: i.Name, + containerName: containerName, cmd: chaos.CmdCorrupt, value: "10%", }, { name: "duplicate traffic in container", - containerName: i.Name, + containerName: containerName, cmd: chaos.CmdDuplicate, value: "50%", }, @@ -66,7 +67,7 @@ func TestSmokeChaos(t *testing.T) { } else { require.NoError(t, err) } - time.Sleep(10 * time.Second) + time.Sleep(5 * time.Second) err = dtc.RemoveAll() require.NoError(t, err) diff --git a/framework/rpc/rpc.go b/framework/rpc/rpc.go index 96f58c9e3..547aaf204 100644 --- a/framework/rpc/rpc.go +++ b/framework/rpc/rpc.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" + "github.com/smartcontractkit/chainlink-testing-framework/framework" f "github.com/smartcontractkit/chainlink-testing-framework/framework" "github.com/testcontainers/testcontainers-go" @@ -330,6 +331,7 @@ func StartAnvil(params []string) (*AnvilContainer, error) { entryPoint = append(entryPoint, params...) req := testcontainers.ContainerRequest{ Image: "ghcr.io/foundry-rs/foundry:stable", + Networks: []string{framework.DefaultNetworkName}, ExposedPorts: []string{"8545/tcp"}, WaitingFor: wait.ForListeningPort("8545").WithStartupTimeout(10 * time.Second), Entrypoint: entryPoint, From d416dcc89ad9f35e8237bf0a8e00213960e034d1 Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 16:13:47 +0100 Subject: [PATCH 10/11] lint --- framework/chaos/chaos.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/framework/chaos/chaos.go b/framework/chaos/chaos.go index 1a4969bfe..fc0acb139 100644 --- a/framework/chaos/chaos.go +++ b/framework/chaos/chaos.go @@ -112,10 +112,7 @@ func (m *DockerChaos) Chaos(containerName string, cmd, val string) error { if err != nil { return err } - if err := verifyTCOutput(string(out)); err != nil { - return err - } - return nil + return verifyTCOutput(string(out)) } // docker commands m.Experiments[containerName] = fmt.Sprintf("docker unpause %s", containerName) From 6fa2eb5eb734e169be2c69c7e44f057e9d6df5f8 Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 26 Feb 2026 16:19:01 +0100 Subject: [PATCH 11/11] fix race --- framework/osutil.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/framework/osutil.go b/framework/osutil.go index 00048b2e1..5669c2894 100644 --- a/framework/osutil.go +++ b/framework/osutil.go @@ -6,6 +6,7 @@ import ( "io" "os/exec" "strings" + "sync" ) // ExecCmd executes a command and logs the output interactively @@ -48,20 +49,25 @@ func ExecCmdWithOpts(ctx context.Context, command string, stdoutFunc func(string // create a buffer, listen to both pipe outputs, wait them to finish and merge output // both log it and return merged output var combinedBuf strings.Builder + combinedBufMu := &sync.Mutex{} stdoutDone := make(chan struct{}) stderrDone := make(chan struct{}) go func() { readStdPipe(stdout, func(m string) { stdoutFunc(m) + combinedBufMu.Lock() combinedBuf.WriteString(m + "\n") + combinedBufMu.Unlock() }) close(stdoutDone) }() go func() { readStdPipe(stderr, func(m string) { stderrFunc(m) + combinedBufMu.Lock() combinedBuf.WriteString(m + "\n") + combinedBufMu.Unlock() }) close(stderrDone) }()