Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions providers/rabbitmq/grpcsync_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package grpcsync implements additional synchronization primitives built upon
// the sync package.
package grpcsync

import (
"sync"
"sync/atomic"
)

// Event represents a one-time event that may occur in the future.
type Event struct {
fired int32
c chan struct{}
o sync.Once
}

// Fire causes e to complete. It is safe to call multiple times, and
// concurrently. It returns true iff this call to Fire caused the signaling
// channel returned by Done to close.
func (e *Event) Fire() bool {
ret := false
e.o.Do(func() {
atomic.StoreInt32(&e.fired, 1)
close(e.c)
ret = true
})
return ret
}

// Done returns a channel that will be closed when Fire is called.
func (e *Event) Done() <-chan struct{} {
return e.c
}

// HasFired returns true if Fire has been called.
func (e *Event) HasFired() bool {
return atomic.LoadInt32(&e.fired) == 1
}

// NewEvent returns a new, ready-to-use Event.
func NewEvent() *Event {
return &Event{c: make(chan struct{})}
}
56 changes: 56 additions & 0 deletions providers/rabbitmq/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package rabbitmq

import (
"crypto/sha1"
"fmt"
"os"

"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)

// session composes an amqp.Connection with an amqp.Channel
type session struct {
*amqp.Connection
*amqp.Channel
}

// identity returns the same host/process unique string for the lifetime of
// this process so that subscriber reconnections reuse the same queue name.
func identity() string {
hostname, err := os.Hostname()
h := sha1.New()
_, _ = fmt.Fprint(h, hostname)
_, _ = fmt.Fprint(h, err)
_, _ = fmt.Fprint(h, os.Getpid())
return fmt.Sprintf("%x", h.Sum(nil))
}

// Close tears the connection down, taking the channel with it.
func (s session) Close() error {
if s.Connection == nil {
return nil
}
return s.Connection.Close()
}

func warnOnError(err error, msg string) {
if err != nil {
logrus.Warnf("%s: %s", msg, err)
}
}

func failOnError(err error, msg string) {
if err != nil {
logrus.Fatalf("%s: %s", msg, err)
}
}

// Func is current function name provider,
// like `__FUNCTION__` of PHP.
func currentFunc() string {
pc, _, _, _ := runtime.Caller(depthOfFunctionCaller)
fn := runtime.FuncForPC(pc)
elems := strings.Split(fn.Name(), ".")
return elems[len(elems)-1]
}
Loading