diff --git a/framework/.changeset/v0.14.8.md b/framework/.changeset/v0.14.8.md
new file mode 100644
index 000000000..779a0913d
--- /dev/null
+++ b/framework/.changeset/v0.14.8.md
@@ -0,0 +1 @@
+- Replace Pumba with Docker-TC to avoid Docker API compatibility issues
diff --git a/framework/chaos/chaos.go b/framework/chaos/chaos.go
index a26a0d0ba..fc0acb139 100644
--- a/framework/chaos/chaos.go
+++ b/framework/chaos/chaos.go
@@ -3,6 +3,8 @@ package chaos
 import (
 	"context"
+	"errors"
 	"fmt"
+	"slices"
 	"strings"
 	"time"
 
@@ -10,10 +12,148 @@ import (
 	"github.com/docker/docker/api/types/mount"
 	"github.com/google/uuid"
 	"github.com/testcontainers/testcontainers-go"
+	"github.com/testcontainers/testcontainers-go/wait"
 
 	"github.com/smartcontractkit/chainlink-testing-framework/framework"
 )
 
+/*
+ * A simple wrapper for "docker-tc" chaos actions.
+ * One small caveat is that in order for chaos to work your containers should be on a network so
+ * interfaces like 'vetha57f116' are created inside them.
+ * Works for any network, by default we are testing containers on "ctf" network.
+ */
+
+const (
+	// Docker and docker-tc commands
+	CmdPause     = "pause"
+	CmdDelay     = "delay"
+	CmdLoss      = "loss"
+	CmdDuplicate = "duplicate"
+	CmdCorrupt   = "corrupt"
+)
+
+const (
+	// dockerTCContainerName default "docker-tc" container name
+	dockerTCContainerName = "dtc"
+	// dockerTCInternalSvc docker-tc internal service name
+	dockerTCInternalSvc = "localhost:4080"
+)
+
+var (
+	defaultCURLCMD = fmt.Sprintf("docker exec %s curl", dockerTCContainerName)
+	tcCommands     = []string{CmdDelay, CmdLoss, CmdCorrupt, CmdDuplicate}
+)
+
+// DockerChaos is a chaos generator for Docker.
+// Experiments maps a container name to the command that reverts the chaos applied to it.
+type DockerChaos struct {
+	Experiments map[string]string
+}
+
+// NewDockerChaos creates a new "docker-tc" instance or reuses existing one
+func NewDockerChaos(ctx context.Context) (*DockerChaos, error) {
+	framework.L.Info().
+		Str("Container", dockerTCContainerName).
+		Msg("Starting new docker-tc container")
+	req := testcontainers.ContainerRequest{
+		Image:      "lukaszlach/docker-tc",
+		Name:       dockerTCContainerName,
+		CapAdd:     []string{"NET_ADMIN"},
+		WaitingFor: wait.ForLog("Starting Docker Traffic Control"),
+		HostConfigModifier: func(h *container.HostConfig) {
+			h.Privileged = true
+			h.NetworkMode = "host"
+			h.Mounts = []mount.Mount{
+				{
+					Type:     "bind",
+					Source:   "/var/run/docker.sock",
+					Target:   "/var/run/docker.sock",
+					ReadOnly: true,
+				},
+			}
+		},
+	}
+	_, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
+		ContainerRequest: req,
+		Started:          true,
+		Reuse:            true,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to start docker-tc container: %w", err)
+	}
+	return &DockerChaos{
+		Experiments: make(map[string]string),
+	}, nil
+}
+
+// RemoveAll reverts every registered experiment.
+// It does not stop at the first failure: reverted experiments are forgotten, failed ones stay
+// registered so that the call can be retried, and all failures are returned joined together.
+func (m *DockerChaos) RemoveAll() error {
+	var errs []error
+	for containerName, experimentCmd := range m.Experiments {
+		framework.L.Info().
+			Str("Container", containerName).
+			Str("Cmd", experimentCmd).
+			Msg("Removing chaos for container")
+		if _, err := framework.ExecCmd(experimentCmd); err != nil {
+			errs = append(errs, fmt.Errorf("failed to remove chaos experiment: name: %s, command:%s, err: %w", containerName, experimentCmd, err))
+			continue
+		}
+		// deleting the current key while ranging over a map is safe in Go
+		delete(m.Experiments, containerName)
+	}
+	return errors.Join(errs...)
+}
+
+// Chaos applies a single experiment to a container and registers the command that reverts it.
+// CmdPause pauses the container with Docker, the "docker-tc" commands (CmdDelay, CmdLoss,
+// CmdDuplicate, CmdCorrupt) are sent to "docker-tc" with val as their setting, any other cmd is rejected.
+// Only one experiment per container is allowed, call RemoveAll before applying another one.
+func (m *DockerChaos) Chaos(containerName string, cmd, val string) error {
+	if _, ok := m.Experiments[containerName]; ok {
+		return fmt.Errorf("chaos is already applied, only a single chaos can be applied to a container, call RemoveAll first")
+	}
+	if m.Experiments == nil {
+		// a zero-value DockerChaos has no map yet and writing to a nil map panics
+		m.Experiments = make(map[string]string)
+	}
+	switch {
+	case slices.Contains(tcCommands, cmd):
+		// tc commands
+		out, err := framework.ExecCmd(fmt.Sprintf("%s -d %s=%s %s/%s", defaultCURLCMD, cmd, val, dockerTCInternalSvc, containerName))
+		if err != nil {
+			return fmt.Errorf("failed to apply %q chaos to container %s: %w", cmd, containerName, err)
+		}
+		if err := verifyTCOutput(string(out)); err != nil {
+			return err
+		}
+		m.Experiments[containerName] = fmt.Sprintf("%s -X DELETE %s/%s", defaultCURLCMD, dockerTCInternalSvc, containerName)
+	case cmd == CmdPause:
+		// docker commands
+		if _, err := framework.ExecCmd(fmt.Sprintf("docker pause %s", containerName)); err != nil {
+			return fmt.Errorf("failed to pause container %s: %w", containerName, err)
+		}
+		m.Experiments[containerName] = fmt.Sprintf("docker unpause %s", containerName)
+	default:
+		return fmt.Errorf("unknown chaos command %q, supported commands: %s, %s", cmd, CmdPause, strings.Join(tcCommands, ", "))
+	}
+	return nil
+}
+
+// verifyTCOutput checks that the "docker-tc" output contains the "Controlling traffic" marker of an applied experiment
+func verifyTCOutput(out string) error {
+	if !strings.Contains(out, "Controlling traffic") {
+		return fmt.Errorf("experiment failed to apply, set debug logs, export CTF_LOG_LEVEL=debug. Your container also must be on a network, 'ctf' or any other, won't work with default 'bridge'")
+	}
+	return nil
+}
+
+// Deprecated: Pumba has outdated Docker dependencies, so it may not work without additional
+// settings that allow a Docker client outside of the client<>server compatibility range.
+// Use NewDockerChaos for pause and network experiments!
+//
 // ExecPumba executes Pumba (https://github.com/alexei-led/pumba) command
 // since handling various docker race conditions is hard and there is no easy API for that
 // for now you can provide time to wait until chaos is applied
