From 4e0903e958c6f819e7041bd5ecccd48ec7e48eae Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Tue, 29 Apr 2025 18:27:48 -0400 Subject: [PATCH 1/2] better handle indexing race condition WAN-4112 #time 1d --- plugins/inputs/t128_tank/reader.go | 60 ++++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 11 deletions(-) diff --git a/plugins/inputs/t128_tank/reader.go b/plugins/inputs/t128_tank/reader.go index 7cf65090fe4ba..5efba839b76c7 100644 --- a/plugins/inputs/t128_tank/reader.go +++ b/plugins/inputs/t128_tank/reader.go @@ -14,6 +14,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/influxdata/telegraf" @@ -132,28 +133,55 @@ func (r *Reader) Run(mainCtx context.Context) { readCtxCancel() return } + r.readDone = make(chan error, 5) + r.lastObservedIndex = make(chan uint64, 100) + + var wg sync.WaitGroup nextSaveCheck := time.NewTicker(2 * time.Second) defer func() { readCtxCancel() - close(r.lastObservedIndex) - observedValue, ok := <-r.lastObservedIndex - if ok { - r.setIndex(r.indexPath, index{value: observedValue}) + wg.Wait() + + var anyFound bool + var lastObservedValue uint64 + drainLoop: + for { + select { + case observedValue := <-r.lastObservedIndex: + anyFound = true + lastObservedValue = observedValue + default: + break drainLoop + } + } + if anyFound { + r.setIndex(r.indexPath, index{value: lastObservedValue}) } else { r.setIndex(r.indexPath, index{value: lastIndex.value}) } + close(r.lastObservedIndex) + }() + + wg.Add(1) + go func() { + defer wg.Done() + r.read(readCtx, lastIndex.next()) }() - go r.read(readCtx, lastIndex.next()) for { select { case <-mainCtx.Done(): r.log.Errorf("%s reader done", r.topic) return - case observedValue = <-r.lastObservedIndex: - if lastIndex.value%1000 == 0 { - r.setIndex(r.indexPath, index{value: lastIndex.value}) + + case val := <-r.lastObservedIndex: + if val > lastIndex.value { + lastIndex.value = val + if lastIndex.value%1000 == 0 { + r.setIndex(r.indexPath, index{value: lastIndex.value}) + } } + case <-nextSaveCheck.C: if observedValue > lastIndex.value { lastIndex.value = observedValue @@ -165,10 +193,21 @@ func (r *Reader) Run(mainCtx context.Context) { if err != nil && errors.As(err, &errBoundaryFault) { r.log.Debugf("detected boundary fault, restarting %s tank read from index %d", r.topic, errBoundaryFault.nextAvailableIndex.value) lastIndex = errBoundaryFault.nextAvailableIndex - go r.read(readCtx, lastIndex) + wg.Add(1) + go func() { + defer wg.Done() + r.read(readCtx, lastIndex) + }() } else { + if observedValue > lastIndex.value { + lastIndex.value = observedValue + } lastIndex = lastIndex.next() - go r.read(readCtx, lastIndex) + wg.Add(1) + go func() { + defer wg.Done() + r.read(readCtx, lastIndex) + }() } } } @@ -391,4 +430,3 @@ func isBoundaryFault(line string) (bool, index) { return true, nextIndex } - From 7daab1a907fef062a1b158bc08cd5506c792bddb Mon Sep 17 00:00:00 2001 From: Shriyansh Kothari Date: Thu, 1 May 2025 12:26:51 -0400 Subject: [PATCH 2/2] remove buffer changes and update defer func WAN-4112 #time 2h --- plugins/inputs/t128_tank/reader.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/plugins/inputs/t128_tank/reader.go b/plugins/inputs/t128_tank/reader.go index 5efba839b76c7..5c81cd4525f06 100644 --- a/plugins/inputs/t128_tank/reader.go +++ b/plugins/inputs/t128_tank/reader.go @@ -133,25 +133,28 @@ func (r *Reader) Run(mainCtx context.Context) { readCtxCancel() return } - r.readDone = make(chan error, 5) - r.lastObservedIndex = make(chan uint64, 100) var wg sync.WaitGroup nextSaveCheck := time.NewTicker(2 * time.Second) defer func() { readCtxCancel() - wg.Wait() + go func() { + wg.Wait() + close(r.lastObservedIndex) + }() var anyFound bool var lastObservedValue uint64 drainLoop: for { select { - case observedValue := <-r.lastObservedIndex: + case observedValue, ok := <-r.lastObservedIndex: + if !ok { + break drainLoop + } anyFound = true lastObservedValue = observedValue - default: - break drainLoop + case <-r.readDone: } } if anyFound { @@ -159,7 +162,6 @@ func (r *Reader) Run(mainCtx context.Context) { } else { r.setIndex(r.indexPath, index{value: lastIndex.value}) } - close(r.lastObservedIndex) }() wg.Add(1)