diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index a207807..8e5e839 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -4,7 +4,7 @@ "Deps": [ { "ImportPath": "github.com/dawanda/go-mesos/marathon", - "Rev": "76abdfc1b8876b39505c5a1e63583bba16f3243a" + "Rev": "56dfa5705fdac2336ac685e96a4882af4629e346" }, { "ImportPath": "github.com/gorilla/context", diff --git a/core/app.go b/core/app.go new file mode 100644 index 0000000..81a0e0a --- /dev/null +++ b/core/app.go @@ -0,0 +1,33 @@ +package core + +// Marathon independant application definitions + +type AppCluster struct { + Name string // Marathon's human reaable AppId (i.e. /path/to/app) + Id string // globally unique ID that identifies this application + ServicePort uint // service port to listen to in service discovery + Protocol string // service protocol + PortName string // user filled port name + Labels map[string]string // key/value pairs of labels (port-local | global) + HealthCheck *AppHealthCheck // healthcheck, if available, or nil + Backends []AppBackend // ordered list of backend tasks + PortIndex int // Marathon's application port index +} + +type AppHealthCheck struct { + Protocol string + Path string + Command *string + GracePeriodSeconds uint + IntervalSeconds uint + TimeoutSeconds uint + MaxConsecutiveFailures uint + IgnoreHttp1xx bool +} + +type AppBackend struct { + Id string + Host string + Port uint + State string +} diff --git a/core/event_listener.go b/core/event_listener.go new file mode 100644 index 0000000..3174b16 --- /dev/null +++ b/core/event_listener.go @@ -0,0 +1,30 @@ +package core + +// EventListener provides an interface for hooking into +// standard service discovery API calls, such as adding and removing +// backends from load balancers (app clusters). +type EventListener interface { + // Startup is invoked upon application startup + Startup() + + // Shutdown is invoked upon application shutdown + Shutdown() + + // Apply installs the load balancer for all apps. + // + // This function is invoked upon startup to synchronize with the current + // state. + Apply(apps []*AppCluster) + + // AddTask must add the given backend to the cluster. + // + // It is assured that the task to be added is also already added to the + // given AppCluster. + AddTask(task *AppBackend, app *AppCluster) + + // RemoveTask must remove the given backend from the cluster. + // + // It is ensured that the task to be removed is not present in the given + // AppCluster. + RemoveTask(task *AppBackend, app *AppCluster) +} diff --git a/dns_manager.go b/dns_manager.go deleted file mode 100644 index e0c0f1d..0000000 --- a/dns_manager.go +++ /dev/null @@ -1,218 +0,0 @@ -package main - -// DNS based service discovery -// --------------------------------------------------------------------------- -// -// Marathon: "/path/to/application" -// DNS-query: application.to.path.$basedomain (A, AAAA, TXT, SRV) -// DNS-reply: -// A => list of IPv4 addresses -// AAAA => list of IPv6 addresses -// SRV => ip:port array per task -// TXT => app labels - -// TODO: DNS forwarding -// TODO: DNS proxy cache (for speeding up) - -import ( - "fmt" - "log" - "net" - "strings" - "sync" - "time" - - "github.com/dawanda/go-mesos/marathon" - "github.com/miekg/dns" -) - -type DnsManager struct { - Verbose bool - ServiceAddr net.IP - ServicePort uint - BaseName string - DnsTTL time.Duration - PushSRV bool - udpServer *dns.Server - tcpServer *dns.Server - db map[string]*appEntry - dbMutex sync.Mutex -} - -type appEntry struct { - ipAddresses []net.IP - app *marathon.App -} - -func (manager *DnsManager) Setup() error { - dns.HandleFunc(manager.BaseName, manager.dnsHandler) - - go func() { - manager.udpServer = &dns.Server{ - Addr: fmt.Sprintf("%v:%v", manager.ServiceAddr, manager.ServicePort), - Net: "udp", - TsigSecret: nil, - } - err := manager.udpServer.ListenAndServe() - if err != nil { - log.Fatal(err) - } - }() - - go func() { - manager.tcpServer = &dns.Server{ - Addr: fmt.Sprintf("%v:%v", manager.ServiceAddr, manager.ServicePort), - Net: "tcp", - TsigSecret: nil, - } - err := manager.tcpServer.ListenAndServe() - if err != nil { - log.Fatal(err) - } - }() - - return nil -} - -func (manager *DnsManager) Shutdown() { - manager.udpServer.Shutdown() - manager.tcpServer.Shutdown() -} - -func (manager *DnsManager) Log(msg string) { - if manager.Verbose { - log.Printf("[dns]: %v\n", msg) - } -} - -func (manager *DnsManager) Apply(apps []*marathon.App, force bool) error { - manager.dbMutex.Lock() - manager.db = make(map[string]*appEntry) - manager.dbMutex.Unlock() - - for _, app := range apps { - if err := manager.updateApp(app); err != nil { - return err - } - } - - return nil -} - -func (manager *DnsManager) Update(app *marathon.App, taskID string) error { - return manager.updateApp(app) -} - -func (manager *DnsManager) updateApp(app *marathon.App) error { - var ipAddresses []net.IP - - for _, task := range app.Tasks { - ip, err := net.ResolveIPAddr("ip", task.Host) - if err != nil { - return err - } - ipAddresses = append(ipAddresses, ip.IP) - } - - var reversed = manager.makeDnsNameFromAppId(app.Id) - var entry = &appEntry{ - ipAddresses: ipAddresses, - app: app, - } - - manager.dbMutex.Lock() - manager.db[reversed] = entry - manager.dbMutex.Unlock() - - return nil -} - -func (manager *DnsManager) makeDnsNameFromAppId(appID string) string { - var parts = strings.Split(appID, "/")[1:] - var reversedParts []string - for i := range parts { - reversedParts = append(reversedParts, parts[len(parts)-i-1]) - } - var reversed = strings.Join(reversedParts, ".") - - return reversed -} - -func (manager *DnsManager) Remove(appID string, taskID string, app *marathon.App) error { - if app == nil { - manager.dbMutex.Lock() - delete(manager.db, manager.makeDnsNameFromAppId(appID)) - manager.dbMutex.Unlock() - return nil - } - return manager.updateApp(app) -} - -func (manager *DnsManager) dnsHandler(w dns.ResponseWriter, req *dns.Msg) { - m := new(dns.Msg) - m.SetReply(req) - - name := req.Question[0].Name - name = strings.TrimSuffix(name, "."+manager.BaseName) - - manager.dbMutex.Lock() - entry, ok := manager.db[name] - manager.dbMutex.Unlock() - - if ok { - switch req.Question[0].Qtype { - case dns.TypeSRV: - m.Answer = manager.makeAllSRV(entry) - case dns.TypeA: - m.Answer = manager.makeAllA(entry) - if manager.PushSRV { - m.Extra = manager.makeAllSRV(entry) - } - } - } - - w.WriteMsg(m) -} - -func (manager *DnsManager) makeAllA(entry *appEntry) []dns.RR { - var result []dns.RR - - for _, ip := range entry.ipAddresses { - rr := &dns.A{ - Hdr: dns.RR_Header{ - Ttl: uint32(manager.DnsTTL.Seconds()), - Name: manager.BaseName, - Class: dns.ClassINET, - Rrtype: dns.TypeA, - }, - A: ip.To4(), - } - result = append(result, rr) - } - - return result -} - -func (manager *DnsManager) makeAllSRV(entry *appEntry) []dns.RR { - var result []dns.RR - - for _, task := range entry.app.Tasks { - for _, port := range task.Ports { - rr := &dns.SRV{ - Hdr: dns.RR_Header{ - Ttl: uint32(manager.DnsTTL.Seconds()), - Name: manager.BaseName, - Class: dns.ClassINET, - Rrtype: dns.TypeSRV, - }, - Port: uint16(port), - Target: task.Host + ".", - Weight: 1, - Priority: 1, - } - result = append(result, rr) - } - } - - return result -} diff --git a/files_manager.go b/files_manager.go deleted file mode 100644 index cab6fc8..0000000 --- a/files_manager.go +++ /dev/null @@ -1,143 +0,0 @@ -package main - -import ( - "bytes" - "fmt" - "io/ioutil" - "log" - "os" - "path/filepath" - - "github.com/dawanda/go-mesos/marathon" -) - -type FilesManager struct { - Enabled bool - Verbose bool - BasePath string -} - -func (upstream *FilesManager) Log(msg string) { - if upstream.Verbose { - log.Printf("upstream: %v\n", msg) - } -} - -func (manager *FilesManager) Setup() error { - return nil -} - -func (manager *FilesManager) IsEnabled() bool { - return manager.Enabled -} - -func (manager *FilesManager) SetEnabled(value bool) { - if value != manager.Enabled { - manager.Enabled = value - } -} - -func (upstream *FilesManager) Remove(appID string, taskID string, app *marathon.App) error { - if app != nil { - _, err := upstream.writeApp(app) - return err - } else { - // TODO: remove files for app-$portIndex - return nil - } -} - -func (upstream *FilesManager) Update(app *marathon.App, taskID string) error { - _, err := upstream.writeApp(app) - return err -} - -func (upstream *FilesManager) Apply(apps []*marathon.App, force bool) error { - err := os.MkdirAll(upstream.BasePath, 0770) - if err != nil { - return err - } - - var newFiles []string - oldFiles, err := upstream.collectFiles() - if err != nil { - return err - } - - for _, app := range apps { - filenames, _ := upstream.writeApp(app) - newFiles = append(newFiles, filenames...) - } - - // check for superfluous files - diff := FindMissing(oldFiles, newFiles) - for _, superfluous := range diff { - upstream.Log(fmt.Sprintf("Removing superfluous file: %v\n", superfluous)) - os.Remove(superfluous) - } - - return nil -} - -func (upstream *FilesManager) writeApp(app *marathon.App) ([]string, error) { - var files []string - - for portIndex, port := range app.Ports { - app_id := PrettifyAppId(app.Id, portIndex, port) - cfgfile := filepath.Join(upstream.BasePath, app_id+".instances") - tmpfile := cfgfile + ".tmp" - - err := upstream.writeFile(tmpfile, app_id, portIndex, app) - if err != nil { - return files, err - } - files = append(files, cfgfile) - - if _, err := os.Stat(cfgfile); os.IsNotExist(err) { - upstream.Log(fmt.Sprintf("new %v", cfgfile)) - os.Rename(tmpfile, cfgfile) - } else if !FileIsIdentical(tmpfile, cfgfile) { - upstream.Log(fmt.Sprintf("refresh %v", cfgfile)) - os.Rename(tmpfile, cfgfile) - } else { - // new file is identical to already existing one - os.Remove(tmpfile) - } - } - return files, nil -} - -func (upstream *FilesManager) writeFile(filename string, appId string, - portIndex int, app *marathon.App) error { - - var b bytes.Buffer - b.WriteString(fmt.Sprintf("Service-Name: %v\r\n", appId)) - b.WriteString(fmt.Sprintf("Service-Port: %v\r\n", app.Ports[portIndex])) - b.WriteString(fmt.Sprintf("Service-Transport-Proto: %v\r\n", GetTransportProtocol(app, portIndex))) - b.WriteString(fmt.Sprintf("Service-Application-Proto: %v\r\n", GetApplicationProtocol(app, portIndex))) - if proto := GetHealthCheckProtocol(app, portIndex); len(proto) != 0 { - b.WriteString(fmt.Sprintf("Health-Check-Proto: %v\r\n", GetApplicationProtocol(app, portIndex))) - } - b.WriteString("\r\n") - - for _, task := range app.Tasks { - b.WriteString(fmt.Sprintf("%v:%v\n", task.Host, task.Ports[portIndex])) - } - - return ioutil.WriteFile(filename, b.Bytes(), 0660) -} - -func (upstream *FilesManager) collectFiles() ([]string, error) { - fileInfos, err := ioutil.ReadDir(upstream.BasePath) - if err != nil { - upstream.Log(fmt.Sprintf("Error reading directory %v. %v", upstream.BasePath, err)) - return nil, err - } - - var fileNames []string - for _, fileInfo := range fileInfos { - fileNames = append(fileNames, filepath.Join(upstream.BasePath, fileInfo.Name())) - } - - return fileNames, nil -} diff --git a/haproxy_manager.go b/haproxy_manager.go deleted file mode 100644 index 24db654..0000000 --- a/haproxy_manager.go +++ /dev/null @@ -1,861 +0,0 @@ -package main - -// TODO: sort cluster names in cfg output -// TODO: sort backend names in cfg output -// TODO: avoid spam-reloading the haproxy binary (due to massive scaling) -// TODO: support local-health checks *or* marathon-based health check propagation (--local-health-checks=false) - -import ( - "errors" - "fmt" - "io/ioutil" - "log" - "net" - "os" - "os/exec" - "runtime" - "sort" - "strconv" - "strings" - "syscall" - - "github.com/dawanda/go-mesos/marathon" -) - -type HaproxyMgr struct { - Enabled bool - Verbose bool - LocalHealthChecks bool - FilterGroups []string - ServiceAddr net.IP - GatewayEnabled bool - GatewayAddr net.IP - GatewayPortHTTP uint - GatewayPortHTTPS uint - Executable string - ConfigPath string - ConfigTailPath string - OldConfigPath string - PidFile string - ManagementAddr net.IP - ManagementPort uint - AdminSockPath string - appConfigFragments map[string]string // [appId] = haproxy_config_fragment - appLabels map[string]map[string]string // [appId][key] = value - appStateCache map[string]map[string]*marathon.Task // [appId][task] = Task - vhosts map[string][]string // [appId] = []vhost - vhostDefault string - vhostsHTTPS map[string][]string - vhostDefaultHTTPS string -} - -const ( - LB_PROXY_PROTOCOL = "lb-proxy-protocol" - LB_ACCEPT_PROXY = "lb-accept-proxy" - LB_VHOST_HTTP = "lb-vhost" - LB_VHOST_DEFAULT_HTTP = "lb-vhost-default" - LB_VHOST_HTTPS = "lb-vhost-ssl" - LB_VHOST_DEFAULT_HTTPS = "lb-vhost-default-ssl" -) - -var ( - ErrBadExit = errors.New("Bad Process Exit.") -) - -// {{{ SortedVhostsKeys -type sortedVhosts struct { - m map[string][]string - s []string -} - -func (sm *sortedVhosts) Len() int { - return len(sm.m) -} - -func (sm *sortedVhosts) Less(a, b int) bool { - // return sm.m[sm.s[a]] > sm.m[sm.s[b]] - return sm.s[a] < sm.s[b] -} - -func (sm *sortedVhosts) Swap(a, b int) { - sm.s[a], sm.s[b] = sm.s[b], sm.s[a] -} - -func SortedVhostsKeys(m map[string][]string) []string { - sm := new(sortedVhosts) - sm.m = m - sm.s = make([]string, len(m)) - - i := 0 - for key, _ := range m { - sm.s[i] = key - i++ - } - sort.Sort(sm) - - return sm.s -} - -// }}} -// {{{ SortedStrStrKeys -type sortedStrStrKeys struct { - m map[string]string - s []string -} - -func (sm *sortedStrStrKeys) Len() int { - return len(sm.m) -} - -func (sm *sortedStrStrKeys) Less(a, b int) bool { - // return sm.m[sm.s[a]] > sm.m[sm.s[b]] - return sm.s[a] < sm.s[b] -} - -func (sm *sortedStrStrKeys) Swap(a, b int) { - sm.s[a], sm.s[b] = sm.s[b], sm.s[a] -} - -func SortedStrStrKeys(m map[string]string) []string { - sm := new(sortedStrStrKeys) - sm.m = m - sm.s = make([]string, len(m)) - - i := 0 - for key, _ := range m { - sm.s[i] = key - i++ - } - sort.Sort(sm) - - return sm.s -} - -// }}} - -func makeStringArray(s string) []string { - if len(s) == 0 { - return []string{} - } else { - return strings.Split(s, ",") - } -} - -func (manager *HaproxyMgr) Setup() error { - return nil -} - -func (manager *HaproxyMgr) IsEnabled() bool { - return manager.Enabled -} - -func (manager *HaproxyMgr) SetEnabled(value bool) { - if value != manager.Enabled { - manager.Enabled = value - } -} - -func (manager *HaproxyMgr) Apply(apps []*marathon.App, force bool) error { - manager.appConfigFragments = make(map[string]string) - manager.clearAppStateCache() - - manager.vhosts = make(map[string][]string) - manager.vhostDefault = "" - - manager.vhostsHTTPS = make(map[string][]string) - manager.vhostDefaultHTTPS = "" - - for _, app := range apps { - config, err := manager.makeConfig(app) - if err != nil { - return err - } - manager.appConfigFragments[app.Id] = config - for _, task := range app.Tasks { - manager.setAppStateCacheEntry(&task) - } - } - - err := manager.writeConfig() - if err != nil { - return err - } - - return manager.reloadConfig(false) -} - -func (manager *HaproxyMgr) Remove(appID string, taskID string, app *marathon.App) error { - if app != nil { - // make sure we *remove* the task from the cluster - config, err := manager.makeConfig(app) - if err != nil { - return err - } - if len(app.Tasks) > 0 { - // app removed one task, still at least one alive - manager.appConfigFragments[appID] = config - } else { - // app suspended (or scaled down to zero) - delete(manager.appConfigFragments, appID) - } - } else { - // app destroyed fully - delete(manager.appConfigFragments, appID) - } - manager.removeAppStateCacheEntry(appID, taskID) - - err := manager.writeConfig() - if err != nil { - return err - } - - return manager.reloadConfig(false) -} - -func isAppJustSpawned(app *marathon.App) bool { - // find out if an app has just been spawned by checking - // if it ever failed already. - - if len(app.Tasks) == 0 { - return false - } - - for _, hsr := range app.Tasks[0].HealthCheckResults { - if hsr.LastFailure != nil { - return false - } - } - - return true -} - -func (manager *HaproxyMgr) Update(app *marathon.App, taskID string) error { - // collect list of task labels as we formatted them in haproxy.cfg. - var instanceNames []string - for portIndex, servicePort := range app.Ports { - if GetTransportProtocol(app, portIndex) == "tcp" { - appID := PrettifyAppId(app.Id, portIndex, servicePort) - cachedTask := manager.getAppStateCacheEntry(app.Id, taskID) - if cachedTask == nil { - cachedTask = app.GetTaskById(taskID) - } - if cachedTask != nil { - cachedTaskLabel := fmt.Sprintf("%v/%v:%v", appID, cachedTask.Host, cachedTask.Ports[portIndex]) - instanceNames = append(instanceNames, cachedTaskLabel) - } - } - } - - config, err := manager.makeConfig(app) - if err != nil { - return err - } - - manager.appConfigFragments[app.Id] = config - for _, task := range app.Tasks { - manager.setAppStateCacheEntry(&task) - } - - err = manager.writeConfig() - if err != nil { - return err - } - - task := app.GetTaskById(taskID) - - // go right away reload the config if that is the first start of the - // underlying task and we got just health - if task != nil && task.IsAlive() { - // no health checks defined or app got just spawned the first time? - if len(app.HealthChecks) == 0 || isAppJustSpawned(app) { - log.Printf("[haproxy] App %v on host %v becomes healthy (or alive) first time. force reload config.\n", - app.Id, task.Host) - return manager.reloadConfig(true) - } - } - - // upstream server is already present, so send en enable or disable command - // to all app clusters of this name ($app-$portIndex-$servicePort/$taskLabel) - var updateCommandFmt string - if task != nil && task.IsAlive() { - updateCommandFmt = "enable server %v\n" - } else { - updateCommandFmt = "disable server %v\n" - } - - for _, instanceName := range instanceNames { - err := manager.sendCommandf(updateCommandFmt, instanceName) - if err != nil { - return err - } - } - - return nil -} - -func (manager *HaproxyMgr) sendCommandf(cmdFmt string, args ...interface{}) error { - log.Printf("[haproxy] "+cmdFmt, args...) - cmd := fmt.Sprintf(cmdFmt, args...) - conn, err := net.DialUnix("unix", nil, &net.UnixAddr{manager.AdminSockPath, "unix"}) - if err != nil { - return err - } - defer conn.Close() - - _, err = conn.Write([]byte(cmd)) - if err != nil { - return err - } - - var response []byte = make([]byte, 32768) - _, err = conn.Read(response) - if err != nil { - return err - } - - return nil -} - -func (manager *HaproxyMgr) makeConfig(app *marathon.App) (string, error) { - var result string - - // import application labels - if manager.appLabels == nil { - manager.appLabels = make(map[string]map[string]string) - } - manager.appLabels[app.Id] = make(map[string]string) - for k, v := range app.Labels { - manager.appLabels[app.Id][k] = v - } - - for portIndex, portDef := range app.PortDefinitions { - if manager.isGroupIncluded(portDef.Labels["lb-group"]) { - result += manager.makeConfigForPort(app, portIndex) - } - } - - return result, nil -} - -func (manager *HaproxyMgr) isGroupIncluded(groupName string) bool { - for _, filterGroup := range manager.FilterGroups { - if groupName == filterGroup || filterGroup == "*" { - return true - } - } - return false -} - -func (manager *HaproxyMgr) makeConfigForPort(app *marathon.App, portIndex int) string { - if GetTransportProtocol(app, portIndex) != "tcp" { - return "" - } - - var portDef = app.PortDefinitions[portIndex] - var servicePort = portDef.Port - var appID = PrettifyAppId(app.Id, portIndex, servicePort) - var bindAddr = manager.ServiceAddr - var healthCheck = GetHealthCheckForPortIndex(app.HealthChecks, portIndex) - var appProtocol = GetApplicationProtocol(app, portIndex) - - var lbVirtualHosts = makeStringArray(portDef.Labels[LB_VHOST_HTTP]) - if len(lbVirtualHosts) != 0 { - manager.vhosts[appID] = lbVirtualHosts - if portDef.Labels[LB_VHOST_DEFAULT_HTTP] == "1" { - manager.vhostDefault = appID - } - } else { - delete(manager.vhosts, appID) - if manager.vhostDefault == appID { - manager.vhostDefault = "" - } - } - - lbVirtualHosts = makeStringArray(portDef.Labels[LB_VHOST_HTTPS]) - if len(lbVirtualHosts) != 0 { - manager.vhostsHTTPS[appID] = lbVirtualHosts - if portDef.Labels[LB_VHOST_DEFAULT_HTTPS] == "1" { - manager.vhostDefaultHTTPS = appID - } - } else { - delete(manager.vhostsHTTPS, appID) - if manager.vhostDefaultHTTPS == appID { - manager.vhostDefaultHTTPS = "" - } - } - - result := "" - bindOpts := "" - - if runtime.GOOS == "linux" { // only enable on Linux (known to work) - bindOpts += " defer-accept" - } - - if Atoi(portDef.Labels[LB_ACCEPT_PROXY], 0) != 0 { - bindOpts += " accept-proxy" - } - - serverOpts := "" - - if manager.LocalHealthChecks { - serverOpts += " check" - } - - if healthCheck.IntervalSeconds > 0 { - serverOpts += fmt.Sprintf(" inter %v", healthCheck.IntervalSeconds*1000) - } - - switch Atoi(portDef.Labels[LB_PROXY_PROTOCOL], 0) { - case 2: - serverOpts += " send-proxy-v2" - case 1: - serverOpts += " send-proxy" - case 0: - // ignore - default: - log.Printf("Invalid proxy-protocol given for %v: %v - ignoring.", - app.Id, app.Labels["lb-proxy-protocol"]) - } - - switch appProtocol { - case "http": - result += fmt.Sprintf( - "frontend __frontend_%v\n"+ - " bind %v:%v%v\n"+ - " option dontlognull\n"+ - " default_backend %v\n"+ - "\n"+ - "backend %v\n"+ - " mode http\n"+ - " balance leastconn\n"+ - " option forwardfor\n"+ - " option http-server-close\n"+ - " option abortonclose\n"+ - " option httpchk GET %v HTTP/1.1\\r\\nHost:\\ %v\n", - appID, bindAddr, servicePort, bindOpts, appID, appID, - healthCheck.Path, "health-check") - case "redis-master", "redis-server", "redis": - result += fmt.Sprintf( - "listen %v\n"+ - " bind %v:%v%v\n"+ - " option dontlognull\n"+ - " mode tcp\n"+ - " balance leastconn\n"+ - " option tcp-check\n"+ - " tcp-check connect\n"+ - " tcp-check send PING\\r\\n\n"+ - " tcp-check expect string +PONG\n"+ - " tcp-check send info\\ replication\\r\\n\n"+ - " tcp-check expect string role:master\n"+ - " tcp-check send QUIT\\r\\n\n"+ - " tcp-check expect string +OK\n", - appID, bindAddr, servicePort, bindOpts) - case "smtp": - result += fmt.Sprintf( - "listen %v\n"+ - " bind %v:%v%v\n"+ - " option dontlognull\n"+ - " mode tcp\n"+ - " balance leastconn\n"+ - " option tcp-check\n"+ - " option smtpchk EHLO localhost\n", - appID, bindAddr, servicePort, bindOpts) - default: - result += fmt.Sprintf( - "listen %v\n"+ - " bind %v:%v%v\n"+ - " option dontlognull\n"+ - " mode tcp\n"+ - " balance leastconn\n", - appID, bindAddr, servicePort, bindOpts) - } - - result += " option redispatch\n" - result += " retries 1\n" - - for _, task := range sortTasks(app.Tasks, portIndex) { - // Include the task iff it is in running state or a state is not provided. - // The latter is kept for backwards compatibility with older - // Marathon services - if task.State == nil || *task.State == marathon.TaskRunning { - result += fmt.Sprintf( - " server %v:%v %v:%v%v\n", - task.Host, task.Ports[portIndex], // taskLabel == "$host:$port" - SoftResolveIPAddr(task.Host), - task.Ports[portIndex], - serverOpts) - } - } - - result += "\n" - - return result -} - -func (manager *HaproxyMgr) writeConfig() error { - config, err := manager.makeConfigHead() - if err != nil { - return err - } - - var clusterNames []string - for name, _ := range manager.appConfigFragments { - clusterNames = append(clusterNames, name) - } - sort.Strings(clusterNames) - for _, name := range clusterNames { - config += manager.appConfigFragments[name] - } - - tail, err := manager.makeConfigTail() - if err != nil { - return err - } - config += tail - - tempConfigFile := fmt.Sprintf("%v.tmp", manager.ConfigPath) - err = ioutil.WriteFile(tempConfigFile, []byte(config), 0666) - if err != nil { - return err - } - - err = manager.checkConfig(tempConfigFile) - if err != nil { - return err - } - - // if config file previousely did exist, attempt a rename - if _, err := os.Stat(manager.ConfigPath); err == nil { - if err = os.Rename(manager.ConfigPath, manager.OldConfigPath); err != nil { - return err - } - } - - return os.Rename(tempConfigFile, manager.ConfigPath) -} - -func (manager *HaproxyMgr) makeConfigHead() (string, error) { - headerFragment := fmt.Sprintf( - "# This is an auto generated haproxy configuration!!!\n"+ - "global\n"+ - " maxconn 32768\n"+ - " maxconnrate 32768\n"+ - " log 127.0.0.1 local0\n"+ - " stats socket %v mode 600 level admin\n"+ - "\n"+ - "defaults\n"+ - " maxconn 32768\n"+ - " timeout client 90000\n"+ - " timeout server 90000\n"+ - " timeout connect 90000\n"+ - " timeout queue 90000\n"+ - " timeout http-request 90000\n"+ - "\n", manager.AdminSockPath) - - mgntFragment := fmt.Sprintf( - "listen haproxy\n"+ - " bind %v:%v\n"+ - " mode http\n"+ - " stats enable\n"+ - " stats uri /\n"+ - " stats admin if TRUE\n"+ - " monitor-uri /haproxy?monitor\n"+ - "\n", - manager.ManagementAddr, manager.ManagementPort) - - if manager.GatewayEnabled { - gatewayHTTP := manager.makeGatewayHTTP() - gatewayHTTPS := manager.makeGatewayHTTPS() - return headerFragment + mgntFragment + gatewayHTTP + gatewayHTTPS, nil - } else { - return headerFragment + mgntFragment, nil - } -} - -func (manager *HaproxyMgr) makeGatewayHTTP() string { - var ( - suffixRoutes map[string]string = make(map[string]string) - suffixMatches []string - exactRoutes map[string]string = make(map[string]string) - exactMatches []string - vhostDefault string - port uint = manager.GatewayPortHTTP - ) - - for _, appID := range SortedVhostsKeys(manager.vhosts) { - vhosts := manager.vhosts[appID] - for _i, vhost := range vhosts { - log.Printf("[haproxy] appID:%v, vhost:%v, i:%v\n", appID, vhost, _i) - matchToken := "vhost_" + vhost - matchToken = strings.Replace(matchToken, ".", "_", -1) - matchToken = strings.Replace(matchToken, "*", "STAR", -1) - - if len(vhost) >= 3 && vhost[0] == '*' && vhost[1] == '.' { - suffixMatches = append(suffixMatches, - fmt.Sprintf(" acl %v hdr_dom(host) -i %v\n", matchToken, strings.SplitN(vhost, ".", 2)[1])) - suffixRoutes[matchToken] = appID - } else { - exactMatches = append(exactMatches, - fmt.Sprintf(" acl %v hdr(host) -i %v\n", matchToken, vhost)) - exactRoutes[matchToken] = appID - } - - if manager.vhostDefault == appID { - vhostDefault = appID - } - } - } - - var fragment string - fragment += fmt.Sprintf( - "frontend __gateway_http\n"+ - " bind %v:%v\n"+ - " mode http\n"+ - " option httplog\n"+ - " option dontlognull\n"+ - " option forwardfor\n"+ - " option http-server-close\n"+ - " reqadd X-Forwarded-Proto:\\ http\n"+ - "\n", - manager.GatewayAddr, - port) - - // write ACL statements - fragment += strings.Join(exactMatches, "") - fragment += strings.Join(suffixMatches, "") - if len(exactMatches) != 0 || len(suffixMatches) != 0 { - fragment += "\n" - } - - for _, acl := range SortedStrStrKeys(exactRoutes) { - appID := exactRoutes[acl] - fragment += fmt.Sprintf(" use_backend %v if %v\n", appID, acl) - } - - for _, acl := range SortedStrStrKeys(suffixRoutes) { - appID := suffixRoutes[acl] - fragment += fmt.Sprintf(" use_backend %v if %v\n", appID, acl) - } - - fragment += "\n" - - if len(vhostDefault) != 0 { - fragment += fmt.Sprintf(" default_backend %v\n\n", vhostDefault) - } - - return fragment -} - -func (manager *HaproxyMgr) makeGatewayHTTPS() string { - // SNI vhost selector - var ( - suffixRoutes map[string]string = make(map[string]string) - suffixMatches []string - exactRoutes map[string]string = make(map[string]string) - exactMatches []string - vhostDefault string - port uint = manager.GatewayPortHTTPS - ) - - for _, appID := range SortedVhostsKeys(manager.vhostsHTTPS) { - vhosts := manager.vhostsHTTPS[appID] - for _, vhost := range vhosts { - matchToken := "vhost_ssl_" + vhost - matchToken = strings.Replace(matchToken, ".", "_", -1) - matchToken = strings.Replace(matchToken, "*", "STAR", -1) - - if len(vhost) >= 3 && vhost[0] == '*' && vhost[1] == '.' { - suffixMatches = append(suffixMatches, - fmt.Sprintf(" acl %v req_ssl_sni -m dom %v\n", matchToken, strings.SplitN(vhost, ".", 2)[1])) - suffixRoutes[matchToken] = appID - } else { - exactMatches = append(exactMatches, - fmt.Sprintf(" acl %v req_ssl_sni -i %v\n", matchToken, vhost)) - exactRoutes[matchToken] = appID - } - - if manager.vhostDefaultHTTPS == appID { - vhostDefault = appID - } - } - } - - var fragment string - fragment += fmt.Sprintf( - "frontend __gateway_https\n"+ - " bind %v:%v\n"+ - " mode tcp\n"+ - " tcp-request inspect-delay 5s\n"+ - " tcp-request content accept if { req_ssl_hello_type 1 }\n"+ - "\n", - manager.GatewayAddr, - port) - - // write ACL statements - fragment += strings.Join(exactMatches, "") - fragment += strings.Join(suffixMatches, "") - if len(exactMatches) != 0 || len(suffixMatches) != 0 { - fragment += "\n" - } - - for _, acl := range SortedStrStrKeys(exactRoutes) { - appID := exactRoutes[acl] - fragment += fmt.Sprintf(" use_backend %v if %v\n", appID, acl) - } - - for _, acl := range SortedStrStrKeys(suffixRoutes) { - appID := suffixRoutes[acl] - fragment += fmt.Sprintf(" use_backend %v if %v\n", appID, acl) - } - - fragment += "\n" - - if len(vhostDefault) != 0 { - fragment += fmt.Sprintf(" default_backend %v\n\n", vhostDefault) - } - - return fragment -} - -func (manager *HaproxyMgr) makeConfigTail() (string, error) { - if len(manager.ConfigTailPath) == 0 { - return "", nil - } - - tail, err := ioutil.ReadFile(manager.ConfigTailPath) - if err != nil { - return "", err - } - - return string(tail), nil -} - -func (manager *HaproxyMgr) reloadConfig(force bool) error { - if !force && FileIsIdentical(manager.ConfigPath, manager.OldConfigPath) { - log.Printf("[haproxy] config file not changed. ignoring reload\n") - return nil - } - - pidStr, err := ioutil.ReadFile(manager.PidFile) - if err != nil { - return manager.startProcess() - } - pid, err := strconv.Atoi(strings.TrimSpace(string(pidStr))) - if err != nil { - return err - } - - err = syscall.Kill(pid, syscall.Signal(0)) - if err != nil { - // process doesn't exist; start up process - return manager.startProcess() - } else { - // process does exist; send SIGHUP to reload - return manager.reloadProcess(pid) - } -} - -func (manager *HaproxyMgr) checkConfig(path string) error { - return manager.exec("checking configuration", - "-f", path, "-p", manager.PidFile, "-c") -} - -func (manager *HaproxyMgr) startProcess() error { - return manager.exec("starting up process", - "-f", manager.ConfigPath, "-p", manager.PidFile, "-D", "-q") -} - -func (manager *HaproxyMgr) reloadProcess(pid int) error { - return manager.exec("reloading configuration", - "-f", manager.ConfigPath, "-p", manager.PidFile, "-D", "-sf", fmt.Sprint(pid)) -} - -func (manager *HaproxyMgr) exec(logMessage string, args ...string) error { - proc := exec.Command(manager.Executable, args...) - output, err := proc.CombinedOutput() - - log.Printf("[haproxy] %v: %v %v\n", logMessage, manager.Executable, args) - - exitCode := proc.ProcessState.Sys().(syscall.WaitStatus) - if exitCode != 0 { - log.Printf("[haproxy] Bad exit code %v.\n", exitCode) - err = ErrBadExit - } - - if len(output) != 0 && manager.Verbose { - log.Println("[haproxy] command output:") - log.Println(strings.TrimSpace(string(output))) - } - - return err -} - -func (manager *HaproxyMgr) clearAppStateCache() { - manager.appStateCache = make(map[string]map[string]*marathon.Task) -} - -func (manager *HaproxyMgr) getAppStateCacheEntry(appID, taskID string) *marathon.Task { - if app, ok := manager.appStateCache[appID]; ok { - if task, ok := app[taskID]; ok { - return task - } - } - return nil -} - -func (manager *HaproxyMgr) setAppStateCacheEntry(task *marathon.Task) { - app, ok := manager.appStateCache[task.AppId] - if !ok { - app = make(map[string]*marathon.Task) - manager.appStateCache[task.AppId] = app - } - app[task.Id] = task -} - -func (manager *HaproxyMgr) removeAppStateCacheEntry(appID, taskID string) { - if len(manager.appStateCache[appID]) != 0 { - delete(manager.appStateCache[appID], taskID) - - if len(manager.appStateCache[appID]) == 0 { - delete(manager.appStateCache, appID) - } - } -} - -// {{{ SortedTaskList - -type SortedTaskList struct { - Tasks []marathon.Task - PortIndex int -} - -func (tasks SortedTaskList) Len() int { - return len(tasks.Tasks) -} - -func (tasks SortedTaskList) Less(i, j int) bool { - var a = &tasks.Tasks[i] - var b = &tasks.Tasks[j] - - if len(a.Ports) < tasks.PortIndex || len(b.Ports) < tasks.PortIndex { - // XXX That's a very case; when you redeploy your app with the port count - // changed, you might run into here. - return a.Host < b.Host - } - - return a.Host < b.Host || a.Ports[tasks.PortIndex] < b.Ports[tasks.PortIndex] -} - -func (tasks SortedTaskList) Swap(i, j int) { - tmp := tasks.Tasks[i] - tasks.Tasks[i] = tasks.Tasks[j] - tasks.Tasks[j] = tmp -} - -// }}} - -func sortTasks(tasks []marathon.Task, portIndex int) []marathon.Task { - stl := SortedTaskList{Tasks: tasks, PortIndex: portIndex} - sort.Sort(stl) - return stl.Tasks -} diff --git a/main.go b/main.go index 062d699..a9848e1 100644 --- a/main.go +++ b/main.go @@ -28,37 +28,34 @@ XXX Changes: import ( "encoding/json" - "errors" "fmt" "log" "net" "net/http" "os" + "os/signal" "path/filepath" "sort" - "strconv" "strings" + "syscall" "time" "github.com/dawanda/go-mesos/marathon" + "github.com/dawanda/mmsd/core" + "github.com/dawanda/mmsd/modules" + "github.com/dawanda/mmsd/sse" + "github.com/dawanda/mmsd/util" "github.com/gorilla/mux" flag "github.com/ogier/pflag" ) -type mmsdHandler interface { - Setup() error - Apply(apps []*marathon.App, force bool) error - Update(app *marathon.App, taskID string) error - Remove(appID string, taskID string, app *marathon.App) error -} - type mmsdService struct { HttpApiPort uint Verbose bool - Handlers []mmsdHandler + Handlers []core.EventListener quitChannel chan bool RunStateDir string - FilterGroups string + FilterGroups []string LocalHealthChecks bool ManagementAddr net.IP @@ -68,6 +65,8 @@ type mmsdService struct { // service discovery IP-management ManagedIP net.IP + EventLoggerEnabled bool + // file based service discovery FilesEnabled bool @@ -93,13 +92,19 @@ type mmsdService struct { UDPEnabled bool // DNS service discovery - DnsEnabled bool - DnsPort uint - DnsBaseName string - DnsTTL time.Duration - DnsPushSRV bool + DNSEnabled bool + DNSPort uint + DNSBaseName string + DNSTTL time.Duration + DNSPushSRV bool + + // runtime state + apps []*core.AppCluster // cache of latest state of all app clusters + taskEvents map[string]marathon.StatusUpdateEvent // map of last task status update events + killingTasks map[string]bool // set of tasks currently in killing state } +// {{{ HTTP endpoint func (mmsd *mmsdService) setupHttpService() { router := mux.NewRouter() router.HandleFunc("/ping", mmsd.v0Ping) @@ -148,59 +153,13 @@ func (mmsd *mmsdService) v1Apps(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "%s\n", strings.Join(appList, "\n")) } -var ErrInvalidPortRange = errors.New("Invalid port range") - -func parseRange(input string) (int, int, error) { - if len(input) == 0 { - return 0, 0, nil - } - - vals := strings.Split(input, ":") - log.Printf("vals: %+q\n", vals) - - if len(vals) == 1 { - i, err := strconv.Atoi(input) - return i, i, err - } - - if len(vals) > 2 { - return 0, 0, ErrInvalidPortRange - } - - var ( - begin int - end int - err error - ) - - // parse begin - if vals[0] != "" { - begin, err = strconv.Atoi(vals[0]) - if err != nil { - return begin, end, err - } - } - - // parse end - if vals[1] != "" { - end, err = strconv.Atoi(vals[1]) - if begin > end { - return begin, end, ErrInvalidPortRange - } - } else { - end = -1 // XXX that is: until the end - } - - return begin, end, err -} - func (mmsd *mmsdService) v1Instances(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) appID := vars["app_id"] noResolve := r.URL.Query().Get("noresolve") == "1" withServerID := r.URL.Query().Get("withid") == "1" - portBegin, portEnd, err := parseRange(r.URL.Query().Get("portIndex")) + portBegin, portEnd, err := util.ParseRange(r.URL.Query().Get("portIndex")) if err != nil { w.WriteHeader(http.StatusBadRequest) log.Printf("error parsing range. %v\n", err) @@ -250,10 +209,10 @@ func (mmsd *mmsdService) v1Instances(w http.ResponseWriter, r *http.Request) { for _, task := range app.Tasks { item := "" if withServerID { - item += fmt.Sprintf("%v:", Hash(task.SlaveId)) + item += fmt.Sprintf("%v:", util.Hash(task.SlaveId)) } - item += resolveIPAddr(task.Host, noResolve) + item += util.ResolveIPAddr(task.Host, noResolve) for portIndex := portBegin; portIndex <= portEnd; portIndex++ { item += fmt.Sprintf(":%d", task.Ports[portIndex]) @@ -269,177 +228,286 @@ func (mmsd *mmsdService) v1Instances(w http.ResponseWriter, r *http.Request) { } } -func resolveIPAddr(dns string, skip bool) string { - if skip { - return dns - } else { - ip, err := net.ResolveIPAddr("ip", dns) - if err != nil { - return dns - } else { - return ip.String() +// }}} + +func (mmsd *mmsdService) isGroupIncluded(groupName string) bool { + for _, filterGroup := range mmsd.FilterGroups { + if groupName == filterGroup || filterGroup == "*" { + return true } } + return false } -func (mmsd *mmsdService) setupEventBusListener() { - var url = fmt.Sprintf("http://%v:%v/v2/events", - mmsd.MarathonIP, mmsd.MarathonPort) +func (mmsd *mmsdService) getMarathonApps() []marathon.App { + m, err := marathon.NewService(mmsd.MarathonIP, mmsd.MarathonPort) + if err != nil { + log.Printf("Failed to get marathon.Service: %v\n", err) + return nil + } - var sse = NewEventSource(url, mmsd.ReconnectDelay) + mApps, err := m.GetApps() + var mApps2 []marathon.App + for _, mApp := range mApps { + mApps2 = append(mApps2, *mApp) + } + + return mApps2 +} - sse.OnOpen = func(event, data string) { - log.Printf("Listening for events from Marathon on %v\n", url) +func (mmsd *mmsdService) getMarathonApp(appID string) (*marathon.App, error) { + m, err := marathon.NewService(mmsd.MarathonIP, mmsd.MarathonPort) + if err != nil { + return nil, err } - sse.OnError = func(event, data string) { - log.Printf("Marathon Event Stream Error. %v. %v\n", event, data) + app, err := m.GetApp(appID) + if err != nil { + return nil, err } - sse.AddEventListener("status_update_event", func(data string) { - var event marathon.StatusUpdateEvent - err := json.Unmarshal([]byte(data), &event) - if err != nil { - log.Printf("Failed to unmarshal status_update_event. %v\n", err) - log.Printf("status_update_event: %+v\n", data) - } else { - mmsd.statusUpdateEvent(&event) - } - }) + return app, nil +} - sse.AddEventListener("health_status_changed_event", func(data string) { - var event marathon.HealthStatusChangedEvent - err := json.Unmarshal([]byte(data), &event) - if err != nil { - log.Printf("Failed to unmarshal health_status_changed_event. %v\n", err) - } else { - mmsd.healthStatusChangedEvent(&event) +// convertMarathonApps converts an array of marathon.App into a []AppCluster. +func (mmsd *mmsdService) convertMarathonApps(mApps []marathon.App) []*core.AppCluster { + var apps []*core.AppCluster + for _, mApp := range mApps { + for portIndex, portDef := range mApp.PortDefinitions { + if mmsd.isGroupIncluded(portDef.Labels["lb-group"]) { + var healthCheck *core.AppHealthCheck + if mHealthCheck := util.FindHealthCheckForPortIndex(mApp.HealthChecks, portIndex); mHealthCheck != nil { + var mCommand *string + if mHealthCheck.Command != nil { + mCommand = new(string) + *mCommand = mHealthCheck.Command.Value + } + healthCheck = &core.AppHealthCheck{ + Protocol: mHealthCheck.Protocol, + Path: mHealthCheck.Path, + Command: mCommand, + GracePeriodSeconds: mHealthCheck.GracePeriodSeconds, + IntervalSeconds: mHealthCheck.IntervalSeconds, + TimeoutSeconds: mHealthCheck.TimeoutSeconds, + MaxConsecutiveFailures: mHealthCheck.MaxConsecutiveFailures, + IgnoreHttp1xx: mHealthCheck.IgnoreHttp1xx, + } + } + + var backends []core.AppBackend + for _, mTask := range mApp.Tasks { + backends = append(backends, core.AppBackend{ + Id: mTask.Id, + Host: mTask.Host, + Port: mTask.Ports[portIndex], + State: string(*mTask.State), + }) + } + // TODO: sort backends ASC + + labels := make(map[string]string) + for k, v := range mApp.Labels { + labels[k] = v + } + for k, v := range mApp.PortDefinitions[portIndex].Labels { + labels[k] = v + } + + servicePort := mApp.PortDefinitions[portIndex].Port + app := &core.AppCluster{ + Name: mApp.Id, + Id: util.PrettifyAppId(mApp.Id, portIndex, servicePort), + ServicePort: servicePort, + Protocol: mApp.PortDefinitions[portIndex].Protocol, + PortName: mApp.PortDefinitions[portIndex].Name, + Labels: labels, + HealthCheck: healthCheck, + Backends: backends, + PortIndex: portIndex, + } + apps = append(apps, app) + } } - }) - - go sse.RunForever() + } + return apps } -func (mmsd *mmsdService) statusUpdateEvent(event *marathon.StatusUpdateEvent) { - switch event.TaskStatus { - case marathon.TaskRunning: - app, err := mmsd.getMarathonApp(event.AppId) - if err != nil { - log.Printf("App %v task %v on %v is running but failed to fetch infos. %v\n", - event.AppId, event.TaskId, event.Host, err) - return - } - log.Printf("App %v task %v on %v changed status. %v.\n", event.AppId, event.TaskId, event.Host, event.TaskStatus) +func (mmsd *mmsdService) getAppByMarathonId(appId string, portIndex int) *core.AppCluster { + log.Printf("Application %v with port index %v not found.", + appId, portIndex) + return nil +} - // XXX Only update propagate no health checks have been configured. - // So we consider thie TASK_RUNNING state as healthy-notice. - if len(app.HealthChecks) == 0 { - mmsd.Update(event.AppId, event.TaskId, true) +// findAppsByMarathonId returns list of all applications that belong to the +// given Marathon App mAppId. +func (mmsd *mmsdService) findAppsByMarathonId(mAppId string) []*core.AppCluster { + var apps []*core.AppCluster + for _, app := range mmsd.apps { + if app.Name == mAppId { + apps = append(apps, app) } - case marathon.TaskFinished, marathon.TaskFailed, marathon.TaskKilling, marathon.TaskKilled, marathon.TaskLost: - log.Printf("App %v task %v on %v changed status. %v.\n", event.AppId, event.TaskId, event.Host, event.TaskStatus) - app, err := mmsd.getMarathonApp(event.AppId) - if err != nil { - log.Printf("Failed to fetch Marathon app. %+v. %v\n", event, err) - return - } - mmsd.Remove(event.AppId, event.TaskId, app) } + return apps } -func (mmsd *mmsdService) healthStatusChangedEvent(event *marathon.HealthStatusChangedEvent) { - app, err := mmsd.getMarathonApp(event.AppId) - if err != nil { - log.Printf("Failed to fetch Marathon app. %+v. %v\n", event, err) - return - } - if app == nil { - log.Printf("App %v not found anymore.\n", event.AppId) - return - } - - task := app.GetTaskById(event.TaskId) - if task == nil { - log.Printf("App %v task %v not found anymore.\n", event.AppId, event.TaskId) - mmsd.Remove(event.AppId, event.TaskId, app) - return - } +func (mmsd *mmsdService) setupEventBusListener() { + var url = fmt.Sprintf("http://%v:%v/v2/events", + mmsd.MarathonIP, mmsd.MarathonPort) - // app & task definitely do exist, so propagate health change event + var bus = sse.NewEventSource(url, mmsd.ReconnectDelay) - if event.Alive { - log.Printf("App %v task %v on %v is healthy.\n", event.AppId, event.TaskId, task.Host) - } else { - log.Printf("App %v task %v on %v is unhealthy.\n", event.AppId, event.TaskId, task.Host) - } + bus.OnOpen = mmsd.OnMarathonConnected + bus.OnError = mmsd.OnMarathonConnectionFailure + bus.AddEventListener("deployment_info", mmsd.DeploymentStarted) + //bus.AddEventListener("deployment_success", mmsd.DeploymentSuccess) + //bus.AddEventListener("deployment_failed", mmsd.DeploymentFailed) + bus.AddEventListener("status_update_event", mmsd.StatusUpdateEvent) + bus.AddEventListener("health_status_changed_event", mmsd.HealthStatusChangedEvent) - mmsd.Update(event.AppId, event.TaskId, event.Alive) + go bus.RunForever() } -func (mmsd *mmsdService) getMarathonApp(appID string) (*marathon.App, error) { - m, err := marathon.NewService(mmsd.MarathonIP, mmsd.MarathonPort) - if err != nil { - return nil, err - } - - app, err := m.GetApp(appID) - if err != nil { - return nil, err - } +func (mmsd *mmsdService) OnMarathonConnected(event, data string) { + log.Printf("Listening for events from Marathon on %v:%v\n", mmsd.MarathonIP, mmsd.MarathonPort) + mmsd.applyApps(mmsd.convertMarathonApps(mmsd.getMarathonApps())) +} - return app, nil +func (mmsd *mmsdService) OnMarathonConnectionFailure(event, data string) { + log.Printf("Marathon Event Stream Error. %v. %v\n", event, data) } -// enable/disable given app:task -func (mmsd *mmsdService) Update(appID string, taskID string, alive bool) { - m, err := marathon.NewService(mmsd.MarathonIP, mmsd.MarathonPort) +func (mmsd *mmsdService) DeploymentStarted(data string) { + var event marathon.DeploymentInfoEvent + err := json.Unmarshal([]byte(data), &event) if err != nil { - log.Printf("Update: NewService(%q, %v) failed. %v\n", mmsd.MarathonIP, mmsd.MarathonPort, err) + log.Printf("Failed to unmarshal DeploymentInfoEvent. %v", err) + log.Printf("deployment_info: %v", data) return } - app, err := m.GetApp(appID) + log.Printf("app: %v", data) + // for _, action := range event.CurrentStep.Actions { + // log.Printf("action: %+v", action) + // app := util.FindMarathonAppById(event.Plan.Target.Apps, action.App) + // log.Printf("app: %v", util.ConvertToJsonString(app)) + // } + // log.Printf("apps => %+v", event.Plan.Target.Apps) +} + +// StatusUpdateEvent is invoked by SSE when exactly this named event is fired. +func (mmsd *mmsdService) StatusUpdateEvent(data string) { + var event marathon.StatusUpdateEvent + err := json.Unmarshal([]byte(data), &event) if err != nil { - log.Printf("Update: GetApp(%q) failed. %v\n", appID, err) + log.Printf("Failed to unmarshal status_update_event. %v\n", err) + log.Printf("status_update_event: %+v\n", data) return } - for _, handler := range mmsd.Handlers { - err = handler.Update(app, taskID) + switch event.TaskStatus { + case marathon.TaskRunning: + app, err := mmsd.getMarathonApp(event.AppId) if err != nil { - log.Printf("Update failed. %v\n", err) + log.Printf("App %v task %v on %v is running but failed to fetch infos. %v\n", + event.AppId, event.TaskId, event.Host, err) + return } - } -} -func (mmsd *mmsdService) Remove(appID string, taskID string, app *marathon.App) { - for _, handler := range mmsd.Handlers { - err := handler.Remove(appID, taskID, app) - if err != nil { - log.Printf("Remove failed. %v\n", err) + log.Printf( + "App %v task %v on %v changed status. %v. %v\n", + event.AppId, event.TaskId, event.Host, event.TaskStatus, event.Message) + + // XXX Only update propagate no health checks have been configured. + // So we consider thie TASK_RUNNING state as healthy-notice. + if len(app.HealthChecks) == 0 { + mmsd.AddTask( + event.AppId, + event.TaskId, + string(event.TaskStatus), + event.Host, + event.Ports) + } else { + // Remember the update event so as soon as the task becomes healthy, + // we can send a full AddTask() event to the handlers. + mmsd.taskEvents[event.TaskId] = event + } + case marathon.TaskKilling: + log.Printf("App %v task %v on %v changed status. %v.\n", event.AppId, event.TaskId, event.Host, event.TaskStatus) + mmsd.killingTasks[event.TaskId] = true + mmsd.RemoveTask(event.AppId, event.TaskId, event.TaskStatus) + case marathon.TaskFinished, marathon.TaskFailed, marathon.TaskKilled, marathon.TaskLost: + log.Printf("App %v task %v on %v changed status. %v.\n", event.AppId, event.TaskId, event.Host, event.TaskStatus) + delete(mmsd.taskEvents, event.TaskId) + if !mmsd.killingTasks[event.TaskId] { + mmsd.RemoveTask(event.AppId, event.TaskId, event.TaskStatus) + } else { + delete(mmsd.killingTasks, event.TaskId) } } } -func (mmsd *mmsdService) MaybeResetFromTasks(force bool) error { - m, err := marathon.NewService(mmsd.MarathonIP, mmsd.MarathonPort) +// HealthStatusChangedEvent is invoked by SSE when exactly this named event is fired. +func (mmsd *mmsdService) HealthStatusChangedEvent(data string) { + var event marathon.HealthStatusChangedEvent + err := json.Unmarshal([]byte(data), &event) if err != nil { - return fmt.Errorf("Could not create new marathon service. %v", err) + log.Printf("Failed to unmarshal health_status_changed_event. %v\n", err) + } else if event.Alive { + if taskUpdateEvent, ok := mmsd.taskEvents[event.TaskId]; ok { + log.Printf("taskUpdateEvent: %+v", taskUpdateEvent) + // for portIndex, port := range taskUpdateEvent.Ports { + // task := &core.AppBackend{ + // Id: event.TaskId, + // Host: taskUpdateEvent.Host, + // Port: port, + // State: string(taskUpdateEvent.TaskStatus), + // } + // } + // TODO: feed mmsd.apps[appIds] with tasks + // TODO mmsd.AddTask(event.AppId, event.TaskId) + } + } else { + mmsd.RemoveTask(event.AppId, event.TaskId, "TASK_RUNNING") } +} - apps, err := m.GetApps() - if err != nil { - return fmt.Errorf("Could not get apps. %v", err) +// AddTask ensures the given task is added to the app and all handlers are +// notified as well. +func (mmsd *mmsdService) AddTask(appId, taskId, taskStatus, host string, ports []uint) { + for portIndex, port := range ports { + if app := mmsd.getAppByMarathonId(appId, portIndex); app != nil { + task := core.AppBackend{ + Id: taskId, + Host: host, + Port: port, + State: string(taskStatus), + } + app.Backends = append(app.Backends, task) + + for _, handler := range mmsd.Handlers { + handler.AddTask(&task, app) + } + } } +} - for _, handler := range mmsd.Handlers { - err = handler.Apply(apps, force) - if err != nil { - log.Printf("Failed to apply changes to handler. %v\n", err) +func (mmsd *mmsdService) RemoveTask(appId, taskId string, newStatus marathon.TaskStatus) bool { + var found uint + for _, app := range mmsd.findAppsByMarathonId(appId) { + for i, task := range app.Backends { + if task.Id == taskId { + // update task state, and remove task out of app cluster's task list + task.State = string(newStatus) + app.Backends = append(app.Backends[:0], app.Backends[i+1:]...) + + for _, handler := range mmsd.Handlers { + handler.RemoveTask(&task, app) + } + found++ + } } } - - return nil + return found > 0 } const appVersion = "0.9.12" @@ -451,17 +519,19 @@ func showVersion() { } func (mmsd *mmsdService) Run() { + var filterGroups = "*" flag.BoolVarP(&mmsd.Verbose, "verbose", "v", mmsd.Verbose, "Set verbosity level") flag.IPVar(&mmsd.MarathonIP, "marathon-ip", mmsd.MarathonIP, "Marathon endpoint TCP IP address") flag.UintVar(&mmsd.MarathonPort, "marathon-port", mmsd.MarathonPort, "Marathon endpoint TCP port number") flag.DurationVar(&mmsd.ReconnectDelay, "reconnect-delay", mmsd.ReconnectDelay, "Marathon reconnect delay") flag.StringVar(&mmsd.RunStateDir, "run-state-dir", mmsd.RunStateDir, "Path to directory to keep run-state") - flag.StringVar(&mmsd.FilterGroups, "filter-groups", mmsd.FilterGroups, "Application group filter") + flag.StringVar(&filterGroups, "filter-groups", filterGroups, "Application group filter") flag.IPVar(&mmsd.ManagedIP, "managed-ip", mmsd.ManagedIP, "IP-address to manage for mmsd") flag.BoolVar(&mmsd.GatewayEnabled, "enable-gateway", mmsd.GatewayEnabled, "Enables gateway support") flag.IPVar(&mmsd.GatewayAddr, "gateway-bind", mmsd.GatewayAddr, "gateway bind address") flag.UintVar(&mmsd.GatewayPortHTTP, "gateway-port-http", mmsd.GatewayPortHTTP, "gateway port for HTTP") flag.UintVar(&mmsd.GatewayPortHTTPS, "gateway-port-https", mmsd.GatewayPortHTTPS, "gateway port for HTTPS") + flag.BoolVar(&mmsd.EventLoggerEnabled, "enable-eventlogger", mmsd.EventLoggerEnabled, "enables eventlogger module") flag.BoolVar(&mmsd.FilesEnabled, "enable-files", mmsd.FilesEnabled, "enables file based service discovery") flag.BoolVar(&mmsd.UDPEnabled, "enable-udp", mmsd.UDPEnabled, "enables UDP load balancing") flag.BoolVar(&mmsd.TCPEnabled, "enable-tcp", mmsd.TCPEnabled, "enables haproxy TCP load balancing") @@ -470,11 +540,11 @@ func (mmsd *mmsdService) Run() { flag.StringVar(&mmsd.HaproxyTailCfg, "haproxy-cfgtail", mmsd.HaproxyTailCfg, "path to haproxy tail config file") flag.IPVar(&mmsd.ServiceAddr, "haproxy-bind", mmsd.ServiceAddr, "haproxy management port") flag.UintVar(&mmsd.HaproxyPort, "haproxy-port", mmsd.HaproxyPort, "haproxy management port") - flag.BoolVar(&mmsd.DnsEnabled, "enable-dns", mmsd.DnsEnabled, "Enables DNS-based service discovery") - flag.UintVar(&mmsd.DnsPort, "dns-port", mmsd.DnsPort, "DNS service discovery port") - flag.BoolVar(&mmsd.DnsPushSRV, "dns-push-srv", mmsd.DnsPushSRV, "DNS service discovery to also push SRV on A") - flag.StringVar(&mmsd.DnsBaseName, "dns-basename", mmsd.DnsBaseName, "DNS service discovery's base name") - flag.DurationVar(&mmsd.DnsTTL, "dns-ttl", mmsd.DnsTTL, "DNS service discovery's reply message TTL") + flag.BoolVar(&mmsd.DNSEnabled, "enable-dns", mmsd.DNSEnabled, "Enables DNS-based service discovery") + flag.UintVar(&mmsd.DNSPort, "dns-port", mmsd.DNSPort, "DNS service discovery port") + flag.BoolVar(&mmsd.DNSPushSRV, "dns-push-srv", mmsd.DNSPushSRV, "DNS service discovery to also push SRV on A") + flag.StringVar(&mmsd.DNSBaseName, "dns-basename", mmsd.DNSBaseName, "DNS service discovery's base name") + flag.DurationVar(&mmsd.DNSTTL, "dns-ttl", mmsd.DNSTTL, "DNS service discovery's reply message TTL") showVersionAndExit := flag.BoolP("version", "V", false, "Shows version and exits") flag.Usage = func() { @@ -486,6 +556,8 @@ func (mmsd *mmsdService) Run() { flag.Parse() + mmsd.FilterGroups = strings.Split(filterGroups, ",") + if *showVersionAndExit { showVersion() os.Exit(0) @@ -496,40 +568,60 @@ func (mmsd *mmsdService) Run() { mmsd.setupHttpService() <-mmsd.quitChannel + + for _, handler := range mmsd.Handlers { + handler.Shutdown() + } +} + +func (mmsd *mmsdService) Shutdown() { + mmsd.quitChannel <- true +} + +func (mmsd *mmsdService) applyApps(apps []*core.AppCluster) { + mmsd.apps = apps + + for _, handler := range mmsd.Handlers { + handler.Apply(apps) + } } func (mmsd *mmsdService) setupHandlers() { - if mmsd.DnsEnabled { - mmsd.Handlers = append(mmsd.Handlers, &DnsManager{ + if mmsd.EventLoggerEnabled { + mmsd.Handlers = append(mmsd.Handlers, &modules.EventLoggerModule{ + Verbose: mmsd.Verbose, + }) + } + + if mmsd.DNSEnabled { + mmsd.Handlers = append(mmsd.Handlers, &modules.DNSModule{ Verbose: mmsd.Verbose, ServiceAddr: mmsd.ServiceAddr, - ServicePort: mmsd.DnsPort, - PushSRV: mmsd.DnsPushSRV, - BaseName: mmsd.DnsBaseName, - DnsTTL: mmsd.DnsTTL, + ServicePort: mmsd.DNSPort, + PushSRV: mmsd.DNSPushSRV, + BaseName: mmsd.DNSBaseName, + DNSTTL: mmsd.DNSTTL, }) } - if mmsd.UDPEnabled { - mmsd.Handlers = append(mmsd.Handlers, NewUdpManager( - mmsd.ServiceAddr, - mmsd.Verbose, - mmsd.UDPEnabled, - )) - } + // if mmsd.UDPEnabled { + // mmsd.Handlers = append(mmsd.Handlers, NewUdpManager( + // mmsd.ServiceAddr, + // mmsd.Verbose, + // mmsd.UDPEnabled, + // )) + // } if mmsd.TCPEnabled { - mmsd.Handlers = append(mmsd.Handlers, &HaproxyMgr{ - Enabled: mmsd.TCPEnabled, + mmsd.Handlers = append(mmsd.Handlers, &modules.HaproxyModule{ Verbose: mmsd.Verbose, LocalHealthChecks: mmsd.LocalHealthChecks, - FilterGroups: strings.Split(mmsd.FilterGroups, ","), ServiceAddr: mmsd.ServiceAddr, GatewayEnabled: mmsd.GatewayEnabled, GatewayAddr: mmsd.GatewayAddr, GatewayPortHTTP: mmsd.GatewayPortHTTP, GatewayPortHTTPS: mmsd.GatewayPortHTTPS, - Executable: mmsd.HaproxyBin, + HaproxyExe: mmsd.HaproxyBin, ConfigTailPath: mmsd.HaproxyTailCfg, ConfigPath: filepath.Join(mmsd.RunStateDir, "haproxy.cfg"), OldConfigPath: filepath.Join(mmsd.RunStateDir, "haproxy.cfg.old"), @@ -541,24 +633,14 @@ func (mmsd *mmsdService) setupHandlers() { } if mmsd.FilesEnabled { - mmsd.Handlers = append(mmsd.Handlers, &FilesManager{ - Enabled: mmsd.FilesEnabled, + mmsd.Handlers = append(mmsd.Handlers, &modules.FilesManager{ Verbose: mmsd.Verbose, BasePath: mmsd.RunStateDir + "/confd", }) } for _, handler := range mmsd.Handlers { - err := handler.Setup() - if err != nil { - log.Fatalf("Failed to setup handlers. %v\n", err) - } - } - - // trigger initial run - err := mmsd.MaybeResetFromTasks(true) - if err != nil { - log.Printf("Could not force task state reset. %v\n", err) + handler.Startup() } } @@ -574,34 +656,45 @@ func locateExe(name string) string { func main() { var mmsd = mmsdService{ - MarathonScheme: "http", - MarathonIP: net.ParseIP("127.0.0.1"), - MarathonPort: 8080, - ReconnectDelay: time.Second * 4, - RunStateDir: "/var/run/mmsd", - FilterGroups: "*", - GatewayEnabled: false, - GatewayAddr: net.ParseIP("0.0.0.0"), - GatewayPortHTTP: 80, - GatewayPortHTTPS: 443, - FilesEnabled: true, - UDPEnabled: true, - TCPEnabled: true, - LocalHealthChecks: true, - HaproxyBin: locateExe("haproxy"), - HaproxyTailCfg: "/etc/mmsd/haproxy-tail.cfg", - HaproxyPort: 8081, - ManagementAddr: net.ParseIP("0.0.0.0"), - ServiceAddr: net.ParseIP("0.0.0.0"), - HttpApiPort: 8082, - Verbose: false, - DnsEnabled: false, - DnsPort: 53, - DnsPushSRV: false, - DnsBaseName: "mmsd.", - DnsTTL: time.Second * 5, - quitChannel: make(chan bool), - } + MarathonScheme: "http", + MarathonIP: net.ParseIP("127.0.0.1"), + MarathonPort: 8080, + ReconnectDelay: time.Second * 4, + RunStateDir: "/var/run/mmsd", + GatewayEnabled: false, + GatewayAddr: net.ParseIP("0.0.0.0"), + GatewayPortHTTP: 80, + GatewayPortHTTPS: 443, + EventLoggerEnabled: false, + FilesEnabled: false, + UDPEnabled: false, + TCPEnabled: false, + LocalHealthChecks: true, + HaproxyBin: locateExe("haproxy"), + HaproxyTailCfg: "/etc/mmsd/haproxy-tail.cfg", + HaproxyPort: 8081, + ManagementAddr: net.ParseIP("0.0.0.0"), + ServiceAddr: net.ParseIP("0.0.0.0"), + HttpApiPort: 8082, + Verbose: false, + DNSEnabled: false, + DNSPort: 53, + DNSPushSRV: false, + DNSBaseName: "mmsd.", + DNSTTL: time.Second * 5, + quitChannel: make(chan bool), + taskEvents: make(map[string]marathon.StatusUpdateEvent), + killingTasks: make(map[string]bool), + } + + // trap SIGTERM and SIGINT + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGTERM, syscall.SIGINT) + go func() { + s := <-sigc + log.Printf("Caught signal %v. Terminating", s) + mmsd.Shutdown() + }() mmsd.Run() } diff --git a/modules/dns.go b/modules/dns.go new file mode 100644 index 0000000..64caff2 --- /dev/null +++ b/modules/dns.go @@ -0,0 +1,213 @@ +package modules + +// DNS based service discovery +// --------------------------------------------------------------------------- +// +// Marathon: "/path/to/application" +// DNS-query: application.to.path.$basedomain (A, AAAA, TXT, SRV) +// DNS-reply: +// A => list of IPv4 addresses +// AAAA => list of IPv6 addresses +// SRV => ip:port array per task +// TXT => app labels + +// TODO: DNS forwarding +// TODO: DNS proxy cache (for speeding up) + +import ( + "fmt" + "log" + "net" + "strings" + "sync" + "time" + + "github.com/dawanda/mmsd/core" + "github.com/miekg/dns" +) + +// DNSModule provide that is pluginable in the EventListener +type DNSModule struct { + Verbose bool + ServiceAddr net.IP + ServicePort uint + BaseName string + DNSTTL time.Duration + PushSRV bool + udpServer *dns.Server + tcpServer *dns.Server + db map[string]*dbEntry + dbMutex sync.Mutex +} + +type dbEntry struct { + ipAddresses []net.IP + app *core.AppCluster +} + +// Startup bring up a DNS server list UDP and TCP connections +func (module *DNSModule) Startup() { + log.Printf("DNS Server base name: %s", module.BaseName) + dns.HandleFunc(module.BaseName, module.dnsHandler) + + go func() { + module.udpServer = &dns.Server{ + Addr: fmt.Sprintf("%v:%v", module.ServiceAddr, module.ServicePort), + Net: "udp", + TsigSecret: nil, + } + log.Printf("DNS Server listen on %v:%v UDP", module.ServiceAddr, module.ServicePort) + err := module.udpServer.ListenAndServe() + if err != nil { + log.Fatal(err) + } + }() + + go func() { + module.tcpServer = &dns.Server{ + Addr: fmt.Sprintf("%v:%v", module.ServiceAddr, module.ServicePort), + Net: "tcp", + TsigSecret: nil, + } + log.Printf("DNS Server listen on %v:%v TCP", module.ServiceAddr, module.ServicePort) + err := module.tcpServer.ListenAndServe() + if err != nil { + log.Fatal(err) + } + }() +} + +// Shutdown stop the DNS server +func (module *DNSModule) Shutdown() { + module.udpServer.Shutdown() + module.tcpServer.Shutdown() +} + +// Apply bootstrap the data store from list of AppCluster +func (module *DNSModule) Apply(apps []*core.AppCluster) { + log.Printf("DNS Apply : initialize %d apps", len(apps)) + module.dbMutex.Lock() + module.db = make(map[string]*dbEntry) + module.dbMutex.Unlock() + + for _, app := range apps { + err := module.update(app) + if err != nil { + return + } + } +} + +// AddTask replace or add the AppCluster in the data store +func (module *DNSModule) AddTask(task *core.AppBackend, app *core.AppCluster) { + log.Printf("DNS AddTask : %v, %v", task, app) + module.update(app) +} + +func (module *DNSModule) update(app *core.AppCluster) error { + var ipAddresses []net.IP + + for _, backend := range app.Backends { + ip, err := net.ResolveIPAddr("ip", backend.Host) + if err != nil { + return err + } + ipAddresses = append(ipAddresses, ip.IP) + } + + var reversed = module.makeDNSNameFromAppName(app.Name) + var entry = &dbEntry{ + ipAddresses: ipAddresses, + app: app, + } + + module.dbMutex.Lock() + module.db[reversed] = entry + module.dbMutex.Unlock() + + return nil +} + +// RemoveTask replace or remove the AppCluster from data store +func (module *DNSModule) RemoveTask(task *core.AppBackend, app *core.AppCluster) { + log.Printf("DNS RemoveTask : %v, %v", task, app) + module.update(app) +} + +func (module *DNSModule) dnsHandler(w dns.ResponseWriter, req *dns.Msg) { + m := new(dns.Msg) + m.SetReply(req) + + name := req.Question[0].Name + name = strings.TrimSuffix(name, "."+module.BaseName) + + module.dbMutex.Lock() + entry, ok := module.db[name] + module.dbMutex.Unlock() + + if ok { + switch req.Question[0].Qtype { + case dns.TypeSRV: + m.Answer = module.makeAllSRV(req.Question[0].Name, entry) + case dns.TypeA: + m.Answer = module.makeAllA(req.Question[0].Name, entry) + if module.PushSRV { + m.Extra = module.makeAllSRV(req.Question[0].Name, entry) + } + } + } + + w.WriteMsg(m) +} + +func (module *DNSModule) makeAllA(name string, entry *dbEntry) []dns.RR { + var result []dns.RR + + for _, ip := range entry.ipAddresses { + rr := &dns.A{ + Hdr: dns.RR_Header{ + Ttl: uint32(module.DNSTTL.Seconds()), + Name: name, + Class: dns.ClassINET, + Rrtype: dns.TypeA, + }, + A: ip.To4(), + } + result = append(result, rr) + } + + return result +} + +func (module *DNSModule) makeAllSRV(name string, entry *dbEntry) []dns.RR { + var result []dns.RR + + for _, task := range entry.app.Backends { + rr := &dns.SRV{ + Hdr: dns.RR_Header{ + Ttl: uint32(module.DNSTTL.Seconds()), + Name: name, + Class: dns.ClassINET, + Rrtype: dns.TypeSRV, + }, + Port: uint16(task.Port), + Target: task.Host + ".", + Weight: 1, + Priority: 1, + } + result = append(result, rr) + } + + return result +} + +func (module *DNSModule) makeDNSNameFromAppName(appName string) string { + var parts = strings.Split(appName, "/")[1:] + var reversedParts []string + for i := range parts { + reversedParts = append(reversedParts, parts[len(parts)-i-1]) + } + var reversed = strings.Join(reversedParts, ".") + + return reversed +} diff --git a/modules/eventlogger.go b/modules/eventlogger.go new file mode 100644 index 0000000..5855ac4 --- /dev/null +++ b/modules/eventlogger.go @@ -0,0 +1,46 @@ +package modules + +import ( + "encoding/json" + "fmt" + "log" + + "github.com/dawanda/mmsd/core" +) + +/* EventLoggerModule adds simple event logging to the logger. + */ +type EventLoggerModule struct { + Verbose bool +} + +func (logger *EventLoggerModule) Startup() { + log.Printf("eventlogger: Initialize\n") +} + +func (logger *EventLoggerModule) Shutdown() { + log.Printf("eventlogger: Shutdown\n") +} + +func (logger *EventLoggerModule) Apply(apps []*core.AppCluster) { + if logger.Verbose { + out, err := json.MarshalIndent(apps, "", " ") + if err != nil { + log.Printf("eventlogger: Marshal failed. %v\n", err) + } else { + fmt.Printf("eventlogger: %v\n", string(out)) + } + } + + for _, app := range apps { + log.Printf("eventlogger: Apply: %v\n", app.Id) + } +} + +func (logger *EventLoggerModule) AddTask(task *core.AppBackend, app *core.AppCluster) { + log.Printf("eventlogger: Task Add: %v: %v %v\n", task.State, app.Id, task.Host) +} + +func (logger *EventLoggerModule) RemoveTask(task *core.AppBackend, app *core.AppCluster) { + log.Printf("eventlogger: Task Remove: %v: %v %v\n", task.State, app.Id, task.Host) +} diff --git a/modules/files.go b/modules/files.go new file mode 100644 index 0000000..ea0d625 --- /dev/null +++ b/modules/files.go @@ -0,0 +1,147 @@ +package modules + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "os" + "path/filepath" + "strings" + + "github.com/dawanda/mmsd/core" + "github.com/dawanda/mmsd/util" +) + +type FilesManager struct { + Verbose bool + BasePath string +} + +func (upstream *FilesManager) Log(msg string) { + if upstream.Verbose { + log.Printf("upstream: %v\n", msg) + } +} + +func (manager *FilesManager) Startup() { +} + +func (manager *FilesManager) Shutdown() { +} + +func (manager *FilesManager) RemoveTask(task *core.AppBackend, app *core.AppCluster) { + if app != nil { + manager.writeApp(app) + } else { + // TODO: remove files for app-$portIndex + } +} + +func (upstream *FilesManager) AddTask(task *core.AppBackend, app *core.AppCluster) { + upstream.writeApp(app) +} + +func (upstream *FilesManager) Apply(apps []*core.AppCluster) { + err := os.MkdirAll(upstream.BasePath, 0770) + if err != nil { + log.Printf("Failed to mkdir. %v", err) + return + } + + var newFiles []string + oldFiles, err := upstream.collectFiles() + if err != nil { + log.Printf("Failed to collect files. %v", err) + return + } + + for _, app := range apps { + filenames, _ := upstream.writeApp(app) + newFiles = append(newFiles, filenames...) + } + + // check for superfluous files + diff := util.FindMissing(oldFiles, newFiles) + for _, superfluous := range diff { + upstream.Log(fmt.Sprintf("Removing superfluous file: %v\n", superfluous)) + os.Remove(superfluous) + } +} + +func (upstream *FilesManager) writeApp(app *core.AppCluster) ([]string, error) { + var files []string + + app_id := app.Id + cfgfile := filepath.Join(upstream.BasePath, app_id+".instances") + tmpfile := cfgfile + ".tmp" + + err := upstream.writeFile(tmpfile, app_id, app) + if err != nil { + return files, err + } + files = append(files, cfgfile) + + if _, err := os.Stat(cfgfile); os.IsNotExist(err) { + upstream.Log(fmt.Sprintf("new %v", cfgfile)) + os.Rename(tmpfile, cfgfile) + } else if !util.FileIsIdentical(tmpfile, cfgfile) { + upstream.Log(fmt.Sprintf("refresh %v", cfgfile)) + os.Rename(tmpfile, cfgfile) + } else { + // new file is identical to already existing one + os.Remove(tmpfile) + } + return files, nil +} + +func getApplicationProtocol1(app *core.AppCluster) string { + if proto := strings.ToLower(app.Labels["proto"]); len(proto) != 0 { + return proto + } + + if app.HealthCheck != nil && len(app.HealthCheck.Protocol) != 0 { + return strings.ToLower(app.HealthCheck.Protocol) + } + + if len(app.Protocol) != 0 { + return strings.ToLower(app.Protocol) + } + + return "tcp" +} + +func (upstream *FilesManager) writeFile(filename string, appId string, + app *core.AppCluster) error { + + var b bytes.Buffer + b.WriteString(fmt.Sprintf("Service-Name: %v\r\n", appId)) + b.WriteString(fmt.Sprintf("Service-Port: %v\r\n", app.ServicePort)) + b.WriteString(fmt.Sprintf("Service-Transport-Proto: %v\r\n", app.Protocol)) + b.WriteString(fmt.Sprintf("Service-Application-Proto: %v\r\n", getApplicationProtocol1(app))) + if app.HealthCheck != nil && len(app.HealthCheck.Protocol) != 0 { + b.WriteString(fmt.Sprintf("Health-Check-Proto: %v\r\n", strings.ToLower(app.HealthCheck.Protocol))) + } + b.WriteString("\r\n") + + for _, task := range app.Backends { + b.WriteString(fmt.Sprintf("%v:%v\n", task.Host, task.Port)) + } + + return ioutil.WriteFile(filename, b.Bytes(), 0660) +} + +func (upstream *FilesManager) collectFiles() ([]string, error) { + fileInfos, err := ioutil.ReadDir(upstream.BasePath) + if err != nil { + upstream.Log(fmt.Sprintf("Error reading directory %v. %v", upstream.BasePath, err)) + return nil, err + } + + var fileNames []string + for _, fileInfo := range fileInfos { + fileNames = append(fileNames, filepath.Join(upstream.BasePath, fileInfo.Name())) + } + + return fileNames, nil +} diff --git a/modules/haproxy.go b/modules/haproxy.go new file mode 100644 index 0000000..57530c1 --- /dev/null +++ b/modules/haproxy.go @@ -0,0 +1,590 @@ +package modules + +import ( + "errors" + "fmt" + "io/ioutil" + "log" + "net" + "os" + "os/exec" + "runtime" + "sort" + "strconv" + "strings" + "syscall" + + "github.com/dawanda/mmsd/core" + "github.com/dawanda/mmsd/util" +) + +const ( + LB_PROXY_PROTOCOL = "lb-proxy-protocol" + LB_ACCEPT_PROXY = "lb-accept-proxy" + LB_VHOST_HTTP = "lb-vhost" + LB_VHOST_DEFAULT_HTTP = "lb-vhost-default" + LB_VHOST_HTTPS = "lb-vhost-ssl" + LB_VHOST_DEFAULT_HTTPS = "lb-vhost-default-ssl" + LB_DISABLED = "lb-disabled" +) + +var ( + ErrBadExit = errors.New("Bad Process Exit.") +) + +type HaproxyModule struct { + Verbose bool // log verbose logging messages? + LocalHealthChecks bool // wether or not to perform local health checks or rely on Marathon events. + ServiceAddr net.IP // IP address the load balancers should bind on. + GatewayEnabled bool // Enables/disables HTTP/S gateway support + GatewayAddr net.IP // HTTP gateway bind IP address + GatewayPortHTTP uint // HTTP gateway HTTP port (usually 80) + GatewayPortHTTPS uint // HTTP gateway HTTPS port (usually 443) + HaproxyExe string // path to haproxy executable + ConfigPath string // path to generated config file (/var/run/mmsd/haproxy.cfg) + ConfigTailPath string // path to appended haproxy config file (/etc/mmsd/haproxy-tail.cfg) + OldConfigPath string // path to previousely generated config file + PidFile string // path to PID-file for currently running haproxy executable + ManagementAddr net.IP // haproxy stats listen addr + ManagementPort uint // haproxy stats port + AdminSockPath string // path to haproxy admin socket + + vhostsHTTP map[string][]string // [appId] = []vhost + vhostDefaultHTTP string // default HTTP vhost + vhostsHTTPS map[string][]string // [appId] = []vhost + vhostDefaultHTTPS string // default HTTPS vhost + appConfigCache map[string]string // [appId] = haproxy_config_fragment +} + +func (module *HaproxyModule) Startup() { + module.vhostsHTTP = make(map[string][]string) + module.vhostDefaultHTTP = "" + + module.vhostsHTTPS = make(map[string][]string) + module.vhostDefaultHTTPS = "" + + module.appConfigCache = make(map[string]string) +} + +func (module *HaproxyModule) Shutdown() { +} + +func (module *HaproxyModule) Apply(apps []*core.AppCluster) { + for _, app := range apps { + if module.supportsProtocol(app.Protocol) { + module.appConfigCache[app.Id] = module.makeConfig(app) + } + } + + module.Commit() +} + +func (module *HaproxyModule) Commit() { + err := module.writeConfig() + if err != nil { + log.Printf("Failed to write config. %v", err) + return + } + + err = module.reloadConfig() + if err != nil { + log.Printf("Reloading configuration failed. %v", err) + } +} + +func (module *HaproxyModule) AddTask(task *core.AppBackend, app *core.AppCluster) { + if !module.supportsProtocol(app.Protocol) { + return + } + log.Printf("Haproxy: AddTask()") + // TODO +} + +func (module *HaproxyModule) RemoveTask(task *core.AppBackend, app *core.AppCluster) { + log.Printf("Haproxy: RemoveTask()") + // TODO +} + +func (module *HaproxyModule) makeConfig(app *core.AppCluster) string { + module.updateGatewaySettings(app) + + // generate haproxy config fragment + result := "" + serverOpts := "" + bindOpts := "" + + if runtime.GOOS == "linux" { // only enable on Linux (known to work) + bindOpts += " defer-accept" + } + + if util.Atoi(app.Labels[LB_ACCEPT_PROXY], 0) != 0 { + bindOpts += " accept-proxy" + } + + if module.LocalHealthChecks { + serverOpts += " check" + } + + if app.HealthCheck != nil && app.HealthCheck.IntervalSeconds > 0 { + serverOpts += fmt.Sprintf(" inter %v", app.HealthCheck.IntervalSeconds*1000) + } + + switch util.Atoi(app.Labels[LB_PROXY_PROTOCOL], 0) { + case 2: + serverOpts += " send-proxy-v2" + case 1: + serverOpts += " send-proxy" + case 0: + // ignore + default: + log.Printf("Invalid proxy-protocol given for %v: %v - ignoring.", + app.Id, app.Labels["lb-proxy-protocol"]) + } + + switch util.Atoi(app.Labels[LB_PROXY_PROTOCOL], 0) { + case 2: + serverOpts += " send-proxy-v2" + case 1: + serverOpts += " send-proxy" + case 0: + // ignore + default: + log.Printf("Invalid proxy-protocol given for %v: %v - ignoring.", + app.Id, app.Labels["lb-proxy-protocol"]) + } + + var appProtocol = module.getAppProtocol(app) + switch appProtocol { + case "HTTP": + result += fmt.Sprintf( + "frontend __frontend_%v\n"+ + " bind %v:%v%v\n"+ + " option dontlognull\n"+ + " default_backend %v\n"+ + "\n"+ + "backend %v\n"+ + " mode http\n"+ + " balance leastconn\n"+ + " option forwardfor\n"+ + " option http-server-close\n"+ + " option abortonclose\n", + app.Id, module.ServiceAddr, app.ServicePort, bindOpts, app.Id, app.Id) + + if module.LocalHealthChecks && app.HealthCheck != nil { + result += fmt.Sprintf( + " option httpchk GET %v HTTP/1.1\\r\\nHost:\\ %v\n", + app.HealthCheck.Path, "health-check") + } + default: + result += fmt.Sprintf( + "listen %v\n"+ + " bind %v:%v%v\n"+ + " option dontlognull\n"+ + " mode tcp\n"+ + " balance leastconn\n", + app.Id, module.ServiceAddr, app.ServicePort, bindOpts) + } + + result += " option redispatch\n" + result += " retries 1\n" + + for _, task := range app.Backends { + if task.State == "TASK_RUNNING" { + result += fmt.Sprintf( + " server %v:%v %v:%v%v\n", + task.Host, task.Port, // taskLabel == "$host:$port" + util.SoftResolveIPAddr(task.Host), + task.Port, + serverOpts) + } else { + log.Printf("Haproxy: skip task %v (%v).", task.Id, task.State) + } + } + + result += "\n" + + return result +} + +func (module *HaproxyModule) updateGatewaySettings(app *core.AppCluster) { + // update HTTP virtual hosting + var lbVirtualHosts = util.MakeStringArray(app.Labels[LB_VHOST_HTTP]) + if len(lbVirtualHosts) != 0 { + module.vhostsHTTP[app.Id] = lbVirtualHosts + if app.Labels[LB_VHOST_DEFAULT_HTTP] == "1" { + module.vhostDefaultHTTP = app.Id + } + } else { + delete(module.vhostsHTTP, app.Id) + if module.vhostDefaultHTTP == app.Id { + module.vhostDefaultHTTP = "" + } + } + + // update HTTPS virtual hosting + lbVirtualHosts = util.MakeStringArray(app.Labels[LB_VHOST_HTTPS]) + if len(lbVirtualHosts) != 0 { + module.vhostsHTTPS[app.Id] = lbVirtualHosts + if app.Labels[LB_VHOST_DEFAULT_HTTPS] == "1" { + module.vhostDefaultHTTPS = app.Id + } + } else { + delete(module.vhostsHTTPS, app.Id) + if module.vhostDefaultHTTPS == app.Id { + module.vhostDefaultHTTPS = "" + } + } +} + +func (module *HaproxyModule) writeConfig() error { + config := "" + + config += module.makeConfigHead() + + // add apps sorted to config + var clusterNames []string + for name, _ := range module.appConfigCache { + clusterNames = append(clusterNames, name) + } + sort.Strings(clusterNames) + for _, name := range clusterNames { + config += module.appConfigCache[name] + } + + config += module.makeConfigTail() + + // write file + tempConfigFile := fmt.Sprintf("%v.tmp", module.ConfigPath) + err := ioutil.WriteFile(tempConfigFile, []byte(config), 0666) + if err != nil { + return err + } + + err = module.checkConfig(tempConfigFile) + if err != nil { + return err + } + + // if config file previousely did exist, attempt a rename + if _, err := os.Stat(module.ConfigPath); err == nil { + if err = os.Rename(module.ConfigPath, module.OldConfigPath); err != nil { + return err + } + } + + return os.Rename(tempConfigFile, module.ConfigPath) +} + +func (module *HaproxyModule) makeConfigHead() string { + config := "" + + config += fmt.Sprintf( + "# This is an auto generated haproxy configuration!!!\n"+ + "global\n"+ + " maxconn 32768\n"+ + " maxconnrate 32768\n"+ + " log 127.0.0.1 local0\n"+ + " stats socket %v mode 600 level admin\n"+ + "\n"+ + "defaults\n"+ + " maxconn 32768\n"+ + " timeout client 90000\n"+ + " timeout server 90000\n"+ + " timeout connect 90000\n"+ + " timeout queue 90000\n"+ + " timeout http-request 90000\n"+ + "\n", module.AdminSockPath) + + config += fmt.Sprintf( + "listen haproxy\n"+ + " bind %v:%v\n"+ + " mode http\n"+ + " stats enable\n"+ + " stats uri /\n"+ + " stats admin if TRUE\n"+ + " monitor-uri /haproxy?monitor\n"+ + "\n", + module.ManagementAddr, module.ManagementPort) + + if module.GatewayEnabled { + config += module.makeGatewayHTTP() + config += module.makeGatewayHTTPS() + } + return config +} + +func (module *HaproxyModule) makeGatewayHTTP() string { + var ( + suffixRoutes map[string]string = make(map[string]string) + suffixMatches []string + exactRoutes map[string]string = make(map[string]string) + exactMatches []string + vhostDefault string + port uint = module.GatewayPortHTTP + ) + + for _, appID := range SortedVhostsKeys(module.vhostsHTTP) { + vhosts := module.vhostsHTTP[appID] + for _i, vhost := range vhosts { + log.Printf("[haproxy] appID:%v, vhost:%v, i:%v\n", appID, vhost, _i) + matchToken := "vhost_" + vhost + matchToken = strings.Replace(matchToken, ".", "_", -1) + matchToken = strings.Replace(matchToken, "*", "STAR", -1) + + if len(vhost) >= 3 && vhost[0] == '*' && vhost[1] == '.' { + suffixMatches = append(suffixMatches, + fmt.Sprintf(" acl %v hdr_dom(host) -i %v\n", matchToken, strings.SplitN(vhost, ".", 2)[1])) + suffixRoutes[matchToken] = appID + } else { + exactMatches = append(exactMatches, + fmt.Sprintf(" acl %v hdr(host) -i %v\n", matchToken, vhost)) + exactRoutes[matchToken] = appID + } + + if module.vhostDefaultHTTP == appID { + vhostDefault = appID + } + } + } + + var fragment string + fragment += fmt.Sprintf( + "frontend __gateway_http\n"+ + " bind %v:%v\n"+ + " mode http\n"+ + " option dontlognull\n"+ + " option forwardfor\n"+ + " option http-server-close\n"+ + " reqadd X-Forwarded-Proto:\\ http\n"+ + "\n", + module.GatewayAddr, + port) + + // write ACL statements + fragment += strings.Join(exactMatches, "") + fragment += strings.Join(suffixMatches, "") + if len(exactMatches) != 0 || len(suffixMatches) != 0 { + fragment += "\n" + } + + for _, acl := range util.SortedStrStrKeys(exactRoutes) { + appID := exactRoutes[acl] + fragment += fmt.Sprintf(" use_backend %v if %v\n", appID, acl) + } + + for _, acl := range util.SortedStrStrKeys(suffixRoutes) { + appID := suffixRoutes[acl] + fragment += fmt.Sprintf(" use_backend %v if %v\n", appID, acl) + } + + fragment += "\n" + + if len(vhostDefault) != 0 { + fragment += fmt.Sprintf(" default_backend %v\n\n", vhostDefault) + } + + return fragment +} + +func (module *HaproxyModule) makeGatewayHTTPS() string { + // SNI vhost selector + var ( + suffixRoutes map[string]string = make(map[string]string) + suffixMatches []string + exactRoutes map[string]string = make(map[string]string) + exactMatches []string + vhostDefault string + port uint = module.GatewayPortHTTPS + ) + + for _, appID := range SortedVhostsKeys(module.vhostsHTTPS) { + vhosts := module.vhostsHTTPS[appID] + for _, vhost := range vhosts { + matchToken := "vhost_ssl_" + vhost + matchToken = strings.Replace(matchToken, ".", "_", -1) + matchToken = strings.Replace(matchToken, "*", "STAR", -1) + + if len(vhost) >= 3 && vhost[0] == '*' && vhost[1] == '.' { + suffixMatches = append(suffixMatches, + fmt.Sprintf(" acl %v req_ssl_sni -m dom %v\n", matchToken, strings.SplitN(vhost, ".", 2)[1])) + suffixRoutes[matchToken] = appID + } else { + exactMatches = append(exactMatches, + fmt.Sprintf(" acl %v req_ssl_sni -i %v\n", matchToken, vhost)) + exactRoutes[matchToken] = appID + } + + if module.vhostDefaultHTTPS == appID { + vhostDefault = appID + } + } + } + + var fragment string + fragment += fmt.Sprintf( + "frontend __gateway_https\n"+ + " bind %v:%v\n"+ + " mode tcp\n"+ + " tcp-request inspect-delay 5s\n"+ + " tcp-request content accept if { req_ssl_hello_type 1 }\n"+ + "\n", + module.GatewayAddr, + port) + + // write ACL statements + fragment += strings.Join(exactMatches, "") + fragment += strings.Join(suffixMatches, "") + if len(exactMatches) != 0 || len(suffixMatches) != 0 { + fragment += "\n" + } + + for _, acl := range util.SortedStrStrKeys(exactRoutes) { + appID := exactRoutes[acl] + fragment += fmt.Sprintf(" use_backend %v if %v\n", appID, acl) + } + + for _, acl := range util.SortedStrStrKeys(suffixRoutes) { + appID := suffixRoutes[acl] + fragment += fmt.Sprintf(" use_backend %v if %v\n", appID, acl) + } + + fragment += "\n" + + if len(vhostDefault) != 0 { + fragment += fmt.Sprintf(" default_backend %v\n\n", vhostDefault) + } + + return fragment +} + +func (module *HaproxyModule) makeConfigTail() string { + if len(module.ConfigTailPath) == 0 { + return "" + } + + tail, err := ioutil.ReadFile(module.ConfigTailPath) + if err != nil { + log.Printf("Failed to read config tail %v. %v", module.ConfigTailPath, err) + return "" + } + + return string(tail) +} + +func (module *HaproxyModule) reloadConfig() error { + if util.FileIsIdentical(module.ConfigPath, module.OldConfigPath) { + log.Printf("[haproxy] config file not changed. ignoring reload\n") + return nil + } + + pidStr, err := ioutil.ReadFile(module.PidFile) + if err != nil { + return module.startProcess() + } + + pid, err := strconv.Atoi(strings.TrimSpace(string(pidStr))) + if err != nil { + return err + } + + err = syscall.Kill(pid, syscall.Signal(0)) + if err != nil { + // process doesn't exist; start up process + return module.startProcess() + } else { + // process does exist; send SIGHUP to reload + return module.reloadProcess(pid) + } +} + +func (module *HaproxyModule) checkConfig(path string) error { + return module.exec("checking configuration", + "-f", path, "-p", module.PidFile, "-c") +} + +func (module *HaproxyModule) startProcess() error { + return module.exec("starting up process", + "-f", module.ConfigPath, "-p", module.PidFile, "-D", "-q") +} + +func (module *HaproxyModule) reloadProcess(pid int) error { + return module.exec("reloading configuration", + "-f", module.ConfigPath, "-p", module.PidFile, "-D", "-sf", fmt.Sprint(pid)) +} + +func (module *HaproxyModule) exec(logMessage string, args ...string) error { + proc := exec.Command(module.HaproxyExe, args...) + output, err := proc.CombinedOutput() + + log.Printf("[haproxy] %v: %v %v\n", logMessage, module.HaproxyExe, args) + + exitCode := proc.ProcessState.Sys().(syscall.WaitStatus) + if exitCode != 0 { + log.Printf("[haproxy] Bad exit code %v.\n", exitCode) + err = ErrBadExit + } + + if len(output) != 0 && module.Verbose { + log.Println("[haproxy] command output:") + log.Println(strings.TrimSpace(string(output))) + } + + return err +} + +func (module *HaproxyModule) getAppProtocol(app *core.AppCluster) string { + if app.HealthCheck != nil { + if app.HealthCheck.Protocol != "COMMAND" { + return strings.ToUpper(app.HealthCheck.Protocol) + } + } + + return strings.ToUpper(app.Protocol) +} + +func (module *HaproxyModule) supportsProtocol(proto string) bool { + proto = strings.ToUpper(proto) + + if proto == "TCP" || proto == "HTTP" { + return true + } else { + log.Printf("Haproxy: Protocol not supported: %v", proto) + return false + } +} + +// {{{ SortedVhostsKeys +type sortedVhosts struct { + m map[string][]string + s []string +} + +func (sm *sortedVhosts) Len() int { + return len(sm.m) +} + +func (sm *sortedVhosts) Less(a, b int) bool { + // return sm.m[sm.s[a]] > sm.m[sm.s[b]] + return sm.s[a] < sm.s[b] +} + +func (sm *sortedVhosts) Swap(a, b int) { + sm.s[a], sm.s[b] = sm.s[b], sm.s[a] +} + +func SortedVhostsKeys(m map[string][]string) []string { + sm := new(sortedVhosts) + sm.m = m + sm.s = make([]string, len(m)) + + i := 0 + for key, _ := range m { + sm.s[i] = key + i++ + } + sort.Sort(sm) + + return sm.s +} + +// }}} diff --git a/udp_manager.go b/modules/udp.go similarity index 84% rename from udp_manager.go rename to modules/udp.go index cd34b7c..b1b23ba 100644 --- a/udp_manager.go +++ b/modules/udp.go @@ -1,4 +1,4 @@ -package main +package modules import ( "fmt" @@ -8,36 +8,27 @@ import ( "github.com/dawanda/go-mesos/marathon" "github.com/dawanda/mmsd/udpproxy" + "github.com/dawanda/mmsd/util" ) type UdpManager struct { - Enabled bool Verbose bool BindAddr net.IP Servers map[string]*udpproxy.Frontend } -func NewUdpManager(bindAddr net.IP, verbose bool, enabled bool) *UdpManager { +func NewUdpManager(bindAddr net.IP, verbose bool) *UdpManager { return &UdpManager{ - Enabled: enabled, Verbose: verbose, BindAddr: bindAddr, Servers: make(map[string]*udpproxy.Frontend), } } -func (manager *UdpManager) Setup() error { - return nil -} - -func (manager *UdpManager) IsEnabled() bool { - return manager.Enabled +func (manager *UdpManager) Startup() { } -func (manager *UdpManager) SetEnabled(value bool) { - if value != manager.Enabled { - manager.Enabled = value - } +func (manager *UdpManager) Shutdown() { } func (manager *UdpManager) Apply(apps []*marathon.App, force bool) error { @@ -53,7 +44,7 @@ func (manager *UdpManager) Apply(apps []*marathon.App, force bool) error { func (manager *UdpManager) GetFrontend(app *marathon.App, portIndex int, replace bool) (*udpproxy.Frontend, error) { servicePort := app.PortDefinitions[portIndex].Port - name := PrettifyAppId(app.Id, portIndex, servicePort) + name := util.PrettifyAppId(app.Id, portIndex, servicePort) server, ok := manager.Servers[name] if ok { @@ -101,7 +92,7 @@ func (manager *UdpManager) removeApp(appID string) error { func (manager *UdpManager) applyApp(app *marathon.App) error { for portIndex := range app.Ports { - if GetTransportProtocol(app, portIndex) == "udp" { + if util.GetTransportProtocol(app, portIndex) == "udp" { fe, err := manager.GetFrontend(app, portIndex, true) if err != nil { log.Printf("Error spawning UDP frontend. %v\n", err) diff --git a/sse.go b/sse/sse.go similarity index 99% rename from sse.go rename to sse/sse.go index dbd9789..ee89030 100644 --- a/sse.go +++ b/sse/sse.go @@ -1,4 +1,4 @@ -package main +package sse import ( "bufio" diff --git a/helper.go b/util/helper.go similarity index 58% rename from helper.go rename to util/helper.go index 77a95b0..864c4e0 100644 --- a/helper.go +++ b/util/helper.go @@ -1,19 +1,26 @@ -package main +package util import ( "bytes" + "encoding/json" + "errors" "fmt" "hash/fnv" "io" "log" "net" "os" + "sort" "strconv" "strings" "github.com/dawanda/go-mesos/marathon" ) +var ( + ErrInvalidPortRange = errors.New("Invalid port range") +) + func PrettifyAppId(name string, portIndex int, servicePort uint) (appID string) { appID = strings.Replace(name[1:], "/", ".", -1) appID = fmt.Sprintf("%v-%v-%v", appID, portIndex, servicePort) @@ -21,6 +28,13 @@ func PrettifyAppId(name string, portIndex int, servicePort uint) (appID string) return } +func PrettifyAppId2(name string, portIndex int) (appID string) { + appID = strings.Replace(name[1:], "/", ".", -1) + appID = fmt.Sprintf("%v-%v", appID, portIndex) + + return +} + func PrettifyDnsName(dns string) string { return strings.SplitN(dns, ".", 1)[0] } @@ -165,6 +179,16 @@ func GetHealthCheckProtocol(app *marathon.App, portIndex int) string { return "" } +func FindHealthCheckForPortIndex(healthChecks []marathon.HealthCheck, portIndex int) *marathon.HealthCheck { + for _, hs := range healthChecks { + if hs.PortIndex == portIndex { + return &hs + } + } + + return nil +} + func GetHealthCheckForPortIndex(healthChecks []marathon.HealthCheck, portIndex int) marathon.HealthCheck { for _, hs := range healthChecks { if hs.PortIndex == portIndex { @@ -180,3 +204,124 @@ func Hash(s string) uint32 { h.Write([]byte(s)) return h.Sum32() } + +func ResolveIPAddr(dns string, skip bool) string { + if skip { + return dns + } else { + ip, err := net.ResolveIPAddr("ip", dns) + if err != nil { + return dns + } else { + return ip.String() + } + } +} + +func ParseRange(input string) (int, int, error) { + if len(input) == 0 { + return 0, 0, nil + } + + vals := strings.Split(input, ":") + log.Printf("vals: %+q\n", vals) + + if len(vals) == 1 { + i, err := strconv.Atoi(input) + return i, i, err + } + + if len(vals) > 2 { + return 0, 0, ErrInvalidPortRange + } + + var ( + begin int + end int + err error + ) + + // parse begin + if vals[0] != "" { + begin, err = strconv.Atoi(vals[0]) + if err != nil { + return begin, end, err + } + } + + // parse end + if vals[1] != "" { + end, err = strconv.Atoi(vals[1]) + if begin > end { + return begin, end, ErrInvalidPortRange + } + } else { + end = -1 // XXX that is: until the end + } + + return begin, end, err +} + +func MakeStringArray(s string) []string { + if len(s) == 0 { + return []string{} + } else { + return strings.Split(s, ",") + } +} + +// {{{ SortedStrStrKeys +type sortedStrStrKeys struct { + m map[string]string + s []string +} + +func (sm *sortedStrStrKeys) Len() int { + return len(sm.m) +} + +func (sm *sortedStrStrKeys) Less(a, b int) bool { + // return sm.m[sm.s[a]] > sm.m[sm.s[b]] + return sm.s[a] < sm.s[b] +} + +func (sm *sortedStrStrKeys) Swap(a, b int) { + sm.s[a], sm.s[b] = sm.s[b], sm.s[a] +} + +func SortedStrStrKeys(m map[string]string) []string { + sm := new(sortedStrStrKeys) + sm.m = m + sm.s = make([]string, len(m)) + + i := 0 + for key, _ := range m { + sm.s[i] = key + i++ + } + sort.Sort(sm) + + return sm.s +} + +// }}} + +func ConvertToJsonString(obj interface{}) string { + out, err := json.MarshalIndent(&obj, "", " ") + if err == nil { + return string(out) + } else { + return "" + } +} + +func FindMarathonAppById(apps []marathon.App, id string) *marathon.App { + for i, mi := range apps { + if mi.Id == id { + return &apps[i] + } else { + log.Printf("FindMarathonAppById: skip %v", mi.Id) + } + } + return nil +} diff --git a/util/helper_test.go b/util/helper_test.go new file mode 100644 index 0000000..185d062 --- /dev/null +++ b/util/helper_test.go @@ -0,0 +1,45 @@ +package util + +import ( + "testing" +) + +func TestContains(t *testing.T) { + if Contains([]string{"a", "b"}, "b") == false { + t.Error("'b' not found.") + } + if Contains([]string{"a", "b"}, "aa") == true { + t.Error("'aa' found. weird.") + } +} + +func TestAtoi(t *testing.T) { + // test success + if i := Atoi("123", 17); i != 123 { + t.Errorf("Failed to run Atoi on \"123\". Got %v instead.", i) + } + // test default + if i := Atoi("blah", 17); i != 17 { + t.Errorf("Failed to run Atoi on \"\". Got %v instead.", i) + } + // test default + if i := Atoi("", 17); i != 17 { + t.Errorf("Failed to run Atoi on \"\". Got %v instead.", i) + } +} + +func TestFindMissing(t *testing.T) { + a := []string{"bar", "foo", "tar"} + b := []string{"foo", "com"} + c := FindMissing(a, b) + t.Logf("result: %+v", c) + if len(c) != 2 { + t.Fatalf("Result size differs. Expected 2 but got %v", len(c)) + } + if c[0] != "bar" { + t.Errorf("\"bar\" not found, but \"%v\".", c[0]) + } + if c[1] != "tar" { + t.Errorf("\"tar\" not found, but \"%v\".", c[1]) + } +}