Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ coverage.txt
# Built binaries
bin/
build/
borges
borges.exe
/borges
/borges.exe

143 changes: 143 additions & 0 deletions cli/borges/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package main

import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"

"github.com/inconshreveable/log15"
"github.com/src-d/borges/metrics"
)

type ExecutableCommand interface {
Command
Execute(args []string) error
}

type Command interface {
Name() string
ShortDescription() string
LongDescription() string
}

type simpleCommand struct {
name string
shortDescription string
longDescription string
}

func newSimpleCommand(name, short, long string) simpleCommand {
return simpleCommand{
name: name,
shortDescription: short,
longDescription: long,
}
}

func (c *simpleCommand) Name() string { return c.name }

func (c *simpleCommand) ShortDescription() string { return c.shortDescription }

func (c *simpleCommand) LongDescription() string { return c.longDescription }

type command struct {
simpleCommand
queueOpts
loggerOpts
metricsOpts
profilerOpts
}

func newCommand(name, short, long string) command {
return command{
simpleCommand: newSimpleCommand(
name,
short,
long,
),
}
}

func (c *command) init() {
c.loggerOpts.init()
c.profilerOpts.maybeStartProfiler()
c.metricsOpts.maybeStartMetrics()
}

type queueOpts struct {
Queue string `long:"queue" default:"borges" description:"queue name"`
}

type loggerOpts struct {
LogLevel string `short:"" long:"loglevel" description:"max log level enabled" default:"info"`
LogFile string `short:"" long:"logfile" description:"path to file where logs will be stored" default:""`
LogFormat string `short:"" long:"logformat" description:"format used to output the logs (json or text)" default:"text"`
}

func (c *loggerOpts) init() {
lvl, err := log15.LvlFromString(c.LogLevel)
if err != nil {
panic(fmt.Sprintf("unknown level name %q", c.LogLevel))
}

var handlers []log15.Handler
var format log15.Format
if c.LogFormat == "json" {
format = log15.JsonFormat()
handlers = append(
handlers,
log15.CallerFileHandler(log15.Must.FileHandler(os.Stdout.Name(), format)),
)
} else {
format = log15.LogfmtFormat()
handlers = append(
handlers,
log15.CallerFileHandler(log15.StdoutHandler),
)
}

if c.LogFile != "" {
handlers = append(
handlers,
log15.CallerFileHandler(log15.Must.FileHandler(c.LogFile, format)),
)
}

log15.Root().SetHandler(log15.LvlFilterHandler(lvl, log15.MultiHandler(handlers...)))
}

type metricsOpts struct {
Metrics bool `long:"metrics" description:"expose a metrics endpoint using an HTTP server"`
MetricsPort int `long:"metrics-port" description:"port to bind metrics to" default:"6062"`
}

func (c *metricsOpts) maybeStartMetrics() {
if c.Metrics {
addr := fmt.Sprintf("0.0.0.0:%d", c.MetricsPort)
go func() {
log.Debug("Started metrics service at", "address", addr)
if err := metrics.Start(addr); err != nil {
log.Warn("metrics service stopped", "err", err)
}
}()
}
}

type profilerOpts struct {
Profiler bool `long:"profiler" description:"start CPU, memory and block profilers"`
ProfilerPort int `long:"profiler-port" description:"port to bind profiler to" default:"6061"`
}

func (c *profilerOpts) maybeStartProfiler() {
if c.Profiler {
addr := fmt.Sprintf("0.0.0.0:%d", c.ProfilerPort)
go func() {
log.Debug("Started CPU, memory and block profilers at", "address", addr)
err := http.ListenAndServe(addr, nil)
if err != nil {
log.Warn("Profiler failed to listen and serve at", "address", addr, "error", err)
}
}()
}
}
20 changes: 19 additions & 1 deletion cli/borges/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@ const (
consumerCmdLongDesc = ""
)

var consumerCommand = &consumerCmd{command: newCommand(
consumerCmdName,
consumerCmdShortDesc,
consumerCmdLongDesc,
)}

type consumerCmd struct {
cmd
command
WorkersCount int `long:"workers" default:"8" description:"number of workers"`
Timeout string `long:"timeout" default:"10h" description:"deadline to process a job"`
}
Expand Down Expand Up @@ -51,3 +57,15 @@ func (c *consumerCmd) Execute(args []string) error {

return nil
}

func init() {
_, err := parser.AddCommand(
consumerCommand.Name(),
consumerCommand.ShortDescription(),
consumerCommand.LongDescription(),
consumerCommand)

if err != nil {
panic(err)
}
}
50 changes: 50 additions & 0 deletions cli/borges/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"os"

"github.com/src-d/borges"
"github.com/src-d/borges/storage"
core "gopkg.in/src-d/core-retrieval.v0"
)

const (
fileCmdName = "file"
fileCmdShortDesc = "produce jobs from file"
fileCmdLongDesc = ""
)

// fileCommand is a producer subcommand.
var fileCommand = &fileCmd{producerSubcmd: newProducerSubcmd(
fileCmdName,
fileCmdShortDesc,
fileCmdLongDesc,
)}

type fileCmd struct {
producerSubcmd

filePositionalArgs `positional-args:"true" required:"1"`
}

type filePositionalArgs struct {
File string `positional-arg-name:"path"`
}

func (c *fileCmd) Execute(args []string) error {
if err := c.producerSubcmd.init(); err != nil {
return err
}
defer c.broker.Close()

return c.generateJobs(c.jobIter)
}

func (c *fileCmd) jobIter() (borges.JobIter, error) {
storer := storage.FromDatabase(core.Database())
f, err := os.Open(c.File)
if err != nil {
return nil, err
}
return borges.NewLineJobIter(f, storer), nil
}
21 changes: 20 additions & 1 deletion cli/borges/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@ const (
initCmdLongDesc = ""
)

var initCommand = &initCmd{simpleCommand: newSimpleCommand(
initCmdName,
initCmdShortDesc,
initCmdLongDesc,
)}

type initCmd struct {
loggerCmd
simpleCommand
loggerOpts
}

func (c *initCmd) Execute(args []string) error {
Expand All @@ -34,3 +41,15 @@ func (c *initCmd) Execute(args []string) error {
log15.Info("database was successfully initialized")
return nil
}

func init() {
_, err := parser.AddCommand(
initCommand.Name(),
initCommand.ShortDescription(),
initCommand.LongDescription(),
initCommand)

if err != nil {
panic(err)
}
}
Loading