diff --git a/containers/container.go b/containers/container.go index 8b12ecf..abb2cfb 100644 --- a/containers/container.go +++ b/containers/container.go @@ -1071,13 +1071,13 @@ func (c *Container) runLogParser(logPath string) { switch c.cgroup.ContainerType { case cgroup.ContainerTypeSystemdService: ch := make(chan logparser.LogEntry) - if err := JournaldSubscribe(c.cgroup, ch); err != nil { + if err := JournaldSubscribe(c.metadata.systemd.Unit, ch); err != nil { klog.Warningln(err) return } parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId), multilineCollectorTimeout, *flags.LogPatternsPerContainer) stop := func() { - JournaldUnsubscribe(c.cgroup) + JournaldUnsubscribe(c.metadata.systemd.Unit) } klog.InfoS("started journald logparser", "cg", c.cgroup.Id) c.logParsers["journald"] = &LogParser{parser: parser, stop: stop} diff --git a/containers/journald.go b/containers/journald.go index a9c2605..a4f1204 100644 --- a/containers/journald.go +++ b/containers/journald.go @@ -3,7 +3,6 @@ package containers import ( "fmt" - "github.com/coroot/coroot-node-agent/cgroup" "github.com/coroot/coroot-node-agent/logs" "github.com/coroot/coroot-node-agent/proc" "github.com/coroot/logparser" @@ -25,20 +24,20 @@ func JournaldInit() error { return nil } -func JournaldSubscribe(cg *cgroup.Cgroup, ch chan<- logparser.LogEntry) error { +func JournaldSubscribe(unit string, ch chan<- logparser.LogEntry) error { if journaldReader == nil { return fmt.Errorf("journald reader not initialized") } - err := journaldReader.Subscribe(cg.Id, ch) + err := journaldReader.Subscribe(unit, ch) if err != nil { return err } return nil } -func JournaldUnsubscribe(cg *cgroup.Cgroup) { +func JournaldUnsubscribe(unit string) { if journaldReader == nil { return } - journaldReader.Unsubscribe(cg.Id) + journaldReader.Unsubscribe(unit) } diff --git a/containers/systemd.go b/containers/systemd.go index cddcb8a..ea9367c 100644 --- a/containers/systemd.go +++ b/containers/systemd.go @@ -90,9 +90,16 @@ func getSystemdProperties(id string) SystemdProperties { ctx, cancel := context.WithTimeout(context.Background(), dbusTimeout) defer cancel() parts := strings.Split(id, "/") - unit := parts[len(parts)-1] - props.Unit = unit - properties, err := dbusConn.GetAllPropertiesContext(ctx, unit) + for _, p := range parts { + if strings.HasSuffix(p, ".service") { + props.Unit = p + break + } + } + if props.Unit == "" { + props.Unit = parts[len(parts)-1] + } + properties, err := dbusConn.GetAllPropertiesContext(ctx, props.Unit) if err != nil { klog.Warningln("failed to get systemd properties:", err) return props diff --git a/logs/journald_reader.go b/logs/journald_reader.go index bf3c1df..6e730fd 100644 --- a/logs/journald_reader.go +++ b/logs/journald_reader.go @@ -86,7 +86,7 @@ func (r *JournaldReader) follow() { Level: logparser.LevelByPriority(e.Fields[sdjournal.SD_JOURNAL_FIELD_PRIORITY]), } r.lock.Lock() - ch, ok := r.subscribers[e.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_CGROUP]] + ch, ok := r.subscribers[e.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT]] r.lock.Unlock() if !ok { continue @@ -95,24 +95,24 @@ func (r *JournaldReader) follow() { } } -func (r *JournaldReader) Subscribe(cgroup string, ch chan<- logparser.LogEntry) error { +func (r *JournaldReader) Subscribe(unit string, ch chan<- logparser.LogEntry) error { r.lock.Lock() defer r.lock.Unlock() - if _, ok := r.subscribers[cgroup]; ok { - return fmt.Errorf(`duplicate subscriber for cgroup %s`, cgroup) + if _, ok := r.subscribers[unit]; ok { + return fmt.Errorf(`duplicate subscriber for unit %s`, unit) } - r.subscribers[cgroup] = ch + r.subscribers[unit] = ch return nil } -func (r *JournaldReader) Unsubscribe(cgroup string) { +func (r *JournaldReader) Unsubscribe(unit string) { r.lock.Lock() defer r.lock.Unlock() - if _, ok := r.subscribers[cgroup]; !ok { - klog.Warning("unknown subscriber for cgroup", cgroup) + if _, ok := r.subscribers[unit]; !ok { + klog.Warning("unknown subscriber for unit", unit) return } - delete(r.subscribers, cgroup) + delete(r.subscribers, unit) } func (r *JournaldReader) Close() {