From 6311c5167bd9a08cb1eee3c37f67ec2d0ca2ccf4 Mon Sep 17 00:00:00 2001 From: Alex O Bunnyshell Date: Sun, 18 Jan 2026 02:04:50 +0200 Subject: [PATCH 1/3] Add bns logs command for streaming container logs Implement a new top-level 'bns logs' command that streams container application logs (stdout/stderr) from Kubernetes pods with support for multi-component streaming and multiple output formats. Features: - Stream logs from single or multiple components - Environment-wide log streaming (--environment flag) - Component filtering by ID or name (--name flag) - Context fallback support - kubectl-compatible flags (--follow, --tail, --since, --timestamps, etc.) - Thread-safe merged output from multiple sources - Color-coded prefixes for easy identification [component/pod/container] - Multiple output formats: stylish (default), JSON, YAML - Graceful error handling and signal interruption (Ctrl+C) New files: - cmd/logs/root.go - Main command implementation with component resolution - pkg/k8s/kubectl/logs/logs.go - K8s log streaming wrapper - pkg/k8s/kubectl/logs/multiplexer.go - Concurrent streaming coordinator Modified files: - cmd/utils/root.go - Register logs command Usage examples: bns logs --environment env-123 --follow --tail 100 bns logs --component comp-1,comp-2 --follow bns logs --environment env-123 --name api --name worker -o json bns logs --component comp-123 --since 5m --timestamps Co-Authored-By: Claude Sonnet 4.5 --- cmd/logs/root.go | 524 ++++++++++++++++++++++++++++ cmd/utils/root.go | 6 +- pkg/k8s/kubectl/logs/logs.go | 142 ++++++++ pkg/k8s/kubectl/logs/multiplexer.go | 377 ++++++++++++++++++++ 4 files changed, 1048 insertions(+), 1 deletion(-) create mode 100644 cmd/logs/root.go create mode 100644 pkg/k8s/kubectl/logs/logs.go create mode 100644 pkg/k8s/kubectl/logs/multiplexer.go diff --git a/cmd/logs/root.go b/cmd/logs/root.go new file mode 100644 index 0000000..db4a446 --- /dev/null +++ b/cmd/logs/root.go @@ -0,0 +1,524 @@ +package logs + +import ( + "errors" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "bunnyshell.com/cli/pkg/api/component" + "bunnyshell.com/cli/pkg/api/environment" + "bunnyshell.com/cli/pkg/config" + k8sLogs "bunnyshell.com/cli/pkg/k8s/kubectl/logs" + k8sWizard "bunnyshell.com/cli/pkg/wizard/k8s" + "bunnyshell.com/sdk" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + corev1 "k8s.io/api/core/v1" +) + +var mainCmd *cobra.Command + +type LogsOptions struct { + // Component selection + ComponentIDs []string + EnvironmentID string + ComponentNames []string + + // Pod/Container selection + Namespace string + PodName string + Container string + AllContainers bool + + // Log filtering (kubectl standard) + Follow bool + Tail int64 + Since string + SinceTime string + Timestamps bool + Previous bool + + // Output options + Prefix bool + NoColor bool + + OverrideClusterServer string +} + +func (o *LogsOptions) UpdateFlagSet(flags *pflag.FlagSet) { + // Component selection + flags.StringSliceVar(&o.ComponentIDs, "component", o.ComponentIDs, "Component ID(s) (comma-separated)") + flags.StringVar(&o.EnvironmentID, "environment", o.EnvironmentID, "Environment ID (stream logs from all components)") + flags.StringSliceVar(&o.ComponentNames, "name", o.ComponentNames, "Filter by component name (requires --environment, repeatable)") + + // Pod/Container selection + flags.StringVarP(&o.Namespace, "namespace", "n", o.Namespace, "Kubernetes namespace") + flags.StringVar(&o.PodName, "pod", o.PodName, "Pod name (interactive selection if not specified)") + flags.StringVarP(&o.Container, "container", "c", o.Container, "Container name (interactive selection if not specified)") + flags.BoolVar(&o.AllContainers, "all-containers", o.AllContainers, "Stream from all containers in pod") + + // Log filtering + flags.BoolVarP(&o.Follow, "follow", "f", o.Follow, "Stream logs continuously") + flags.Int64Var(&o.Tail, "tail", -1, "Show last N lines (default: all)") + flags.StringVar(&o.Since, "since", o.Since, "Logs newer than duration (e.g. 5s, 2m, 3h)") + flags.StringVar(&o.SinceTime, "since-time", o.SinceTime, "Logs after timestamp (RFC3339)") + flags.BoolVar(&o.Timestamps, "timestamps", o.Timestamps, "Include timestamps in output") + flags.BoolVar(&o.Previous, "previous", o.Previous, "Show logs from previous terminated container") + + // Output options + flags.BoolVar(&o.Prefix, "prefix", true, "Prefix lines with source (component/pod/container)") + flags.BoolVar(&o.NoColor, "no-color", o.NoColor, "Disable color-coded prefixes") + + flags.StringVar(&o.OverrideClusterServer, "cluster-server", o.OverrideClusterServer, "Override kubeconfig cluster server") +} + +func init() { + settings := config.GetSettings() + + logsOptions := LogsOptions{} + + mainCmd = &cobra.Command{ + Use: "logs [flags]", + Short: "Stream logs from component containers", + Long: `Stream container application logs (stdout/stderr) from Kubernetes pods. + +This command streams logs from one or more components in an environment. When multiple +components are specified, their logs are merged with color-coded prefixes for easy +identification. + +Component Selection: + --component , Stream logs from specific component(s) + --environment Stream logs from all components in environment + --name Filter by component name (requires --environment, repeatable) + +If no component is specified, the component from your current context will be used. + +Examples: + # Stream logs from component in context with follow mode + bns configure set-context --component comp-123 + bns logs --follow --tail 100 + + # Stream logs from multiple specific components + bns logs --component comp-1,comp-2,comp-3 --follow + + # Stream logs from all components in environment + bns logs --environment env-123 --tail 50 + + # Filter by component name + bns logs --environment env-123 --name api --name worker --follow + + # Specific pod and container + bns logs --component comp-123 --pod my-pod --container api --follow + + # All containers in component + bns logs --component comp-123 --all-containers --follow + + # Time-based filtering + bns logs --component comp-123 --since 5m --timestamps + + # Previous container logs + bns logs --component comp-123 --previous + +Flags: + -f, --follow Stream logs continuously + --tail Show last N lines + --since Logs newer than duration (e.g. 5s, 2m, 3h) + --since-time Logs after timestamp (RFC3339) + --timestamps Include timestamps + --previous Show logs from previous terminated container + --prefix Prefix lines with source (default: true) + --no-color Disable color-coded prefixes + -c, --container Container name (interactive if not specified) + --pod Pod name (interactive if not specified) + -n, --namespace Kubernetes namespace + --all-containers Stream from all containers`, + + Args: cobra.NoArgs, + ValidArgsFunction: cobra.NoFileCompletions, + + RunE: func(cmd *cobra.Command, args []string) error { + // Validate flags + if err := validateFlags(&logsOptions, settings); err != nil { + return err + } + + // Resolve component list + components, err := resolveComponents(&logsOptions, settings) + if err != nil { + return err + } + + if len(components) == 0 { + return errors.New("no components found") + } + + // Get environment ID from first component + envID, err := getEnvironmentID(components[0]) + if err != nil { + return err + } + + // Fetch kubeconfig once for all components + kubeConfigOptions := environment.NewKubeConfigOptions(envID, logsOptions.OverrideClusterServer) + kubeConfig, err := environment.KubeConfig(kubeConfigOptions) + if err != nil { + return err + } + + // Build stream sources for all components + sources, err := buildStreamSources(components, kubeConfig, &logsOptions) + if err != nil { + return err + } + + if len(sources) == 0 { + return errors.New("no log sources found (no running pods)") + } + + // Get output format from settings + outputFormat := settings.OutputFormat + if outputFormat == "" { + outputFormat = "stylish" + } + + // Create multiplexer + mux := k8sLogs.NewMultiplexer(sources, logsOptions.Prefix, logsOptions.NoColor, outputFormat) + + // Setup signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + + go func() { + <-sigChan + fmt.Fprintln(os.Stderr, "\nReceived interrupt signal, stopping log streams...") + mux.Stop() + }() + + // Start streaming + if err := mux.Start(); err != nil { + return err + } + + // Wait for completion + errs := mux.Wait() + + // Report any errors + if len(errs) > 0 { + fmt.Fprintln(os.Stderr, "\nErrors occurred during log streaming:") + for _, err := range errs { + fmt.Fprintf(os.Stderr, " %v\n", err) + } + return errors.New("log streaming completed with errors") + } + + return nil + }, + } + + flags := mainCmd.Flags() + logsOptions.UpdateFlagSet(flags) + + config.MainManager.CommandWithAPI(mainCmd) +} + +func validateFlags(opts *LogsOptions, settings *config.Settings) error { + // Check mutually exclusive flags + if len(opts.ComponentIDs) > 0 && opts.EnvironmentID != "" { + return errors.New("--component and --environment are mutually exclusive") + } + + // --name requires --environment + if len(opts.ComponentNames) > 0 && opts.EnvironmentID == "" { + return errors.New("--name requires --environment") + } + + // Validate that we have at least one component source + if len(opts.ComponentIDs) == 0 && opts.EnvironmentID == "" && settings.Profile.Context.ServiceComponent == "" { + return errors.New("component ID required: use --component, --environment, or set in context with 'bns configure set-context --component '") + } + + return nil +} + +func resolveComponents(opts *LogsOptions, settings *config.Settings) ([]interface{}, error) { + var components []interface{} + + if len(opts.ComponentIDs) > 0 { + // Specific component IDs + for _, id := range opts.ComponentIDs { + itemOpts := component.NewItemOptions(id) + comp, err := component.Get(itemOpts) + if err != nil { + return nil, fmt.Errorf("failed to get component %s: %w", id, err) + } + components = append(components, comp) + } + } else if opts.EnvironmentID != "" { + // All components in environment (optionally filtered by name) + listOpts := component.NewListOptions() + listOpts.Environment = opts.EnvironmentID + + result, err := component.List(listOpts) + if err != nil { + return nil, fmt.Errorf("failed to list components: %w", err) + } + + var allComponents []sdk.ComponentCollection + if result.Embedded != nil { + allComponents = result.Embedded.Item + } + + // Filter by name if specified + if len(opts.ComponentNames) > 0 { + nameMap := make(map[string]bool) + for _, name := range opts.ComponentNames { + nameMap[name] = true + } + + for _, comp := range allComponents { + if nameMap[comp.GetName()] { + components = append(components, comp) + } + } + } else { + for _, comp := range allComponents { + components = append(components, comp) + } + } + } else { + // Fall back to context + componentID := settings.Profile.Context.ServiceComponent + if componentID == "" { + return nil, errors.New("no component specified") + } + + itemOpts := component.NewItemOptions(componentID) + comp, err := component.Get(itemOpts) + if err != nil { + return nil, fmt.Errorf("failed to get component from context: %w", err) + } + components = append(components, comp) + } + + return components, nil +} + +// Helper functions to extract component information from different SDK types +func getComponentID(comp interface{}) (string, error) { + switch c := comp.(type) { + case *sdk.ComponentItem: + return c.GetId(), nil + case sdk.ComponentCollection: + return c.GetId(), nil + default: + return "", fmt.Errorf("unsupported component type: %T", comp) + } +} + +func getComponentName(comp interface{}) (string, error) { + switch c := comp.(type) { + case *sdk.ComponentItem: + return c.GetName(), nil + case sdk.ComponentCollection: + return c.GetName(), nil + default: + return "", fmt.Errorf("unsupported component type: %T", comp) + } +} + +func getEnvironmentID(comp interface{}) (string, error) { + switch c := comp.(type) { + case *sdk.ComponentItem: + return c.GetEnvironment(), nil + case sdk.ComponentCollection: + return c.GetEnvironment(), nil + default: + return "", fmt.Errorf("unsupported component type: %T", comp) + } +} + +func buildStreamSources( + components []interface{}, + kubeConfig *environment.KubeConfigItem, + opts *LogsOptions, +) ([]*k8sLogs.StreamSource, error) { + var sources []*k8sLogs.StreamSource + + // Parse log options + logOpts := parseLogOptions(opts) + + for _, comp := range components { + // Get component ID and name + compID, err := getComponentID(comp) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to get component ID: %v\n", err) + continue + } + + compName, err := getComponentName(comp) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to get component name: %v\n", err) + continue + } + + // Get pods for this component + podListOpts := &k8sWizard.PodListOptions{ + Component: compID, + } + + pods, err := k8sWizard.PodList(podListOpts) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to list pods for component %s: %v\n", + compName, err) + continue + } + + if len(pods) == 0 { + fmt.Fprintf(os.Stderr, "Warning: no pods found for component %s\n", compName) + continue + } + + // For each pod, get containers and create stream sources + for _, pod := range pods { + podSources, err := buildPodStreamSources(compID, compName, pod, kubeConfig, opts, logOpts) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to build streams for pod %s: %v\n", + pod.GetName(), err) + continue + } + + sources = append(sources, podSources...) + } + } + + return sources, nil +} + +func buildPodStreamSources( + compID string, + compName string, + pod sdk.ComponentResourceItem, + kubeConfig *environment.KubeConfigItem, + opts *LogsOptions, + logOpts *k8sLogs.Options, +) ([]*k8sLogs.StreamSource, error) { + var sources []*k8sLogs.StreamSource + + // Filter by pod name if specified + if opts.PodName != "" && pod.GetName() != opts.PodName { + return sources, nil + } + + // Filter by namespace if specified + if opts.Namespace != "" && pod.GetNamespace() != opts.Namespace { + return sources, nil + } + + // Get containers for the pod + containers, err := getContainersForPod(kubeConfig, pod.GetNamespace(), pod.GetName()) + if err != nil { + return nil, err + } + + // Create stream source for each container + for _, container := range containers { + // Filter by container name if specified + if opts.Container != "" && container.Name != opts.Container { + continue + } + + // Create log streamer for this container + containerLogOpts := *logOpts + containerLogOpts.Namespace = pod.GetNamespace() + containerLogOpts.PodName = pod.GetName() + containerLogOpts.Container = container.Name + + streamer, err := k8sLogs.NewLogStreamer(kubeConfig.Bytes, &containerLogOpts) + if err != nil { + return nil, fmt.Errorf("failed to create log streamer: %w", err) + } + + source := &k8sLogs.StreamSource{ + ComponentID: compID, + ComponentName: compName, + Namespace: pod.GetNamespace(), + PodName: pod.GetName(), + Container: container.Name, + Streamer: streamer, + } + + sources = append(sources, source) + + // If not streaming all containers, break after first match + if opts.Container != "" { + break + } + } + + return sources, nil +} + +func getContainersForPod(kubeConfig *environment.KubeConfigItem, namespace, podName string) ([]*corev1.Container, error) { + // Create log streamer temporarily just to get the k8s client + tempOpts := &k8sLogs.Options{ + Namespace: namespace, + PodName: podName, + } + + streamer, err := k8sLogs.NewLogStreamer(kubeConfig.Bytes, tempOpts) + if err != nil { + return nil, err + } + + // Use the wizard to get containers + containerListOpts := &k8sWizard.ContainerListOptions{ + Namespace: namespace, + PodName: podName, + Client: streamer.PodClient, + } + + containerItems, err := k8sWizard.ContainerList(containerListOpts) + if err != nil { + return nil, err + } + + containers := make([]*corev1.Container, len(containerItems)) + for i, item := range containerItems { + containers[i] = item.Container + } + + return containers, nil +} + +func parseLogOptions(opts *LogsOptions) *k8sLogs.Options { + logOpts := &k8sLogs.Options{ + Follow: opts.Follow, + Timestamps: opts.Timestamps, + Previous: opts.Previous, + } + + if opts.Tail >= 0 { + logOpts.Tail = &opts.Tail + } + + if opts.Since != "" { + if duration, err := time.ParseDuration(opts.Since); err == nil { + logOpts.Since = &duration + } + } + + if opts.SinceTime != "" { + if t, err := time.Parse(time.RFC3339, opts.SinceTime); err == nil { + logOpts.SinceTime = &t + } + } + + return logOpts +} + +func GetMainCommand() *cobra.Command { + return mainCmd +} diff --git a/cmd/utils/root.go b/cmd/utils/root.go index 137a8cc..c8733ff 100644 --- a/cmd/utils/root.go +++ b/cmd/utils/root.go @@ -1,8 +1,10 @@ package utils import ( - "bunnyshell.com/cli/cmd/git" "bunnyshell.com/cli/cmd/component_debug" + "bunnyshell.com/cli/cmd/exec" + "bunnyshell.com/cli/cmd/git" + "bunnyshell.com/cli/cmd/logs" "bunnyshell.com/cli/cmd/remote_development" "github.com/spf13/cobra" ) @@ -13,6 +15,8 @@ func init() { mainCmd.AddCommand(git.GetMainCommand()) mainCmd.AddCommand(remote_development.GetMainCommand()) mainCmd.AddCommand(component_debug.GetMainCommand()) + mainCmd.AddCommand(exec.GetMainCommand()) + mainCmd.AddCommand(logs.GetMainCommand()) } func GetMainCommand() *cobra.Command { diff --git a/pkg/k8s/kubectl/logs/logs.go b/pkg/k8s/kubectl/logs/logs.go new file mode 100644 index 0000000..cef8f69 --- /dev/null +++ b/pkg/k8s/kubectl/logs/logs.go @@ -0,0 +1,142 @@ +package logs + +import ( + "context" + "io" + "time" + + "bunnyshell.com/cli/pkg/build" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/kubectl/pkg/scheme" +) + +type Options struct { + // Pod/Container selection + Namespace string + PodName string + Container string + + // Log filtering (kubectl standard) + Follow bool + Tail *int64 + Since *time.Duration + SinceTime *time.Time + Timestamps bool + Previous bool +} + +type LogStreamer struct { + Config *rest.Config + PodClient v1.PodsGetter + Namespace string + PodName string + Container string + Options *Options + Stream io.ReadCloser +} + +// NewLogStreamer creates a new log streamer for a specific pod/container +func NewLogStreamer(kubeConfig []byte, options *Options) (*LogStreamer, error) { + config, err := makeRestConfig(kubeConfig) + if err != nil { + return nil, err + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + + return &LogStreamer{ + Config: config, + PodClient: client.CoreV1(), + Namespace: options.Namespace, + PodName: options.PodName, + Container: options.Container, + Options: options, + }, nil +} + +// Start begins streaming logs from the pod/container +func (ls *LogStreamer) Start(ctx context.Context) error { + podLogOptions := &corev1.PodLogOptions{ + Container: ls.Container, + Follow: ls.Options.Follow, + Timestamps: ls.Options.Timestamps, + Previous: ls.Options.Previous, + } + + if ls.Options.Tail != nil { + podLogOptions.TailLines = ls.Options.Tail + } + + if ls.Options.Since != nil { + seconds := int64(ls.Options.Since.Seconds()) + podLogOptions.SinceSeconds = &seconds + } + + if ls.Options.SinceTime != nil { + metaTime := metav1.NewTime(*ls.Options.SinceTime) + podLogOptions.SinceTime = &metaTime + } + + req := ls.PodClient.Pods(ls.Namespace).GetLogs(ls.PodName, podLogOptions) + + stream, err := req.Stream(ctx) + if err != nil { + return err + } + + ls.Stream = stream + return nil +} + +// Read reads from the log stream +func (ls *LogStreamer) Read(p []byte) (int, error) { + if ls.Stream == nil { + return 0, io.EOF + } + return ls.Stream.Read(p) +} + +// Close closes the log stream +func (ls *LogStreamer) Close() error { + if ls.Stream != nil { + return ls.Stream.Close() + } + return nil +} + +func makeRestConfig(bytes []byte) (*rest.Config, error) { + config, err := clientcmd.NewClientConfigFromBytes(bytes) + if err != nil { + return nil, err + } + + restConfig, err := config.ClientConfig() + if err != nil { + return nil, err + } + + setConfigDefaults(restConfig) + + return restConfig, nil +} + +func setConfigDefaults(config *rest.Config) *rest.Config { + config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"} + config.APIPath = "/api" + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + + if config.UserAgent == "" { + config.UserAgent = "BunnyCLI+" + build.Version + } + + return config +} diff --git a/pkg/k8s/kubectl/logs/multiplexer.go b/pkg/k8s/kubectl/logs/multiplexer.go new file mode 100644 index 0000000..17af04f --- /dev/null +++ b/pkg/k8s/kubectl/logs/multiplexer.go @@ -0,0 +1,377 @@ +package logs + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "hash/fnv" + "io" + "os" + "strings" + "sync" + "time" + + "github.com/fatih/color" + "gopkg.in/yaml.v3" +) + +// StreamSource represents a single log stream source +type StreamSource struct { + ComponentID string + ComponentName string + Namespace string + PodName string + Container string + Streamer *LogStreamer +} + +// Multiplexer coordinates concurrent log streaming from multiple sources +type Multiplexer struct { + Sources []*StreamSource + Prefix bool + NoColor bool + OutputFormat string // "stylish", "json", "yaml" + errChan chan error + doneChan chan struct{} + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc +} + +// PrefixWriter wraps an io.Writer with thread-safe line prefixing +type PrefixWriter struct { + Writer io.Writer + Prefix string + Color *color.Color + mu sync.Mutex + lineBuffer bytes.Buffer +} + +// NewMultiplexer creates a new log multiplexer +func NewMultiplexer(sources []*StreamSource, prefix bool, noColor bool, outputFormat string) *Multiplexer { + ctx, cancel := context.WithCancel(context.Background()) + + return &Multiplexer{ + Sources: sources, + Prefix: prefix, + NoColor: noColor, + OutputFormat: outputFormat, + errChan: make(chan error, len(sources)), + doneChan: make(chan struct{}), + ctx: ctx, + cancel: cancel, + } +} + +// Start begins streaming from all sources concurrently +func (m *Multiplexer) Start() error { + // Start a goroutine for each source + for _, source := range m.Sources { + m.wg.Add(1) + go m.streamSource(source) + } + + // Wait for all streams to complete + go func() { + m.wg.Wait() + close(m.doneChan) + }() + + return nil +} + +// Wait blocks until all streams complete or are stopped +func (m *Multiplexer) Wait() []error { + <-m.doneChan + + // Collect any errors + var errs []error + close(m.errChan) + for err := range m.errChan { + errs = append(errs, err) + } + + return errs +} + +// Stop cancels all active streams +func (m *Multiplexer) Stop() { + m.cancel() +} + +// streamSource handles streaming from a single source +func (m *Multiplexer) streamSource(source *StreamSource) { + defer m.wg.Done() + + // Start the log stream + err := source.Streamer.Start(m.ctx) + if err != nil { + m.errChan <- fmt.Errorf("[%s] Failed to start log stream: %w", + m.formatSourcePrefix(source), err) + return + } + defer source.Streamer.Close() + + // Create appropriate writer based on output format + var writer io.Writer = os.Stdout + + switch m.OutputFormat { + case "json": + writer = NewJSONWriter(os.Stdout, source) + case "yaml": + writer = NewYAMLWriter(os.Stdout, source) + default: // "stylish" or any other format + if m.Prefix { + prefix := m.formatSourcePrefix(source) + prefixColor := m.getColorForSource(source) + writer = NewPrefixWriter(os.Stdout, prefix, prefixColor, m.NoColor) + } + } + + // Copy logs to output + _, err = io.Copy(writer, source.Streamer) + if err != nil && err != io.EOF { + // Check if error is due to context cancellation + if m.ctx.Err() == nil { + m.errChan <- fmt.Errorf("[%s] Stream error: %w", + m.formatSourcePrefix(source), err) + } + } +} + +// formatSourcePrefix creates a prefix string for a source +func (m *Multiplexer) formatSourcePrefix(source *StreamSource) string { + name := source.ComponentName + if name == "" { + name = source.ComponentID + } + + if source.Container != "" { + return fmt.Sprintf("%s/%s/%s", name, source.PodName, source.Container) + } + + return fmt.Sprintf("%s/%s", name, source.PodName) +} + +// getColorForSource returns a consistent color for a source based on hash +func (m *Multiplexer) getColorForSource(source *StreamSource) *color.Color { + colors := []*color.Color{ + color.New(color.FgCyan), + color.New(color.FgGreen), + color.New(color.FgYellow), + color.New(color.FgBlue), + color.New(color.FgMagenta), + } + + // Hash the source identifier to get consistent color + h := fnv.New32a() + h.Write([]byte(m.formatSourcePrefix(source))) + index := h.Sum32() % uint32(len(colors)) + + return colors[index] +} + +// NewPrefixWriter creates a new prefix writer +func NewPrefixWriter(w io.Writer, prefix string, c *color.Color, noColor bool) *PrefixWriter { + return &PrefixWriter{ + Writer: w, + Prefix: prefix, + Color: c, + } +} + +// Write writes data with prefix for each line +func (pw *PrefixWriter) Write(p []byte) (int, error) { + pw.mu.Lock() + defer pw.mu.Unlock() + + totalWritten := 0 + + // Process input byte by byte to handle line breaks + for _, b := range p { + pw.lineBuffer.WriteByte(b) + + // When we hit a newline, flush the line with prefix + if b == '\n' { + line := pw.lineBuffer.String() + pw.lineBuffer.Reset() + + // Write prefix and line + var written int + var err error + if pw.Color != nil { + written, err = pw.Color.Fprintf(pw.Writer, "[%s] %s", pw.Prefix, line) + } else { + written, err = fmt.Fprintf(pw.Writer, "[%s] %s", pw.Prefix, line) + } + + if err != nil { + return totalWritten, err + } + + totalWritten += written + } + } + + return len(p), nil +} + +// Flush flushes any remaining data in the buffer +func (pw *PrefixWriter) Flush() error { + pw.mu.Lock() + defer pw.mu.Unlock() + + if pw.lineBuffer.Len() > 0 { + line := pw.lineBuffer.String() + pw.lineBuffer.Reset() + + var err error + if pw.Color != nil { + _, err = pw.Color.Fprintf(pw.Writer, "[%s] %s\n", pw.Prefix, line) + } else { + _, err = fmt.Fprintf(pw.Writer, "[%s] %s\n", pw.Prefix, line) + } + + return err + } + + return nil +} + +// LogEntry represents a structured log entry for JSON/YAML output +type LogEntry struct { + Timestamp string `json:"timestamp" yaml:"timestamp"` + Component string `json:"component" yaml:"component"` + ComponentID string `json:"componentId" yaml:"componentId"` + Pod string `json:"pod" yaml:"pod"` + Container string `json:"container" yaml:"container"` + Namespace string `json:"namespace" yaml:"namespace"` + Message string `json:"message" yaml:"message"` +} + +// JSONWriter formats log lines as JSON objects +type JSONWriter struct { + Writer io.Writer + Source *StreamSource + mu sync.Mutex + lineBuffer bytes.Buffer +} + +// NewJSONWriter creates a new JSON writer +func NewJSONWriter(w io.Writer, source *StreamSource) *JSONWriter { + return &JSONWriter{ + Writer: w, + Source: source, + } +} + +// Write writes data as JSON formatted log entries +func (jw *JSONWriter) Write(p []byte) (int, error) { + jw.mu.Lock() + defer jw.mu.Unlock() + + totalWritten := 0 + + // Process input byte by byte to handle line breaks + for _, b := range p { + jw.lineBuffer.WriteByte(b) + + // When we hit a newline, format as JSON and flush + if b == '\n' { + line := strings.TrimSuffix(jw.lineBuffer.String(), "\n") + jw.lineBuffer.Reset() + + if line == "" { + continue + } + + entry := LogEntry{ + Timestamp: time.Now().UTC().Format(time.RFC3339), + Component: jw.Source.ComponentName, + ComponentID: jw.Source.ComponentID, + Pod: jw.Source.PodName, + Container: jw.Source.Container, + Namespace: jw.Source.Namespace, + Message: line, + } + + jsonData, err := json.Marshal(entry) + if err != nil { + return totalWritten, err + } + + written, err := fmt.Fprintf(jw.Writer, "%s\n", jsonData) + if err != nil { + return totalWritten, err + } + + totalWritten += written + } + } + + return len(p), nil +} + +// YAMLWriter formats log lines as YAML documents +type YAMLWriter struct { + Writer io.Writer + Source *StreamSource + mu sync.Mutex + lineBuffer bytes.Buffer +} + +// NewYAMLWriter creates a new YAML writer +func NewYAMLWriter(w io.Writer, source *StreamSource) *YAMLWriter { + return &YAMLWriter{ + Writer: w, + Source: source, + } +} + +// Write writes data as YAML formatted log entries +func (yw *YAMLWriter) Write(p []byte) (int, error) { + yw.mu.Lock() + defer yw.mu.Unlock() + + totalWritten := 0 + + // Process input byte by byte to handle line breaks + for _, b := range p { + yw.lineBuffer.WriteByte(b) + + // When we hit a newline, format as YAML and flush + if b == '\n' { + line := strings.TrimSuffix(yw.lineBuffer.String(), "\n") + yw.lineBuffer.Reset() + + if line == "" { + continue + } + + entry := LogEntry{ + Timestamp: time.Now().UTC().Format(time.RFC3339), + Component: yw.Source.ComponentName, + ComponentID: yw.Source.ComponentID, + Pod: yw.Source.PodName, + Container: yw.Source.Container, + Namespace: yw.Source.Namespace, + Message: line, + } + + yamlData, err := yaml.Marshal(entry) + if err != nil { + return totalWritten, err + } + + written, err := fmt.Fprintf(yw.Writer, "---\n%s", yamlData) + if err != nil { + return totalWritten, err + } + + totalWritten += written + } + } + + return len(p), nil +} From f32c5398efe7ed457cf73c76b2d6d38638a72856 Mon Sep 17 00:00:00 2001 From: Alex O Bunnyshell Date: Mon, 19 Jan 2026 17:18:22 +0200 Subject: [PATCH 2/3] Fix: Remove exec command import (belongs to separate PR) --- cmd/utils/root.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/utils/root.go b/cmd/utils/root.go index c8733ff..c551e98 100644 --- a/cmd/utils/root.go +++ b/cmd/utils/root.go @@ -2,7 +2,6 @@ package utils import ( "bunnyshell.com/cli/cmd/component_debug" - "bunnyshell.com/cli/cmd/exec" "bunnyshell.com/cli/cmd/git" "bunnyshell.com/cli/cmd/logs" "bunnyshell.com/cli/cmd/remote_development" @@ -15,7 +14,6 @@ func init() { mainCmd.AddCommand(git.GetMainCommand()) mainCmd.AddCommand(remote_development.GetMainCommand()) mainCmd.AddCommand(component_debug.GetMainCommand()) - mainCmd.AddCommand(exec.GetMainCommand()) mainCmd.AddCommand(logs.GetMainCommand()) } From 395d7748d19fc727e7f934cb2fc9651acee6cd4b Mon Sep 17 00:00:00 2001 From: Alex O Bunnyshell Date: Mon, 19 Jan 2026 17:57:14 +0200 Subject: [PATCH 3/3] Fix duplicate Flags section in logs help output - Move examples from Long to Example field for proper Cobra formatting - Remove manual Flags documentation from Long field - Let Cobra automatically generate the Flags section - Improves help readability by eliminating duplication --- cmd/logs/root.go | 22 +++------------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/cmd/logs/root.go b/cmd/logs/root.go index db4a446..682ade9 100644 --- a/cmd/logs/root.go +++ b/cmd/logs/root.go @@ -94,10 +94,8 @@ Component Selection: --environment Stream logs from all components in environment --name Filter by component name (requires --environment, repeatable) -If no component is specified, the component from your current context will be used. - -Examples: - # Stream logs from component in context with follow mode +If no component is specified, the component from your current context will be used.`, + Example: ` # Stream logs from component in context with follow mode bns configure set-context --component comp-123 bns logs --follow --tail 100 @@ -120,21 +118,7 @@ Examples: bns logs --component comp-123 --since 5m --timestamps # Previous container logs - bns logs --component comp-123 --previous - -Flags: - -f, --follow Stream logs continuously - --tail Show last N lines - --since Logs newer than duration (e.g. 5s, 2m, 3h) - --since-time Logs after timestamp (RFC3339) - --timestamps Include timestamps - --previous Show logs from previous terminated container - --prefix Prefix lines with source (default: true) - --no-color Disable color-coded prefixes - -c, --container Container name (interactive if not specified) - --pod Pod name (interactive if not specified) - -n, --namespace Kubernetes namespace - --all-containers Stream from all containers`, + bns logs --component comp-123 --previous`, Args: cobra.NoArgs, ValidArgsFunction: cobra.NoFileCompletions,