From bfb6bf5f4775e71fa3fefd506fd849e35271ed9e Mon Sep 17 00:00:00 2001 From: nadilas Date: Sat, 23 May 2020 13:57:56 +0200 Subject: [PATCH 1/8] Create rabbitmq.go A baseline rabbitMQ provider for subscribing to exchanges with optional support for routing keys. Publish still needs to be implemented. --- providers/rabbitmq/rabbitmq.go | 199 +++++++++++++++++++++++++++++++++ 1 file changed, 199 insertions(+) create mode 100644 providers/rabbitmq/rabbitmq.go diff --git a/providers/rabbitmq/rabbitmq.go b/providers/rabbitmq/rabbitmq.go new file mode 100644 index 0000000..98cbef4 --- /dev/null +++ b/providers/rabbitmq/rabbitmq.go @@ -0,0 +1,199 @@ +package rabbitmq + +import ( + "context" + "crypto/sha1" + "fmt" + "os" + "strconv" + "time" + + "github.com/lileio/pubsub/v2" + "github.com/sirupsen/logrus" + "github.com/streadway/amqp" +) + +// session composes an amqp.Connection with an amqp.Channel +type session struct { + *amqp.Connection + *amqp.Channel +} + +type RabbitMQProvider struct { + amqpUrl string + conn *amqp.Connection + ch *amqp.Channel + pubsubDone context.CancelFunc + shutdown bool +} + +// Creates a new Subscriber and sets the AMQP url for the internal rabbitMQ client +func NewRabbitMQProvider(amqpUrl string) *RabbitMQProvider { + return &RabbitMQProvider{amqpUrl: amqpUrl} +} + +func (r *RabbitMQProvider) Publish(ctx context.Context, topic string, m *pubsub.Msg) error { + panic("implement me") +} + +func (r *RabbitMQProvider) Subscribe(opts pubsub.HandlerOptions, handler pubsub.MsgHandler) { + go func() { + queue := identity() + + for session := range r.redial(context.Background(), r.amqpUrl) { + sub := <-session + + if _, err := sub.QueueDeclare(queue, false, true, true, false, nil); err != nil { + logrus.Warnf("cannot consume from exclusive queue: %q, %v", queue, err) + return + } + + exchange := opts.Topic + routingKey := "" + if opts.Name != "-" { + routingKey = opts.Name + } + + if err := sub.QueueBind(queue, routingKey, exchange, false, nil); err != nil { + logrus.Warnf("cannot consume without a binding to exchange: %q, %v", exchange, err) + return + } + + deliveries, err := sub.Consume(queue, "", opts.AutoAck, true, false, false, nil) + if err != nil { + logrus.Warnf("cannot consume from: %q, %v", queue, err) + return + } + + logrus.Infof("Queue (%s) attached to exchange '%s' via routing key '%s'", queue, exchange, routingKey) + + for msg := range deliveries { + if r.shutdown { + break + } + + returnMessage := pubsub.Msg{ + ID: msg.MessageId, + Metadata: map[string]string{ + "AppId": msg.AppId, + "ConsumerTag": msg.ConsumerTag, + "ContentEncoding": msg.ContentEncoding, + "ContentType": msg.ContentType, + "CorrelationId": msg.CorrelationId, + "Exchange": msg.Exchange, + "RoutingKey": msg.RoutingKey, + "Expiration": msg.Expiration, + "ReplyTo": msg.ReplyTo, + "Type": msg.Type, + "UserId": msg.UserId, + "Priority": strconv.Itoa(int(msg.Priority)), + "Redelivered": strconv.FormatBool(msg.Redelivered), + }, + Data: msg.Body, + PublishTime: &msg.Timestamp, + Ack: func() { + err := sub.Ack(msg.DeliveryTag, false) + warnOnError(err, "Failed to acknowledge message") + }, + Nack: func() { + err := sub.Nack(msg.DeliveryTag, false, true) // automatically requeue + warnOnError(err, "Failed to acknowledge message") + if err == nil { + logrus.Debugf("Unprocessed message (%s) successfully requeued", msg.DeliveryTag) + } + }, + } + + dlCtx, _ := context.WithDeadline(context.Background(), time.Now().Add(opts.Deadline)) + + // handle message + err := handler(dlCtx, returnMessage) + if err != nil { + returnMessage.Nack() + continue + } + + if !opts.AutoAck { + returnMessage.Ack() + } + } + } + }() +} + +// redial continually connects to the URL, exiting the program when no longer possible +func (r *RabbitMQProvider) redial(ctx context.Context, url string) chan chan session { + sessions := make(chan chan session) + + go func() { + sess := make(chan session) + defer close(sessions) + + for { + select { + case sessions <- sess: + case <-ctx.Done(): + logrus.Debugf("shutting down session factory") + return + } + + if r.conn == nil { + // initialize connection only once + var err error + r.conn, err = amqp.Dial(url) + if err != nil { + logrus.Fatalf("cannot (re)dial: %v: %q", err, url) + } + } + + ch, err := r.conn.Channel() + if err != nil { + logrus.Fatalf("cannot create channel: %v", err) + } + + // declare short-lived exchanges + //if err := ch.ExchangeDeclare(exchange, "fanout", false, true, false, false, nil); err != nil { + // logrus.Fatalf("cannot declare fanout exchange: %v", err) + //} + + select { + case sess <- session{r.conn, ch}: + case <-ctx.Done(): + logrus.Debugf("shutting down new session") + return + } + } + }() + + return sessions +} + +func (r *RabbitMQProvider) Shutdown() { + r.shutdown = true + if r.pubsubDone != nil { + r.pubsubDone() + } + if r.ch != nil { + _ = r.ch.Close() + } + if r.conn != nil { + _ = r.conn.Close() + } +} + +// identity returns the same host/process unique string for the lifetime of +// this process so that subscriber reconnections reuse the same queue name. +func identity() string { + hostname, err := os.Hostname() + h := sha1.New() + _, _ = fmt.Fprint(h, hostname) + _, _ = fmt.Fprint(h, err) + _, _ = fmt.Fprint(h, os.Getpid()) + return fmt.Sprintf("%x", h.Sum(nil)) +} + +func warnOnError(err error, msg string) { + if err != nil { + logrus.Warnf("%s: %s", msg, err) + } +} From 13224bbab72781e1b9085c258f4d00129ead0766 Mon Sep 17 00:00:00 2001 From: nadilas Date: Sat, 23 May 2020 14:03:33 +0200 Subject: [PATCH 2/8] fix autoAck logic --- providers/rabbitmq/rabbitmq.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/rabbitmq/rabbitmq.go b/providers/rabbitmq/rabbitmq.go index 98cbef4..75539bc 100644 --- a/providers/rabbitmq/rabbitmq.go +++ b/providers/rabbitmq/rabbitmq.go @@ -108,12 +108,12 @@ func (r *RabbitMQProvider) Subscribe(opts pubsub.HandlerOptions, handler pubsub. // handle message err := handler(dlCtx, returnMessage) - if err != nil { + if err != nil && opts.AutoAck { returnMessage.Nack() continue } - if !opts.AutoAck { + if opts.AutoAck { returnMessage.Ack() } } From 23de3fe82c8d47aaf7d1538e2e4e5dc80b8fa820 Mon Sep 17 00:00:00 2001 From: nadilas Date: Thu, 18 Jun 2020 16:25:57 +0200 Subject: [PATCH 3/8] Create helpers --- providers/rabbitmq/helpers | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 providers/rabbitmq/helpers diff --git a/providers/rabbitmq/helpers b/providers/rabbitmq/helpers new file mode 100644 index 0000000..f180429 --- /dev/null +++ b/providers/rabbitmq/helpers @@ -0,0 +1,10 @@ +package rabbitmq + +// Func is current function name provider, +// like `__FUNCTION__` of PHP. +func currentFunc() string { + pc, _, _, _ := runtime.Caller(depthOfFunctionCaller) + fn := runtime.FuncForPC(pc) + elems := strings.Split(fn.Name(), ".") + return elems[len(elems)-1] +} From be7eb1d568947b3acc39cc638133a16d5e3a6329 Mon Sep 17 00:00:00 2001 From: nadilas Date: Thu, 18 Jun 2020 16:26:21 +0200 Subject: [PATCH 4/8] Create grpcsync_event.go --- providers/rabbitmq/grpcsync_event.go | 61 ++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 providers/rabbitmq/grpcsync_event.go diff --git a/providers/rabbitmq/grpcsync_event.go b/providers/rabbitmq/grpcsync_event.go new file mode 100644 index 0000000..fbe697c --- /dev/null +++ b/providers/rabbitmq/grpcsync_event.go @@ -0,0 +1,61 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package grpcsync implements additional synchronization primitives built upon +// the sync package. +package grpcsync + +import ( + "sync" + "sync/atomic" +) + +// Event represents a one-time event that may occur in the future. +type Event struct { + fired int32 + c chan struct{} + o sync.Once +} + +// Fire causes e to complete. It is safe to call multiple times, and +// concurrently. It returns true iff this call to Fire caused the signaling +// channel returned by Done to close. +func (e *Event) Fire() bool { + ret := false + e.o.Do(func() { + atomic.StoreInt32(&e.fired, 1) + close(e.c) + ret = true + }) + return ret +} + +// Done returns a channel that will be closed when Fire is called. +func (e *Event) Done() <-chan struct{} { + return e.c +} + +// HasFired returns true if Fire has been called. +func (e *Event) HasFired() bool { + return atomic.LoadInt32(&e.fired) == 1 +} + +// NewEvent returns a new, ready-to-use Event. +func NewEvent() *Event { + return &Event{c: make(chan struct{})} +} From b52a28de82b9be1ba856f2eefbca6b10b88c74c6 Mon Sep 17 00:00:00 2001 From: nadilas Date: Thu, 18 Jun 2020 16:27:04 +0200 Subject: [PATCH 5/8] Delete helpers --- providers/rabbitmq/helpers | 10 ---------- 1 file changed, 10 deletions(-) delete mode 100644 providers/rabbitmq/helpers diff --git a/providers/rabbitmq/helpers b/providers/rabbitmq/helpers deleted file mode 100644 index f180429..0000000 --- a/providers/rabbitmq/helpers +++ /dev/null @@ -1,10 +0,0 @@ -package rabbitmq - -// Func is current function name provider, -// like `__FUNCTION__` of PHP. -func currentFunc() string { - pc, _, _, _ := runtime.Caller(depthOfFunctionCaller) - fn := runtime.FuncForPC(pc) - elems := strings.Split(fn.Name(), ".") - return elems[len(elems)-1] -} From 489a17e99ca6fb0a98bb14549825b46e1775459b Mon Sep 17 00:00:00 2001 From: nadilas Date: Thu, 18 Jun 2020 16:27:13 +0200 Subject: [PATCH 6/8] Create helpers.go --- providers/rabbitmq/helpers.go | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 providers/rabbitmq/helpers.go diff --git a/providers/rabbitmq/helpers.go b/providers/rabbitmq/helpers.go new file mode 100644 index 0000000..f180429 --- /dev/null +++ b/providers/rabbitmq/helpers.go @@ -0,0 +1,10 @@ +package rabbitmq + +// Func is current function name provider, +// like `__FUNCTION__` of PHP. +func currentFunc() string { + pc, _, _, _ := runtime.Caller(depthOfFunctionCaller) + fn := runtime.FuncForPC(pc) + elems := strings.Split(fn.Name(), ".") + return elems[len(elems)-1] +} From 7ff5836e1ee1d07f8c03006fd3b8bcbe7d99cb76 Mon Sep 17 00:00:00 2001 From: nadilas Date: Thu, 18 Jun 2020 16:27:50 +0200 Subject: [PATCH 7/8] Add publish and refactor fixes issues --- providers/rabbitmq/rabbitmq.go | 381 +++++++++++++++++++++++++++------ 1 file changed, 315 insertions(+), 66 deletions(-) diff --git a/providers/rabbitmq/rabbitmq.go b/providers/rabbitmq/rabbitmq.go index 75539bc..e5f7193 100644 --- a/providers/rabbitmq/rabbitmq.go +++ b/providers/rabbitmq/rabbitmq.go @@ -2,48 +2,125 @@ package rabbitmq import ( "context" - "crypto/sha1" "fmt" - "os" "strconv" + "strings" + "sync" "time" - + "github.com/lileio/pubsub/v2" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) -// session composes an amqp.Connection with an amqp.Channel -type session struct { - *amqp.Connection - *amqp.Channel +const RoutingKey = "RoutingKey" + +type PublishExchange struct{} + +var ( + defaultConfig = ProviderConfig{ + AppId: "", + EagerPublishExchanges: nil, + MaxRedialCount: 5, + RoutingKeySeparator: "@", + } +) + +type ProviderConfig struct { + AmqpUrl string + AppId string + EagerPublishExchanges []string + MaxRedialCount int + RoutingKeySeparator string } -type RabbitMQProvider struct { - amqpUrl string - conn *amqp.Connection - ch *amqp.Channel - pubsubDone context.CancelFunc - shutdown bool +type Provider struct { + config ProviderConfig + conn *amqp.Connection + ch *amqp.Channel + + // To synchronize shutdown + quit *Event + + // exchange channel map + exchangeQueuesMux sync.RWMutex + exchangeQueues map[string]chan *pubsub.Msg +} + +func mergeConfig(config ProviderConfig) ProviderConfig { + cfg := defaultConfig + if config.AppId != "" { + cfg.AppId = config.AppId + } + if config.AmqpUrl != "" { + cfg.AmqpUrl = config.AmqpUrl + } + if config.MaxRedialCount != cfg.MaxRedialCount && config.MaxRedialCount != 0 { + cfg.MaxRedialCount = config.MaxRedialCount + } + return cfg } // Creates a new Subscriber and sets the AMQP url for the internal rabbitMQ client -func NewRabbitMQProvider(amqpUrl string) *RabbitMQProvider { - return &RabbitMQProvider{amqpUrl: amqpUrl} +// +// You can send a +func NewProvider(config ProviderConfig, eagerExchanges ...string) *Provider { + providerCfg := mergeConfig(config) + providerCfg.EagerPublishExchanges = eagerExchanges + + p := &Provider{ + config: providerCfg, + exchangeQueuesMux: sync.RWMutex{}, + exchangeQueues: make(map[string]chan *pubsub.Msg), + quit: NewEvent(), + } + + // setup eager exchanges if any + go p.setupPublishTopics() + + return p } -func (r *RabbitMQProvider) Publish(ctx context.Context, topic string, m *pubsub.Msg) error { - panic("implement me") +// Publish queues a message to a topic (a.k.a an exchange) +// +// In order to supply a routing key for a topic use the following syntax when supplying the topic parameter: +// // the default config.RoutingKeySeparator is '@' +// topic := "routingKeys@TopicExchange" +// +// You can also use the helper method provider.RoutingKeyAtExchangeHelper, which uses the separator from the config +func (p *Provider) Publish(ctx context.Context, topic string, message *pubsub.Msg) error { + sp, _ := opentracing.StartSpanFromContext(ctx, currentFunc()) + defer sp.Finish() + + exchange, routing, err := p.extractExchangeAndRouting(topic) + if err != nil { + return err + } + queue, err := p.exchangeQueue(exchange) + if err != nil { + return err + } + message.Metadata[RoutingKey] = routing + + // post message + queue <- message + return nil } -func (r *RabbitMQProvider) Subscribe(opts pubsub.HandlerOptions, handler pubsub.MsgHandler) { +// Subscribes to a specific exchange by creating a new queue consumer +// +// By default the routing key is derived from the Name field of the opts parameter. +// In order to set an empty routing key, use "-" for the mandatory pubsub.HandlerOptions.Name parameter +func (p *Provider) Subscribe(opts pubsub.HandlerOptions, handler pubsub.MsgHandler) { go func() { queue := identity() - for session := range r.redial(context.Background(), r.amqpUrl) { + for session := range p.redial() { sub := <-session - if _, err := sub.QueueDeclare(queue, false, true, true, false, nil); err != nil { + if _, err := sub.QueueDeclare(queue, false, true, false, false, nil); err != nil { logrus.Warnf("cannot consume from exclusive queue: %q, %v", queue, err) return } @@ -59,16 +136,17 @@ func (r *RabbitMQProvider) Subscribe(opts pubsub.HandlerOptions, handler pubsub. return } - deliveries, err := sub.Consume(queue, "", opts.AutoAck, true, false, false, nil) + deliveries, err := sub.Consume(queue, "", opts.AutoAck, false, false, false, nil) if err != nil { logrus.Warnf("cannot consume from: %q, %v", queue, err) return } - logrus.Infof("Queue (%s) attached to exchange '%s' via routing key '%s'", queue, exchange, routingKey) + logrus.Infof("Queue (%s) attached to exchange '%s/%s' via routing key '%s'", queue, sub.Connection.Config.Vhost, exchange, routingKey) for msg := range deliveries { - if r.shutdown { + if p.quit.HasFired() { + logrus.Infof("Shutting down queue (%s) attached to exchange '%s/%s' via routing key '%s'", queue, sub.Connection.Config.Vhost, exchange, routingKey) break } @@ -121,32 +199,233 @@ func (r *RabbitMQProvider) Subscribe(opts pubsub.HandlerOptions, handler pubsub. }() } -// redial continually connects to the URL, exiting the program when no longer possible -func (r *RabbitMQProvider) redial(ctx context.Context, url string) chan chan session { +// Shutdown is to be called for graceful termination +func (p *Provider) Shutdown() { + // trigger shutdown + p.quit.Fire() + + // TODO should wait for all go-rotuines to shut down + + if p.ch != nil { + _ = p.ch.Close() + } + if p.conn != nil { + _ = p.conn.Close() + } +} + +// Helpers + +// RoutingKeyAtExchangeHelper is a shorthand for creating a publish topic string +// +// It is essentially the same as calling: +// topic := fmt.Sprintf("%s%s%s", routingKey, p.config.RoutingKeySeparator, exchange) +func (p *Provider) RoutingKeyAtExchangeHelper(routingKey, exchange string) (topic string) { + return fmt.Sprintf("%s%s%s", routingKey, p.config.RoutingKeySeparator, exchange) +} + +// Prepares and creates queues for eager publish exchanges and starts processing on them +func (p *Provider) setupPublishTopics() { + for _, exchange := range p.config.EagerPublishExchanges { + exchange, _, err := p.extractExchangeAndRouting(exchange) + if err != nil { + logrus.Warnf("Failed to eagerly prepare publish exchange: %s. %v", exchange, err) + continue + } + _, err = p.exchangeQueue(exchange) + if err != nil { + logrus.Warnf("Failed to eagerly create queue for publish exchange: %s. %v", exchange, err) + } + } +} + +// Returns the underlying queue for this topic exchange +// +// If the exchange has not been used before, start a new redial go-routine for processing the queue +func (p *Provider) exchangeQueue(exchange string) (chan *pubsub.Msg, error) { + p.exchangeQueuesMux.RLock() + if xch, ok := p.exchangeQueues[exchange]; !ok { + p.exchangeQueuesMux.RUnlock() + // create new queue + newQueue := make(chan *pubsub.Msg, 10) + + // save for reuse + p.exchangeQueuesMux.Lock() + p.exchangeQueues[exchange] = newQueue + p.exchangeQueuesMux.Unlock() + + // start processing until context exists + go p.publishToExchange(p.redial(), exchange, newQueue) + + return newQueue, nil + } else { + p.exchangeQueuesMux.RUnlock() + return xch, nil + } +} + +// Separates the incoming publish topic to exchange name and routing key +func (p *Provider) extractExchangeAndRouting(ogTopic string) (exchange, routingKey string, err error) { + if !strings.Contains(ogTopic, p.config.RoutingKeySeparator) { + return ogTopic, "", nil // is exchange + } + s := strings.Split(ogTopic, p.config.RoutingKeySeparator) + if len(s) < 2 { + return "", "", errors.New(fmt.Sprintf("Too many separators '%s'. Syntax is: '$routingKey%s$exchange'", p.config.RoutingKeySeparator, p.config.RoutingKeySeparator)) + } + return s[1], s[0], nil +} + +// publishToTopic keeps a reconnecting session to a topic exchange open and waits for queued messages. +// +// It receives from the application specific source of messages. +func (p *Provider) publishToExchange(sessions chan chan session, exchange string, messages <-chan *pubsub.Msg) { + for session := range sessions { + var ( + running bool + reading = messages + pending = make(chan *pubsub.Msg, 1) + confirm = make(chan amqp.Confirmation, 1) + ) + + pub := <-session + + // publisher confirms for this channel/connection + if err := pub.Confirm(false); err != nil { + logrus.Warnf("publisher confirms not supported") + close(confirm) // confirms not supported, simulate by always nacking + } else { + pub.NotifyPublish(confirm) + } + + Publish: + for { + if p.quit.HasFired() { + logrus.Infof("Shutting down publishToExchange: %s/%s...", p.conn.Config.Vhost, exchange) + if pending != nil { + close(pending) + } + if confirm != nil { + close(confirm) + } + return + } + var msg *pubsub.Msg + select { + // confirm + case confirmed, ok := <-confirm: + if !ok { + break Publish + } + if !confirmed.Ack { + logrus.Debugf("nack message %d, msg: %q", confirmed.DeliveryTag, string(msg.Data)) + } + reading = messages + // pending + case msg = <-pending: + routingKey := "" // "ignored for fanout exchanges, application dependent for other exchanges" + if rk, ok := msg.Metadata[RoutingKey]; ok { + routingKey = rk + } + + // transfer metadata + headers := amqp.Table{} + for k, v := range msg.Metadata { + headers[k] = v + } + publishing := amqp.Publishing{ + Headers: headers, + // ContentType: "", + // ContentEncoding: "", + // DeliveryMode: 0, + // Priority: 0, + // CorrelationId: "", + // ReplyTo: "", + // Expiration: "", + // Type: "", + // UserId: "", + AppId: p.config.AppId, + Body: msg.Data, + } + if msg.ID != "" { + publishing.MessageId = msg.ID + } + if msg.PublishTime != nil { + publishing.Timestamp = *msg.PublishTime + } + err := pub.Publish(exchange, routingKey, false, false, publishing) + // Retry failed delivery on the next session + if err != nil { + logrus.Errorf("failed to publish %q. %v", string(msg.Data), err) + pending <- msg + _ = pub.Channel.Close() + break Publish + } else { + // write back publish time + if msg.PublishTime == nil { + now := time.Now() + msg.PublishTime = &now + } + logrus.Debugf("sent %q", string(msg.Data)) + } + // reading + case msg, running = <-reading: + // all messages consumed + if !running { + return + } + // work on pending delivery until ack'd + pending <- msg + reading = nil + // loop over for shudown check + case <-time.After(time.Millisecond * 100): + } + } + } +} + +// redial continually connects to the URL +// +// This method exits the program when retry count reaches max retry count defined in the config +func (p *Provider) redial() chan chan session { sessions := make(chan chan session) go func() { sess := make(chan session) - defer close(sessions) - + defer func() { + close(sessions) + close(sess) + }() + retryCounter := 1 for { select { case sessions <- sess: - case <-ctx.Done(): - logrus.Debugf("shutting down session factory") + case <-p.quit.Done(): + logrus.Info("Shutting down redial:sessionfactory...") return } - if r.conn == nil { + if p.conn == nil { // initialize connection only once var err error - r.conn, err = amqp.Dial(url) + p.conn, err = amqp.Dial(p.config.AmqpUrl) if err != nil { - logrus.Fatalf("cannot (re)dial: %v: %q", err, url) + if retryCounter > p.config.MaxRedialCount { + logrus.Fatalf("cannot (re)dial: %v: %q", err, p.config.AmqpUrl) + } else { + sleep := time.Second * time.Duration(retryCounter) + logrus.Errorf("waiting %s before redialing AQMP address. Error: %v") + // sleep before next retry + time.Sleep(sleep) + retryCounter++ + continue // try again + } + } else { + retryCounter = 1 // reset retry counter } } - ch, err := r.conn.Channel() + ch, err := p.conn.Channel() if err != nil { logrus.Fatalf("cannot create channel: %v", err) } @@ -157,9 +436,9 @@ func (r *RabbitMQProvider) redial(ctx context.Context, url string) chan chan ses //} select { - case sess <- session{r.conn, ch}: - case <-ctx.Done(): - logrus.Debugf("shutting down new session") + case sess <- session{p.conn, ch}: + case <-p.quit.Done(): + logrus.Info("Shutting down redial:sessions...") return } } @@ -167,33 +446,3 @@ func (r *RabbitMQProvider) redial(ctx context.Context, url string) chan chan ses return sessions } - -func (r *RabbitMQProvider) Shutdown() { - r.shutdown = true - if r.pubsubDone != nil { - r.pubsubDone() - } - if r.ch != nil { - _ = r.ch.Close() - } - if r.conn != nil { - _ = r.conn.Close() - } -} - -// identity returns the same host/process unique string for the lifetime of -// this process so that subscriber reconnections reuse the same queue name. -func identity() string { - hostname, err := os.Hostname() - h := sha1.New() - _, _ = fmt.Fprint(h, hostname) - _, _ = fmt.Fprint(h, err) - _, _ = fmt.Fprint(h, os.Getpid()) - return fmt.Sprintf("%x", h.Sum(nil)) -} - -func warnOnError(err error, msg string) { - if err != nil { - logrus.Warnf("%s: %s", msg, err) - } -} From 1389be9f14397f7cfd8540b7982fa483831b3c9b Mon Sep 17 00:00:00 2001 From: nadilas Date: Thu, 18 Jun 2020 19:02:18 +0200 Subject: [PATCH 8/8] add helpers --- providers/rabbitmq/helpers.go | 46 +++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/providers/rabbitmq/helpers.go b/providers/rabbitmq/helpers.go index f180429..cb61c3d 100644 --- a/providers/rabbitmq/helpers.go +++ b/providers/rabbitmq/helpers.go @@ -1,5 +1,51 @@ package rabbitmq +import ( + "crypto/sha1" + "fmt" + "os" + + "github.com/sirupsen/logrus" + "github.com/streadway/amqp" +) + +// session composes an amqp.Connection with an amqp.Channel +type session struct { + *amqp.Connection + *amqp.Channel +} + +// identity returns the same host/process unique string for the lifetime of +// this process so that subscriber reconnections reuse the same queue name. +func identity() string { + hostname, err := os.Hostname() + h := sha1.New() + _, _ = fmt.Fprint(h, hostname) + _, _ = fmt.Fprint(h, err) + _, _ = fmt.Fprint(h, os.Getpid()) + return fmt.Sprintf("%x", h.Sum(nil)) +} + +// Close tears the connection down, taking the channel with it. +func (s session) Close() error { + if s.Connection == nil { + return nil + } + return s.Connection.Close() +} + +func warnOnError(err error, msg string) { + if err != nil { + logrus.Warnf("%s: %s", msg, err) + } +} + +func failOnError(err error, msg string) { + if err != nil { + logrus.Fatalf("%s: %s", msg, err) + } +} + // Func is current function name provider, // like `__FUNCTION__` of PHP. func currentFunc() string {