diff --git a/framework/chaos/chaos_test.go b/framework/chaos/chaos_test.go
new file mode 100644
index 000000000..cfae591a1
--- /dev/null
+++ b/framework/chaos/chaos_test.go
@@ -0,0 +1,86 @@
+package chaos_test
+
+import (
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/smartcontractkit/chainlink-testing-framework/framework/chaos"
+	"github.com/smartcontractkit/chainlink-testing-framework/framework/rpc"
+)
+
+// TestSmokeChaos applies every supported experiment to an Anvil container, one at a time,
+// and checks that each of them can be applied and then removed.
+func TestSmokeChaos(t *testing.T) {
+	c, err := rpc.StartAnvil([]string{"--balance", "1", "--block-time", "5"})
+	require.NoError(t, err)
+
+	i, err := c.Inspect(t.Context())
+	require.NoError(t, err)
+	// drop the leading slash of the inspected name, without panicking on an empty name
+	containerName := strings.TrimPrefix(i.Name, "/")
+
+	dtc, err := chaos.NewDockerChaos(t.Context())
+	require.NoError(t, err)
+
+	tests := []struct {
+		name          string
+		containerName string
+		cmd           string
+		value         string
+		wantErr       bool
+	}{
+		{
+			name:          "pause container",
+			containerName: containerName,
+			cmd:           chaos.CmdPause,
+		},
+		{
+			name:          "delay container",
+			containerName: containerName,
+			cmd:           chaos.CmdDelay,
+			value:         "8000ms",
+		},
+		{
+			name:          "loss container",
+			containerName: containerName,
+			cmd:           chaos.CmdLoss,
+			value:         "50%",
+		},
+		{
+			name:          "corrupt traffic in container",
+			containerName: containerName,
+			cmd:           chaos.CmdCorrupt,
+			value:         "10%",
+		},
+		{
+			name:          "duplicate traffic in container",
+			containerName: containerName,
+			cmd:           chaos.CmdDuplicate,
+			value:         "50%",
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			// always remove the experiment, even when an assertion below aborts the subtest,
+			// otherwise the next subtest is rejected with "chaos is already applied"
+			t.Cleanup(func() {
+				if err := dtc.RemoveAll(); err != nil {
+					t.Errorf("failed to remove chaos: %v", err)
+				}
+			})
+
+			err := dtc.Chaos(tc.containerName, tc.cmd, tc.value)
+			if tc.wantErr {
+				require.Error(t, err)
+				return
+			}
+			require.NoError(t, err)
+			// keep the experiment running for a while before it is removed
+			time.Sleep(5 * time.Second)
+		})
+	}
+}
diff --git a/framework/components/fake/fake.go
b/framework/components/fake/fake.go index bac4ebc02..86940e8f7 100644 --- a/framework/components/fake/fake.go +++ b/framework/components/fake/fake.go @@ -14,7 +14,6 @@ const ( DefaultFakeServicePort = 9111 ) - var ( Service *gin.Engine validMethod = regexp.MustCompile("GET|POST|PATCH|PUT|DELETE") diff --git a/framework/osutil.go b/framework/osutil.go index 00048b2e1..5669c2894 100644 --- a/framework/osutil.go +++ b/framework/osutil.go @@ -6,6 +6,7 @@ import ( "io" "os/exec" "strings" + "sync" ) // ExecCmd executes a command and logs the output interactively @@ -48,20 +49,25 @@ func ExecCmdWithOpts(ctx context.Context, command string, stdoutFunc func(string // create a buffer, listen to both pipe outputs, wait them to finish and merge output // both log it and return merged output var combinedBuf strings.Builder + combinedBufMu := &sync.Mutex{} stdoutDone := make(chan struct{}) stderrDone := make(chan struct{}) go func() { readStdPipe(stdout, func(m string) { stdoutFunc(m) + combinedBufMu.Lock() combinedBuf.WriteString(m + "\n") + combinedBufMu.Unlock() }) close(stdoutDone) }() go func() { readStdPipe(stderr, func(m string) { stderrFunc(m) + combinedBufMu.Lock() combinedBuf.WriteString(m + "\n") + combinedBufMu.Unlock() }) close(stderrDone) }() diff --git a/framework/rpc/rpc.go b/framework/rpc/rpc.go index 96f58c9e3..547aaf204 100644 --- a/framework/rpc/rpc.go +++ b/framework/rpc/rpc.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" + "github.com/smartcontractkit/chainlink-testing-framework/framework" f "github.com/smartcontractkit/chainlink-testing-framework/framework" "github.com/testcontainers/testcontainers-go" @@ -330,6 +331,7 @@ func StartAnvil(params []string) (*AnvilContainer, error) { entryPoint = append(entryPoint, params...) 
req := testcontainers.ContainerRequest{ Image: "ghcr.io/foundry-rs/foundry:stable", + Networks: []string{framework.DefaultNetworkName}, ExposedPorts: []string{"8545/tcp"}, WaitingFor: wait.ForListeningPort("8545").WithStartupTimeout(10 * time.Second), Entrypoint: entryPoint,