diff --git a/.golangci.yml b/.golangci.yml index c4474e4..a25123a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -5,6 +5,10 @@ run: linters: default: all disable: + - funlen + - gocyclo + - gocognit + - err113 - embeddedstructfieldcheck - testpackage - noinlineerr diff --git a/cmd/catp/README.md b/cmd/catp/README.md index ada3ab7..2749360 100644 --- a/cmd/catp/README.md +++ b/cmd/catp/README.md @@ -29,6 +29,9 @@ catp [OPTIONS] PATH ... write first 10 seconds of CPU profile to file -dbg-mem-prof string write heap profile to file after 10 seconds + -end-line int + stop printing lines at this line (exclusive), + default is 0 (no limit), each input file is counted separately -l count lines -no-progress disable progress printing @@ -57,6 +60,8 @@ catp [OPTIONS] PATH ... write current progress to a file -rate-limit float output rate limit lines per second + -save-matches value + save matches of previous filter group to file -skip value filter matching, may contain multiple AND patterns separated by ^, if filter matches, line is removed from the output (may be kept if it passed preceding -pass) @@ -64,13 +69,16 @@ catp [OPTIONS] PATH ... -skip-csv value filter matching, loads skip params from CSV file, each line is treated as -skip, each column value is AND condition. + -start-line int + start printing lines from this line (inclusive), + default is 0 (first line), each input file is counted separately -version print version and exit ``` ## Examples -Feed a file into `jq` field extractor with progress printing. 
+### Feed a file into `jq` field extractor with progress printing ``` catp get-key.log | jq .context.callback.Data.Nonce > get-key.jq @@ -84,11 +92,13 @@ get-key.log: 96.8% bytes read, 967819 lines processed, 8064.9 l/s, 41.8 MB/s, el get-key.log: 100.0% bytes read, 1000000 lines processed, 8065.7 l/s, 41.8 MB/s, elapsed 2m3.98s, remaining 0s ``` +### Parallel scan of multiple files + Run log filtering (lines containing `foo bar` or `baz`) on multiple files in background (with `screen`) and output to a new compressed file. ``` -screen -dmS foo12 ./catp -output ~/foo-2023-07-12.log.zst -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12* +screen -dmS foo12 ./catp -parallel 20 -output ~/foo-2023-07-12.log.zst -pass "foo bar" -pass "baz" /home/logs/server-2023-07-12* ``` ``` @@ -108,14 +118,31 @@ all: 32.3% bytes read, /home/logs/server-2023-07-12-09-00.log_6.zst: 5.1% bytes # detaching from screen with ctrl+a+d ``` -Filter based on large list of needles. Values from allow and block lists are loaded into high-performance -[Aho Corasick](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm) indexes. +### Filter based on large list of needles + +Values from allow and block lists are loaded into high-performance +[Aho Corasick](https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm) indexes. 
``` catp -pass-csv allowlist.csv -skip-csv blocklist.csv -pass-any -output filtered.log.zst source.log.zst ``` Each source line would follow the filtering pipeline: -* if `allowlist.csv` has at least one row, all cells of which are present in the source line, source line gets into output -* if not, but if `blocklist.csv` has at least one row, all cells of which are present in the source line, source line is skipped + +* if `allowlist.csv` has at least one row, all cells of which are present in the source line, source line gets into + output +* if not, but if `blocklist.csv` has at least one row, all cells of which are present in the source line, source line is + skipped * if not, source line gets into output because of `-pass-any` + +### Split matches into separate files + +``` +catp -pass foo -save-matches foo.log.zst -pass bar^baz -save-matches 2.gz -pass qux -pass quux -output other.log input.log +``` + +Pipeline: +* each line from `input.log` is being read +* lines that contain `foo` are stored to `foo.log.zst` +* lines that contain `bar` and `baz` (but not `foo` that was already matched) are stored to `2.gz` +* lines that contain `qux` or `quux` are stored to `other.log` diff --git a/cmd/catp/catp/app.go b/cmd/catp/catp/app.go index c561f11..559ebd2 100644 --- a/cmd/catp/catp/app.go +++ b/cmd/catp/catp/app.go @@ -7,7 +7,6 @@ import ( "errors" "flag" "fmt" - "io" "log" "os" "os/signal" @@ -21,13 +20,21 @@ import ( "github.com/bool64/dev/version" "github.com/bool64/progress" - gzip "github.com/klauspost/pgzip" ) // Main is the entry point for catp CLI tool. func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,gocyclo,maintidx r := &runner{} + var closers []func() error + defer func() { + for _, closer := range closers { + if err := closer(); err != nil { + log.Printf("failed to close: %s\n", err.Error()) + } + } + }() + flag.Var(flagFunc(func(v string) error { r.filters.addFilter(true, bytes.Split([]byte(v), []byte("^"))...) 
@@ -62,6 +69,17 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g "if filter matches, line is removed from the output (may be kept if it passed preceding -pass)\n"+ "for example, you can use \"-skip quux^baz -skip fooO\" to skip lines that have (quux AND baz) OR fooO") + flag.Var(flagFunc(func(v string) error { + w, closer, err := makeWriter(v) + if err != nil { + return err + } + + closers = append(closers, closer) + + return r.filters.saveTo(w) + }), "save-matches", "save matches of previous filter group to file") + flag.IntVar(&r.parallel, "parallel", 1, "number of parallel readers if multiple files are provided\n"+ "lines from different files will go to output simultaneously (out of order of files, but in order of lines in each file)\n"+ "use 0 for multi-threaded zst decoder (slightly faster at cost of more CPU)") @@ -79,8 +97,13 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g "files will be written to out dir with original base names\n"+ "disables output flag") + flag.IntVar(&r.startLine, "start-line", 0, "start printing lines from this line (inclusive),\n"+ + "default is 0 (first line), each input file is counted separately") + flag.IntVar(&r.endLine, "end-line", 0, "stop printing lines at this line (exclusive),\n"+ + "default is 0 (no limit), each input file is counted separately") + flag.Usage = func() { - fmt.Println("catp", version.Module("github.com/bool64/progress").Version+",", + fmt.Println("catp", version.Module("github.com/bool64/progress").Version+r.options.VersionLabel+",", version.Info().GoVersion, strings.Join(versionExtra, " ")) fmt.Println() fmt.Println("catp prints contents of files to STDOUT or dir/file output, \n" + @@ -94,20 +117,6 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g } flag.Parse() - r.filters.buildIndex() - - if *ver { - fmt.Println(version.Module("github.com/bool64/progress").Version) - - return nil - } - - if flag.NArg() 
== 0 { - flag.Usage() - - return nil - } - if *cpuProfile != "" { startProfiling(*cpuProfile, *memProfile) @@ -122,6 +131,20 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g } } + r.filters.buildIndex() + + if *ver { + fmt.Println(version.Module("github.com/bool64/progress").Version + r.options.VersionLabel) + + return nil + } + + if flag.NArg() == 0 { + flag.Usage() + + return nil + } + var files []string args := flag.Args() @@ -158,61 +181,21 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g sort.Strings(files) if *output != "" && r.outDir == "" { - fn := *output - - out, err := os.Create(fn) //nolint:gosec + w, closer, err := makeWriter(*output) if err != nil { - return fmt.Errorf("failed to create output file %s: %w", fn, err) + return err } - r.output = out - compCloser := io.Closer(io.NopCloser(nil)) - - switch { - case strings.HasSuffix(fn, ".gz"): - gw := gzip.NewWriter(r.output) - compCloser = gw - - r.output = gw - case strings.HasSuffix(fn, ".zst"): - zw, err := zstdWriter(r.output) - if err != nil { - return fmt.Errorf("zstd new writer: %w", err) - } - - compCloser = zw - - r.output = zw - } - - w := bufio.NewWriterSize(r.output, 64*1024) r.output = w - defer func() { - if err := w.Flush(); err != nil { - log.Fatalf("failed to flush STDOUT buffer: %s", err) - } - - if err := compCloser.Close(); err != nil { - log.Fatalf("failed to close compressor: %s", err) - } - - if err := out.Close(); err != nil { - log.Fatalf("failed to close output file %s: %s", *output, err) - } - }() + closers = append(closers, closer) } else { if isStdin { r.output = os.Stdout } else { w := bufio.NewWriterSize(os.Stdout, 64*1024) r.output = w - - defer func() { - if err := w.Flush(); err != nil { - log.Fatalf("failed to flush STDOUT buffer: %s", err) - } - }() + closers = append(closers, w.Flush) } } diff --git a/cmd/catp/catp/catp.go b/cmd/catp/catp/catp.go index 3d86882..2e85545 100644 --- 
a/cmd/catp/catp/catp.go +++ b/cmd/catp/catp/catp.go @@ -59,6 +59,9 @@ type runner struct { noProgress bool countLines bool + startLine int + endLine int + hasOptions bool options Options @@ -200,6 +203,7 @@ func (r *runner) scanFile(filename string, rd io.Reader, out io.Writer) { s := bufio.NewScanner(rd) s.Buffer(make([]byte, 64*1024), 10*1024*1024) + fileLines := 0 lines := 0 buf := make([]byte, 64*1024) @@ -208,13 +212,18 @@ func (r *runner) scanFile(filename string, rd io.Reader, out io.Writer) { linesPush = 1 } - flusher, _ := out.(interface { //nolint:errcheck // nil is good enough. - Flush() error - }) - for s.Scan() { + fileLines++ lines++ + if r.startLine > 0 && fileLines <= r.startLine { + continue + } + + if r.endLine > 0 && fileLines > r.endLine { + break + } + if atomic.LoadInt64(&r.closed) > 0 { break } @@ -223,11 +232,21 @@ func (r *runner) scanFile(filename string, rd io.Reader, out io.Writer) { _ = r.limiter.Wait(context.Background()) //nolint:errcheck // No failure condition here. 
} + line := s.Bytes() + w := out + + save, shouldWrite := r.filters.shouldWrite(line) + if save != nil { + w = save + } + if lines >= linesPush { atomic.AddInt64(&r.currentLines, int64(lines)) lines = 0 - if flusher != nil { + if flusher, ok := w.(interface { + Flush() error + }); ok { if r.parallel > 1 && r.outDir == "" { r.mu.Lock() if err := flusher.Flush(); err != nil { @@ -242,9 +261,7 @@ func (r *runner) scanFile(filename string, rd io.Reader, out io.Writer) { } } - line := s.Bytes() - - if !r.filters.shouldWrite(line) { + if !shouldWrite { continue } @@ -261,21 +278,23 @@ func (r *runner) scanFile(filename string, rd io.Reader, out io.Writer) { atomic.AddInt64(&r.matches, 1) - if r.parallel > 1 && r.outDir == "" { + synchronize := r.parallel > 1 && (r.outDir == "" || save != nil) + + if synchronize { r.mu.Lock() } - if _, err := out.Write(append(line, '\n')); err != nil { + if _, err := w.Write(append(line, '\n')); err != nil { r.lastErr = err - if r.parallel > 1 && r.outDir == "" { + if synchronize { r.mu.Unlock() } return } - if r.parallel > 1 && r.outDir == "" { + if synchronize { r.mu.Unlock() } } @@ -290,7 +309,7 @@ func (r *runner) scanFile(filename string, rd io.Reader, out io.Writer) { } } -func (r *runner) cat(filename string) (err error) { //nolint:gocyclo +func (r *runner) cat(filename string) (err error) { var rd io.Reader if filename == "-" { @@ -343,42 +362,18 @@ func (r *runner) cat(filename string) (err error) { //nolint:gocyclo if r.outDir != "" { fn := r.outDir + "/" + path.Base(filename) - w, err := os.Create(fn) //nolint:gosec + w, closer, err := makeWriter(fn) if err != nil { return err } defer func() { - if clErr := w.Close(); clErr != nil && err == nil { - err = clErr + if err := closer(); err != nil { + log.Println("failed to close writer:", err.Error()) } }() out = w - - if strings.HasSuffix(fn, ".gz") { - z := gzip.NewWriter(w) - out = z - - defer func() { - if clErr := z.Close(); clErr != nil && err == nil { - err = clErr - } - }() 
- } else if strings.HasSuffix(fn, ".zst") { - z, err := zstdWriter(w) - if err != nil { - return err - } - - out = z - - defer func() { - if clErr := z.Close(); clErr != nil && err == nil { - err = clErr - } - }() - } } if r.parallel <= 1 && !r.noProgress { @@ -399,6 +394,10 @@ func (r *runner) cat(filename string) (err error) { //nolint:gocyclo r.limiter = rate.NewLimiter(rate.Limit(r.rateLimit), 100) } + if r.startLine != 0 || r.endLine != 0 { + r.countLines = true + } + if r.filters.isSet() || r.parallel > 1 || r.hasOptions || r.countLines || r.rateLimit > 0 { r.scanFile(filename, rd, out) } else { @@ -470,6 +469,9 @@ type Options struct { // PrepareLine is invoked for every line, if result is nil, line is skipped. // You can use buf to avoid allocations for a result, and change its capacity if needed. PrepareLine func(filename string, lineNr int, line []byte, buf *[]byte) []byte + + // VersionLabel is added to version message. + VersionLabel string } func (r *runner) loadCSVFilter(fn string, pass bool) error { diff --git a/cmd/catp/catp/filter.go b/cmd/catp/catp/filter.go index 3652522..cf19946 100644 --- a/cmd/catp/catp/filter.go +++ b/cmd/catp/catp/filter.go @@ -2,6 +2,8 @@ package catp import ( "bytes" + "errors" + "io" "github.com/cloudflare/ahocorasick" ) @@ -15,6 +17,8 @@ type ( // Prefilter checks for match of the first element of any ors item. // This first element is removed from and. 
pre *ahocorasick.Matcher + + save io.Writer } filters struct { g []*filterGroup @@ -45,6 +49,21 @@ func (f *filters) addPassAny() { f.g = append(f.g, &filterGroup{pass: true}) } +func (f *filters) saveTo(writer io.Writer) error { + if len(f.g) == 0 { + return errors.New("no filters set") + } + + g := f.g[len(f.g)-1] + if g.save != nil { + return errors.New("save already set") + } + + g.save = writer + + return nil +} + func (f *filters) addFilter(pass bool, and ...[]byte) { if len(and) == 0 { return @@ -56,7 +75,7 @@ func (f *filters) addFilter(pass bool, and ...[]byte) { if len(f.g) != 0 { g = f.g[len(f.g)-1] - if g.pass != pass { + if g.pass != pass || g.save != nil { g = &filterGroup{pass: pass} f.g = append(f.g, g) } @@ -69,7 +88,7 @@ func (f *filters) addFilter(pass bool, and ...[]byte) { g.ors = append(g.ors, and) } -func (f *filters) shouldWrite(line []byte) bool { +func (f *filters) shouldWrite(line []byte) (io.Writer, bool) { shouldWrite := true for _, g := range f.g { @@ -80,11 +99,11 @@ func (f *filters) shouldWrite(line []byte) bool { matched := g.match(line) if matched { - return g.pass + return g.save, g.pass } } - return shouldWrite + return nil, shouldWrite } func (g *filterGroup) match(line []byte) bool { diff --git a/cmd/catp/catp/filter_test.go b/cmd/catp/catp/filter_test.go index fb2d5c0..d34fb69 100644 --- a/cmd/catp/catp/filter_test.go +++ b/cmd/catp/catp/filter_test.go @@ -19,7 +19,7 @@ func TestFilter_Match(t *testing.T) { } for _, line := range bytes.Split(input, []byte("\n")) { - if f.shouldWrite(line) { + if _, ok := f.shouldWrite(line); ok { println(string(line)) } }
diff --git a/cmd/catp/catp/writer.go b/cmd/catp/catp/writer.go
new file mode 100644
index 0000000..962d9c5
--- /dev/null
+++ b/cmd/catp/catp/writer.go
@@ -0,0 +1,69 @@
+package catp
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"os"
+	"strings"
+
+	gzip "github.com/klauspost/pgzip"
+)
+
+// makeWriter creates (or truncates) file fn and returns a buffered writer on top of it.
+// Output is gzip-compressed when fn ends with ".gz" and zstd-compressed when it ends with ".zst".
+//
+// The returned func flushes the buffer, then closes the compressor (if any) and the file.
+// Every step is attempted even if an earlier one failed, so the file descriptor is always
+// released; the first error encountered is returned.
+func makeWriter(fn string) (io.Writer, func() error, error) {
+	f, err := os.Create(fn) //nolint:gosec
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to create output file %s: %w", fn, err)
+	}
+
+	var (
+		res        io.Writer = f
+		compCloser io.Closer // Stays nil for uncompressed output.
+	)
+
+	switch {
+	case strings.HasSuffix(fn, ".gz"):
+		gw := gzip.NewWriter(res)
+		compCloser = gw
+		res = gw
+	case strings.HasSuffix(fn, ".zst"):
+		zw, err := zstdWriter(res)
+		if err != nil {
+			// The file was already created, do not leak its descriptor.
+			_ = f.Close() //nolint:errcheck // The zstd error is the one to report.
+
+			return nil, nil, fmt.Errorf("zstd new writer: %w", err)
+		}
+
+		compCloser = zw
+		res = zw
+	}
+
+	w := bufio.NewWriterSize(res, 64*1024)
+
+	return w, func() error {
+		var firstErr error
+
+		if err := w.Flush(); err != nil {
+			firstErr = fmt.Errorf("failed to flush output buffer %s: %w", fn, err)
+		}
+
+		if compCloser != nil {
+			if err := compCloser.Close(); err != nil && firstErr == nil {
+				firstErr = fmt.Errorf("failed to close compressor %s: %w", fn, err)
+			}
+		}
+
+		if err := f.Close(); err != nil && firstErr == nil {
+			firstErr = fmt.Errorf("failed to close output file %s: %w", fn, err)
+		}
+
+		return firstErr
+	}, nil
+}
diff --git a/cmd/catp/default.pgo b/cmd/catp/default.pgo
index 7e9358b..1da740f 100644
Binary files a/cmd/catp/default.pgo and b/cmd/catp/default.pgo differ