From 6f89038cc22ce4c3937cf05518a5cf1c2eb845f3 Mon Sep 17 00:00:00 2001 From: Trevor Linton Date: Thu, 15 Oct 2020 10:00:30 -0600 Subject: [PATCH 1/4] Ensure we trap close and error channels * This adds a select case statements to trap error and close channels during the initial read-through of the file (Before waiting for changes). If there is an error or a close is requested prior to this block finishing it would cause the caller to block on `.Close()` or cause parts of the internals here to block when sending to error. --- follower/follower.go | 73 ++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/follower/follower.go b/follower/follower.go index a73297d..eec463a 100644 --- a/follower/follower.go +++ b/follower/follower.go @@ -114,47 +114,60 @@ func (t *Follower) follow() error { for { for { - // discard leading NUL bytes - var discarded int + select { - for { - b, _ := t.reader.Peek(peekSize) - i := bytes.LastIndexByte(b, '\x00') + // any errors that come from fsnotify + case err := <-errChan: + return err + + // a request to stop + case <-t.closeCh: + t.watcher.Remove(t.filename) + return nil + + default: + // discard leading NUL bytes + var discarded int + + for { + b, _ := t.reader.Peek(peekSize) + i := bytes.LastIndexByte(b, '\x00') - if i > 0 { - n, _ := t.reader.Discard(i + 1) - discarded += n + if i > 0 { + n, _ := t.reader.Discard(i + 1) + discarded += n + } + + if i+1 < peekSize { + break + } } - if i+1 < peekSize { - break + s, err := t.reader.ReadBytes('\n') + if err != nil && err != io.EOF { + return err } - } - s, err := t.reader.ReadBytes('\n') - if err != nil && err != io.EOF { - return err - } + // if we encounter EOF before a line delimiter, + // ReadBytes() will return the remaining bytes, + // so push them back onto the buffer, rewind + // our seek position, and wait for further file changes. + // we also have to save our dangling byte count in the event + // that we want to re-open the file and seek to the end + if err == io.EOF { + l := len(s) - // if we encounter EOF before a line delimiter, - // ReadBytes() will return the remaining bytes, - // so push them back onto the buffer, rewind - // our seek position, and wait for further file changes. - // we also have to save our dangling byte count in the event - // that we want to re-open the file and seek to the end - if err == io.EOF { - l := len(s) + t.offset, err = t.file.Seek(-int64(l), io.SeekCurrent) + if err != nil { + return err + } - t.offset, err = t.file.Seek(-int64(l), io.SeekCurrent) - if err != nil { - return err + t.reader.Reset(t.file) + break } - t.reader.Reset(t.file) - break + t.sendLine(s, discarded) } - - t.sendLine(s, discarded) } // we're now at EOF, so wait for changes From 76c5e9bf73b94fb4a1211d37be1779904fd41266 Mon Sep 17 00:00:00 2001 From: Trevor Linton Date: Wed, 4 Nov 2020 16:22:42 -0700 Subject: [PATCH 2/4] Add wait incase were in a fast loop with no new data --- follower/follower.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/follower/follower.go b/follower/follower.go index eec463a..1d01031 100644 --- a/follower/follower.go +++ b/follower/follower.go @@ -156,13 +156,16 @@ func (t *Follower) follow() error { // that we want to re-open the file and seek to the end if err == io.EOF { l := len(s) - - t.offset, err = t.file.Seek(-int64(l), io.SeekCurrent) - if err != nil { - return err + if l == 0 { + <-time.NewTimer(time.Millisecond * 100).C + } else { + t.offset, err = t.file.Seek(-int64(l), io.SeekCurrent) + if err != nil { + return err + } + + t.reader.Reset(t.file) } - - t.reader.Reset(t.file) break } From 0af50d01261675017149d6168e4e4203a4e88787 Mon Sep 17 00:00:00 2001 From: Trevor Linton Date: Tue, 10 Nov 2020 12:19:30 -0700 Subject: [PATCH 3/4] Reset buffer on each iteration --- follower/follower.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/follower/follower.go b/follower/follower.go index 1d01031..98d8fc8 100644 --- a/follower/follower.go +++ b/follower/follower.go @@ -158,14 +158,13 @@ func (t *Follower) follow() error { l := len(s) if l == 0 { <-time.NewTimer(time.Millisecond * 100).C - } else { - t.offset, err = t.file.Seek(-int64(l), io.SeekCurrent) - if err != nil { - return err - } - - t.reader.Reset(t.file) + } + t.offset, err = t.file.Seek(-int64(l), io.SeekCurrent) + if err != nil { + return err } + + t.reader.Reset(t.file) break } From 2ba8774901cde0c56ec6017ea77c1fa34584f83a Mon Sep 17 00:00:00 2001 From: Trevor Linton Date: Tue, 10 Nov 2020 15:22:42 -0700 Subject: [PATCH 4/4] Update follower.go --- follower/follower.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/follower/follower.go b/follower/follower.go index 98d8fc8..91aaa7a 100644 --- a/follower/follower.go +++ b/follower/follower.go @@ -187,7 +187,7 @@ func (t *Follower) follow() error { if !os.IsNotExist(err) { return err } - + <-time.NewTimer(time.Second * 10).C // it's possible that an unlink can cause fsnotify.Chmod, // so attempt to rewatch if the file is missing if err := t.rewatch(); err != nil {