diff --git a/cluster/discovery.go b/cluster/discovery.go new file mode 100644 index 00000000..2ee705c5 --- /dev/null +++ b/cluster/discovery.go @@ -0,0 +1,205 @@ +package cluster + +import ( + "context" + "net" + "runtime" + + "github.com/grandcat/zeroconf" + "github.com/oleksandr/bonjour" +) + +type ServiceRecord struct { + Instance string `json:"name"` + Service string `json:"type"` + Domain string `json:"domain"` +} + +type ServiceEntry struct { + ServiceRecord + HostName string `json:"hostname"` + Port int `json:"port"` + Text []string `json:"text"` + TTL uint32 `json:"ttl"` + AddrIPv4 []net.IP `json:"-"` + AddrIPv6 []net.IP `json:"-"` +} + +type Resolver interface { + Browse(ctx context.Context, service string, domain string, entries chan<- *ServiceEntry) error + Lookup(ctx context.Context, instance string, service string, domain string, entries chan<- *ServiceEntry) error +} + +const ( + darwin = "darwin" +) + +func NewResolver(ifs ...net.Interface) (Resolver, error) { + switch runtime.GOOS { + case darwin: + var iface *net.Interface + if len(ifs) > 0 { + iface = &ifs[0] + } + r, err := bonjour.NewResolver(iface) + if err != nil { + return nil, err + } + + return &BonjourResolver{ + Resolver: r, + }, nil + default: + var opt []zeroconf.ClientOption + if len(ifs) > 0 { + opt = append(opt, zeroconf.SelectIfaces(ifs)) + } + r, err := zeroconf.NewResolver(opt...) + if err != nil { + return nil, err + } + + return &ZeroconfResolver{ + Resolver: r, + }, nil + } +} + +type ZeroconfResolver struct { + *zeroconf.Resolver +} + +func (r *ZeroconfResolver) Browse(ctx context.Context, service string, domain string, entries chan<- *ServiceEntry) error { + ch := make(chan *zeroconf.ServiceEntry, cap(entries)) + go func(zeroconfEntries <-chan *zeroconf.ServiceEntry) { + defer close(entries) + for e := range zeroconfEntries { + entries <- &ServiceEntry{ + ServiceRecord: ServiceRecord{ + Instance: e.Instance, + Service: e.Service, + Domain: e.Domain, + }, + HostName: e.HostName, + Port: e.Port, + Text: e.Text, + TTL: e.TTL, + AddrIPv4: e.AddrIPv4, + AddrIPv6: e.AddrIPv6, + } + } + }(ch) + + return r.Resolver.Browse(ctx, service, domain, ch) +} + +func (r *ZeroconfResolver) Lookup(ctx context.Context, instance string, service string, domain string, entries chan<- *ServiceEntry) error { + ch := make(chan *zeroconf.ServiceEntry, cap(entries)) + go func(zeroconfEntries <-chan *zeroconf.ServiceEntry) { + defer close(entries) + for e := range zeroconfEntries { + entries <- &ServiceEntry{ + ServiceRecord: ServiceRecord{ + Instance: e.Instance, + Service: e.Service, + Domain: e.Domain, + }, + HostName: e.HostName, + Port: e.Port, + Text: e.Text, + TTL: e.TTL, + AddrIPv4: e.AddrIPv4, + AddrIPv6: e.AddrIPv6, + } + } + }(ch) + + return r.Resolver.Lookup(ctx, instance, service, domain, ch) +} + +type BonjourResolver struct { + *bonjour.Resolver +} + +func (r *BonjourResolver) Browse(_ context.Context, service string, domain string, entries chan<- *ServiceEntry) error { + ch := make(chan *bonjour.ServiceEntry, cap(entries)) + go func(bonjourEntries chan *bonjour.ServiceEntry) { + defer close(entries) + for e := range bonjourEntries { + entries <- &ServiceEntry{ + ServiceRecord: ServiceRecord{ + Instance: e.Instance, + Service: e.Service, + Domain: e.Domain, + }, + HostName: e.HostName, + Port: e.Port, + Text: e.Text, + TTL: e.TTL, + AddrIPv4: []net.IP{e.AddrIPv4}, + AddrIPv6: []net.IP{e.AddrIPv6}, + } + } + }(ch) + + return r.Resolver.Browse(service, domain, ch) +} + +func (r *BonjourResolver) Lookup(_ context.Context, instance string, service string, domain string, entries chan<- *ServiceEntry) error { + ch := make(chan *bonjour.ServiceEntry, cap(entries)) + go func(bonjourEntries chan *bonjour.ServiceEntry) { + defer close(entries) + for e := range bonjourEntries { + entries <- &ServiceEntry{ + ServiceRecord: ServiceRecord{ + Instance: e.Instance, + Service: e.Service, + Domain: e.Domain, + }, + HostName: e.HostName, + Port: e.Port, + Text: e.Text, + TTL: e.TTL, + AddrIPv4: []net.IP{e.AddrIPv4}, + AddrIPv6: []net.IP{e.AddrIPv6}, + } + } + }(ch) + + return r.Resolver.Lookup(instance, service, domain, ch) +} + +type Discovery interface { + RegisterProxy(instance string, service string, domain string, port int, host string, ips []string, text []string, ifaces []net.Interface) (Shutdownable, error) +} + +type ZeroconfDiscovery struct { +} + +func NewDiscovery() Discovery { + switch runtime.GOOS { + case darwin: + return new(BonjourDiscovery) + default: + return new(ZeroconfDiscovery) + } +} + +func (z *ZeroconfDiscovery) RegisterProxy(instance string, service string, domain string, port int, host string, ips []string, text []string, ifaces []net.Interface) (Shutdownable, error) { + server, err := zeroconf.RegisterProxy(instance, service, domain, port, host, ips, text, ifaces) + + return server, err +} + +type BonjourDiscovery struct { +} + +func (z *BonjourDiscovery) RegisterProxy(instance string, service string, domain string, port int, host string, ips []string, text []string, ifaces []net.Interface) (Shutdownable, error) { + var iface *net.Interface + if len(ifaces) > 0 { + iface = &ifaces[0] + } + server, err := bonjour.RegisterProxy(instance, service, domain, port, host, append(ips, "")[0], text, iface) + + return server, err +} diff --git a/cluster/selfmanaged.go b/cluster/selfmanaged.go index 38bacc76..2a940df7 100644 --- a/cluster/selfmanaged.go +++ b/cluster/selfmanaged.go @@ -11,7 +11,6 @@ import ( "time" "github.com/anthdm/hollywood/actor" - "github.com/grandcat/zeroconf" ) const ( @@ -48,6 +47,9 @@ func (c SelfManagedConfig) WithBootstrapMember(member MemberAddr) SelfManagedCon return c } +type Shutdownable interface { + Shutdown() +} type SelfManaged struct { config SelfManagedConfig cluster *Cluster @@ -59,8 +61,8 @@ type SelfManaged struct { membersAlive *MemberSet - resolver *zeroconf.Resolver - announcer *zeroconf.Server + resolver Resolver + announcer Shutdownable ctx context.Context cancel context.CancelFunc @@ -169,7 +171,7 @@ func (s *SelfManaged) start(c *actor.Context) { } func (s *SelfManaged) initAutoDiscovery() { - resolver, err := zeroconf.NewResolver() + resolver, err := NewResolver() if err != nil { log.Fatal(err) } @@ -184,31 +186,39 @@ func (s *SelfManaged) initAutoDiscovery() { log.Fatal(err) } - server, err := zeroconf.RegisterProxy( + server, err := NewDiscovery().RegisterProxy( s.cluster.ID(), serviceName, domain, port, fmt.Sprintf("member_%s", s.cluster.ID()), []string{host}, - []string{"txtv=0", "lo=1", "la=2"}, nil) + []string{"txtv=0", "lo=1", "la=2"}, + nil, + ) if err != nil { log.Fatal(err) } s.announcer = server } +const localhost = "127.0.0.1" + func (s *SelfManaged) startAutoDiscovery() { - entries := make(chan *zeroconf.ServiceEntry) - go func(results <-chan *zeroconf.ServiceEntry) { + entries := make(chan *ServiceEntry) + go func(results <-chan *ServiceEntry) { for entry := range results { if entry.Instance != s.cluster.ID() { - host := fmt.Sprintf("%s:%d", entry.AddrIPv4[0], entry.Port) + host := localhost + if len(entry.AddrIPv4) > 0 && len(entry.AddrIPv4[0]) > 0 { + host = entry.AddrIPv4[0].String() + } + addr := fmt.Sprintf("%s:%d", host, entry.Port) hs := &Handshake{ Member: s.cluster.Member(), } // create the reachable PID for this member. - memberPID := actor.NewPID(host, "provider/"+entry.Instance) + memberPID := actor.NewPID(addr, "provider/"+entry.Instance) self := actor.NewPID(s.cluster.agentPID.Address, "provider/"+s.cluster.ID()) s.cluster.engine.SendWithSender(memberPID, hs, self) } diff --git a/go.mod b/go.mod index c271e62a..198e2afc 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/miekg/dns v1.1.27 // indirect + github.com/oleksandr/bonjour v0.0.0-20210301155756-30f43c61b915 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect diff --git a/go.sum b/go.sum index b0bb4c41..00141cae 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvls github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/miekg/dns v1.1.27 h1:aEH/kqUzUxGJ/UHcEKdJY+ugH6WEzsEBBSPa8zuy1aM= github.com/miekg/dns v1.1.27/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= +github.com/oleksandr/bonjour v0.0.0-20210301155756-30f43c61b915 h1:d291KOLbN1GthTPA1fLKyWdclX3k1ZP+CzYtun+a5Es= +github.com/oleksandr/bonjour v0.0.0-20210301155756-30f43c61b915/go.mod h1:MGuVJ1+5TX1SCoO2Sx0eAnjpdRytYla2uC1YIZfkC9c= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/planetscale/vtprotobuf v0.5.0 h1:l8PXm6Colok5z6qQLNhAj2Jq5BfoMTIHxLER5a6nDqM=