diff --git a/plugins/inputs/t128_tank/reader.go b/plugins/inputs/t128_tank/reader.go index 7cf65090fe4ba..5c81cd4525f06 100644 --- a/plugins/inputs/t128_tank/reader.go +++ b/plugins/inputs/t128_tank/reader.go @@ -14,6 +14,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/influxdata/telegraf" @@ -132,28 +133,57 @@ func (r *Reader) Run(mainCtx context.Context) { readCtxCancel() return } + + var wg sync.WaitGroup nextSaveCheck := time.NewTicker(2 * time.Second) defer func() { readCtxCancel() - close(r.lastObservedIndex) - observedValue, ok := <-r.lastObservedIndex - if ok { - r.setIndex(r.indexPath, index{value: observedValue}) + go func() { + wg.Wait() + close(r.lastObservedIndex) + }() + + var anyFound bool + var lastObservedValue uint64 + drainLoop: + for { + select { + case observedValue, ok := <-r.lastObservedIndex: + if !ok { + break drainLoop + } + anyFound = true + lastObservedValue = observedValue + case <-r.readDone: + } + } + if anyFound { + r.setIndex(r.indexPath, index{value: lastObservedValue}) } else { r.setIndex(r.indexPath, index{value: lastIndex.value}) } }() - go r.read(readCtx, lastIndex.next()) + + wg.Add(1) + go func() { + defer wg.Done() + r.read(readCtx, lastIndex.next()) + }() for { select { case <-mainCtx.Done(): r.log.Errorf("%s reader done", r.topic) return - case observedValue = <-r.lastObservedIndex: - if lastIndex.value%1000 == 0 { - r.setIndex(r.indexPath, index{value: lastIndex.value}) + + case val := <-r.lastObservedIndex: + if val > lastIndex.value { + lastIndex.value = val + if lastIndex.value%1000 == 0 { + r.setIndex(r.indexPath, index{value: lastIndex.value}) + } } + case <-nextSaveCheck.C: if observedValue > lastIndex.value { lastIndex.value = observedValue @@ -165,10 +195,21 @@ func (r *Reader) Run(mainCtx context.Context) { if err != nil && errors.As(err, &errBoundaryFault) { r.log.Debugf("detected boundary fault, restarting %s tank read from index %d", r.topic, errBoundaryFault.nextAvailableIndex.value) lastIndex = errBoundaryFault.nextAvailableIndex - go r.read(readCtx, lastIndex) + wg.Add(1) + go func() { + defer wg.Done() + r.read(readCtx, lastIndex) + }() } else { + if observedValue > lastIndex.value { + lastIndex.value = observedValue + } lastIndex = lastIndex.next() - go r.read(readCtx, lastIndex) + wg.Add(1) + go func() { + defer wg.Done() + r.read(readCtx, lastIndex) + }() } } } @@ -391,4 +432,3 @@ func isBoundaryFault(line string) (bool, index) { return true, nextIndex } -