Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 47 additions & 32 deletions follower/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,47 +114,62 @@ func (t *Follower) follow() error {

for {
for {
// discard leading NUL bytes
var discarded int
select {

for {
b, _ := t.reader.Peek(peekSize)
i := bytes.LastIndexByte(b, '\x00')
// any errors that come from fsnotify
case err := <-errChan:
return err

if i > 0 {
n, _ := t.reader.Discard(i + 1)
discarded += n
}
// a request to stop
case <-t.closeCh:
t.watcher.Remove(t.filename)
return nil

if i+1 < peekSize {
break
}
}
default:
// discard leading NUL bytes
var discarded int

s, err := t.reader.ReadBytes('\n')
if err != nil && err != io.EOF {
return err
}
for {
b, _ := t.reader.Peek(peekSize)
i := bytes.LastIndexByte(b, '\x00')

// if we encounter EOF before a line delimiter,
// ReadBytes() will return the remaining bytes,
// so push them back onto the buffer, rewind
// our seek position, and wait for further file changes.
// we also have to save our dangling byte count in the event
// that we want to re-open the file and seek to the end
if err == io.EOF {
l := len(s)
if i > 0 {
n, _ := t.reader.Discard(i + 1)
discarded += n
}

t.offset, err = t.file.Seek(-int64(l), io.SeekCurrent)
if err != nil {
if i+1 < peekSize {
break
}
}

s, err := t.reader.ReadBytes('\n')
if err != nil && err != io.EOF {
return err
}

t.reader.Reset(t.file)
break
}
// if we encounter EOF before a line delimiter,
// ReadBytes() will return the remaining bytes,
// so push them back onto the buffer, rewind
// our seek position, and wait for further file changes.
// we also have to save our dangling byte count in the event
// that we want to re-open the file and seek to the end
if err == io.EOF {
l := len(s)
if l == 0 {
<-time.NewTimer(time.Millisecond * 100).C
}
t.offset, err = t.file.Seek(-int64(l), io.SeekCurrent)
if err != nil {
return err
}

t.sendLine(s, discarded)
t.reader.Reset(t.file)
break
}

t.sendLine(s, discarded)
}
}

// we're now at EOF, so wait for changes
Expand All @@ -172,7 +187,7 @@ func (t *Follower) follow() error {
if !os.IsNotExist(err) {
return err
}

<-time.NewTimer(time.Second * 10).C
// it's possible that an unlink can cause fsnotify.Chmod,
// so attempt to rewatch if the file is missing
if err := t.rewatch(); err != nil {
Expand Down