
alan


UDP peer discovery and communication library for Go with optional ChaCha20-Poly1305 encryption.

Features

  • DNS-based peer discovery - Resolve a DNS name to discover cluster members
  • Automatic membership - JOIN/LEAVE/HEARTBEAT protocol for peer tracking
  • Encrypted communication - Optional ChaCha20-Poly1305 authenticated encryption
  • Request-Reply pattern - Send requests and wait for responses from peers
  • Distributed locking - Named locks with automatic release on peer disconnect
  • Quorum support - Configurable quorum requirement for distributed operations
  • Simple API - Start(), Send(), and Stop() cover the common case
  • Callbacks - Get notified when peers join or leave
  • Auto-refresh - Optionally re-resolve DNS to discover new peers

Installation

go get github.com/rakunlabs/alan

Quick Start

package main

import (
    "context"
    "fmt"
    "net"
    "github.com/rakunlabs/alan"
)

func main() {
    // Create Alan instance
    a, err := alan.New(alan.Config{
        DNSAddr: "my-cluster.local",  // DNS name for peer discovery
        Port:    5000,
    })
    if err != nil {
        panic(err)
    }

    // Optional: Get notified when peers join/leave
    a.OnPeerJoin(func(addr *net.UDPAddr) {
        fmt.Printf("Peer joined: %s\n", addr)
    })
    a.OnPeerLeave(func(addr *net.UDPAddr) {
        fmt.Printf("Peer left: %s\n", addr)
    })

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

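    // Start is blocking, so run it in the background
    go func() {
        a.Start(ctx, func(ctx context.Context, msg alan.Message) {
            fmt.Printf("Received from %s: %s\n", msg.Addr, msg.Data)
        })
    }()

    // Wait until the instance is ready to send and receive
    <-a.Ready()

    // Send to all peers (waits for quorum if configured)
    a.Send(ctx, []byte("Hello everyone!"))

    // Send to a specific peer (direct send, no quorum check).
    // The address is only an example; it must use the cluster port.
    specificAddr, err := net.ResolveUDPAddr("udp", "10.0.0.2:5000")
    if err != nil {
        panic(err)
    }
    a.SendTo(specificAddr, []byte("Hello you!"))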

    // Graceful shutdown
    a.Stop()
}

With Encryption

a, err := alan.New(alan.Config{
    DNSAddr: "my-cluster.local",
    Port:    5000,
    Security: &alan.SecurityConfig{
        Key:     []byte("12345678901234567890123456789012"), // 32 bytes; example only, use a random secret key in practice
        Enabled: true,
    },
})

All messages (including membership protocol messages) are automatically encrypted, so every peer in the cluster must be configured with the same key.
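The example above hardcodes a key for readability. A minimal sketch of generating and loading a real key, assuming you distribute it to every peer as a hex string in an environment variable; the variable name ALAN_KEY and the helpers generateKey and parseKey are hypothetical, not part of Alan:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"os"
)

// keySize is the key length SecurityConfig.Key requires.
const keySize = 32

// generateKey returns a new random 32-byte key, hex-encoded for storage.
func generateKey() (string, error) {
	key := make([]byte, keySize)
	if _, err := rand.Read(key); err != nil {
		return "", err
	}
	return hex.EncodeToString(key), nil
}

// parseKey decodes a hex-encoded key and checks that it is exactly 32 bytes.
func parseKey(s string) ([]byte, error) {
	key, err := hex.DecodeString(s)
	if err != nil {
		return nil, err
	}
	if len(key) != keySize {
		return nil, fmt.Errorf("key must be %d bytes, got %d", keySize, len(key))
	}
	return key, nil
}

func main() {
	// ALAN_KEY is a hypothetical variable name; every peer must load the same value.
	encoded := os.Getenv("ALAN_KEY")
	if encoded == "" {
		var err error
		if encoded, err = generateKey(); err != nil {
			panic(err)
		}
		fmt.Println("generated key:", encoded)
	}
	key, err := parseKey(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println("key length:", len(key))
}
```

The resulting key would then be passed as Security: &alan.SecurityConfig{Key: key, Enabled: true}.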

Request-Reply Pattern

Alan supports a request-reply pattern for scenarios where you need responses from peers:

Send to All Peers and Collect Responses

// Broadcast request to all peers and wait for their responses
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

replies, err := a.SendAndWaitReply(ctx, []byte("status-request"))
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
    log.Fatal(err)
}

for _, reply := range replies {
    fmt.Printf("Response from %s: %s\n", reply.Addr, reply.Data)
}

Send to Specific Peer and Wait for Response

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

reply, err := a.SendToAndWaitReply(ctx, peerAddr, []byte("ping"))
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Got response: %s\n", reply.Data)

Handling Requests (Responder Side)

a.Start(ctx, func(ctx context.Context, msg alan.Message) {
    if msg.IsRequest() {
        // This is a request expecting a reply.
        // processRequest and handleMessage stand for your own application logic.
        response := processRequest(msg.Data)
        a.Reply(msg, response)
    } else {
        // Regular fire-and-forget message
        handleMessage(msg.Data)
    }
})

Notes on Request-Reply

  • Smart peer tracking: The library tracks which peers you're waiting for responses from
  • Early return on disconnect: If a peer disconnects (gracefully or via heartbeat timeout) while waiting, the library automatically adjusts:
    • SendAndWaitReply: Removes the disconnected peer from expected responses and returns when all remaining peers have responded
    • SendToAndWaitReply: Returns immediately with ErrPeerDisconnected if the target peer disconnects
  • No infinite waits: Because peer disconnects are detected via the membership protocol, requests won't wait forever for unresponsive peers
  • The request ID correlation is handled automatically by the library
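A minimal sketch of how disconnect-aware waiting can work, assuming one pending record per request that holds the set of peers still expected to answer. The pendingRequest type and its methods are hypothetical and do not describe Alan's internals:

```go
package main

import (
	"fmt"
	"sync"
)

// reply mirrors the shape of alan.Reply for this sketch (address as a string).
type reply struct {
	Addr string
	Data []byte
}

// pendingRequest tracks which peers still owe a response to one request.
type pendingRequest struct {
	mu       sync.Mutex
	expected map[string]bool // peers we are still waiting on
	replies  []reply
	done     chan struct{} // closed once no peers remain outstanding
	closed   bool
}

func newPendingRequest(peers []string) *pendingRequest {
	p := &pendingRequest{expected: make(map[string]bool), done: make(chan struct{})}
	for _, addr := range peers {
		p.expected[addr] = true
	}
	p.finishIfEmpty() // no lock needed: p is not shared yet
	return p
}

// finishIfEmpty closes done when nothing is outstanding. Callers hold mu.
func (p *pendingRequest) finishIfEmpty() {
	if len(p.expected) == 0 && !p.closed {
		p.closed = true
		close(p.done)
	}
}

// onReply records a response from addr if we were still waiting on it.
func (p *pendingRequest) onReply(addr string, data []byte) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.expected[addr] {
		return // duplicate or unexpected reply
	}
	delete(p.expected, addr)
	p.replies = append(p.replies, reply{Addr: addr, Data: data})
	p.finishIfEmpty()
}

// onPeerLeave stops waiting for a peer that left or timed out.
func (p *pendingRequest) onPeerLeave(addr string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.expected, addr)
	p.finishIfEmpty()
}

// collected returns a copy of the replies received so far.
func (p *pendingRequest) collected() []reply {
	p.mu.Lock()
	defer p.mu.Unlock()
	return append([]reply(nil), p.replies...)
}

func main() {
	p := newPendingRequest([]string{"10.0.0.2:5000", "10.0.0.3:5000"})
	p.onReply("10.0.0.2:5000", []byte("ok"))
	p.onPeerLeave("10.0.0.3:5000") // LEAVE or heartbeat timeout
	<-p.done
	fmt.Println(len(p.collected()), "reply collected")
}
```

A real waiter would select on both p.done and ctx.Done(), which is how a context deadline can still end the wait with partial replies.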

Distributed Locking

Alan provides distributed named locks for coordinating work across peers:

Basic Lock Usage

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Acquire a named lock (blocks until acquired or context cancelled)
err := a.Lock(ctx, "my-job")
if err != nil {
    log.Fatal("failed to acquire lock:", err)
}

// Do protected work...
doExclusiveWork()

// Release the lock
err = a.Unlock("my-job")
if err != nil {
    log.Fatal("failed to release lock:", err)
}

TryLock (Non-blocking)

// Try to acquire lock without blocking
if a.TryLock("my-job") {
    // Got the lock
    defer a.Unlock("my-job")
    doWork()
} else {
    // Lock is held by another peer
    log.Println("could not acquire lock")
}

Lock Features

  • Named locks: Multiple independent locks identified by key
  • Auto-release: Locks are automatically released when the holder disconnects
  • Quorum-aware: When quorum is enabled, Lock() waits for quorum before acquiring
  • Context support: Lock() respects context cancellation and deadlines
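A minimal single-process sketch of the bookkeeping behind named locks and auto-release, assuming each peer keeps a table from lock key to holder address and clears a holder's entries when that peer leaves. The lockTable type is hypothetical; the LOCK_REQUEST/LOCK_GRANT/LOCK_DENY exchange and quorum checks are omitted:

```go
package main

import (
	"fmt"
	"sync"
)

// lockTable records which peer holds each named lock.
type lockTable struct {
	mu      sync.Mutex
	holders map[string]string // lock key -> holder address
}

func newLockTable() *lockTable {
	return &lockTable{holders: make(map[string]string)}
}

// tryAcquire grants key to holder if it is free or already held by holder.
func (t *lockTable) tryAcquire(key, holder string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if cur, ok := t.holders[key]; ok && cur != holder {
		return false // corresponds to answering LOCK_DENY
	}
	t.holders[key] = holder
	return true
}

// release frees key only if holder actually owns it.
func (t *lockTable) release(key, holder string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if cur, ok := t.holders[key]; !ok || cur != holder {
		return false
	}
	delete(t.holders, key)
	return true
}

// releaseAll frees every lock held by a peer that left or timed out.
func (t *lockTable) releaseAll(holder string) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	n := 0
	for key, cur := range t.holders {
		if cur == holder {
			delete(t.holders, key) // deleting during range is safe in Go
			n++
		}
	}
	return n
}

func main() {
	t := newLockTable()
	fmt.Println(t.tryAcquire("my-job", "10.0.0.2:5000")) // true
	fmt.Println(t.tryAcquire("my-job", "10.0.0.3:5000")) // false: already held
	t.releaseAll("10.0.0.2:5000")                         // holder disconnected
	fmt.Println(t.tryAcquire("my-job", "10.0.0.3:5000")) // true
}
```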

Lock Limitations

The distributed lock provides best-effort coordination, not strong consistency:

  • Split-brain possible: During network partitions, multiple peers might acquire the same lock
  • No fencing tokens: There's no mechanism to prove lock ownership to external systems
  • Startup race: If peers start simultaneously before discovering each other, both might acquire a lock

Use quorum configuration to mitigate the startup race condition.

Quorum

Quorum ensures operations only proceed when enough peers are present in the cluster:

Configuration

a, err := alan.New(alan.Config{
    DNSAddr: "my-cluster.local",
    Port:    5000,
    Quorum:  3, // Expected cluster size
})

With Quorum: 3, operations require (3/2)+1 = 2 peers to be present (integer division, so 3/2 = 1).

| Quorum Setting | Required Peers |
|----------------|----------------|
| 0 (default)    | Disabled       |
| 1              | 1              |
| 2              | 2              |
| 3              | 2              |
| 4              | 3              |
| 5              | 3              |

Quorum-Aware Operations

| Operation                   | Quorum Behavior                      |
|-----------------------------|--------------------------------------|
| Lock(ctx, key)              | Waits for quorum, then acquires lock |
| TryLock(key)                | Returns false if quorum not met      |
| Send(ctx, data)             | Waits for quorum, then broadcasts    |
| SendAndWaitReply(ctx, data) | Waits for quorum, then broadcasts    |
| SendTo(addr, data)          | No quorum check (direct send)        |

Checking Quorum Status

// Check if quorum is currently met
if a.HasQuorum() {
    // Safe to proceed
}

// Get required peer count
required := a.QuorumSize() // Returns (Quorum/2)+1

// Wait for quorum before starting work
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := a.WaitForQuorum(ctx); err != nil {
    log.Fatal("cluster not ready:", err)
}

Configuration

type Config struct {
    // DNSAddr is the DNS name to resolve for discovering peers (required)
    DNSAddr string
    
    // BindAddr is the local IP address to bind to (default: "0.0.0.0" for all interfaces)
    // Useful when running multiple instances on same machine with different IPs
    BindAddr string
    
    // Port is the UDP port to use (default: 5000)
    // IMPORTANT: All peers in the cluster MUST use the same port
    Port int
    
    // Timeout is the read/write timeout (default: 5s)
    Timeout time.Duration
    
    // BufferSize for receiving messages (default: 4096)
    BufferSize int
    
    // HeartbeatInterval - how often to send heartbeats (default: 5s)
    HeartbeatInterval time.Duration
    
    // HeartbeatTimeout - when a peer is considered dead (default: 15s)
    HeartbeatTimeout time.Duration
    
    // RefreshInterval - how often to re-resolve DNS (default: 30s, set to -1 to disable)
    // Note: Refresh only adds new peers; stale peers are removed via heartbeat timeout
    RefreshInterval time.Duration
    
    // MessageQueueSize - per-peer message buffer size (default: 256)
    // Messages from the same peer are processed in order.
    // When the queue is full, the listener blocks until space is available.
    MessageQueueSize int
    
    // Quorum - expected cluster size for distributed operations (default: 0 = disabled)
    // When set, operations like Lock() and Send() wait until (Quorum/2)+1 peers are present
    Quorum int
    
    // Security for encryption (optional)
    Security *SecurityConfig
}

type SecurityConfig struct {
    // Key must be exactly 32 bytes for ChaCha20-Poly1305
    Key     []byte
    Enabled bool
}

Note: All peers in the cluster must use the same port. DNS only provides IP addresses, so the library assumes all peers listen on the configured port.

How It Works

Peer Discovery

  1. On Start(), the library resolves DNSAddr to get initial peer IPs
  2. Sends JOIN message to all discovered peers
  3. Other peers add the new member to their peer list

Membership Protocol

The library uses a simple internal protocol:

| Message      | Purpose                                |
|--------------|----------------------------------------|
| JOIN         | Announce joining the cluster           |
| LEAVE        | Announce graceful departure            |
| HEARTBEAT    | Periodic keepalive                     |
| DATA         | User data message                      |
| REQUEST      | Request message expecting a response   |
| RESPONSE     | Response to a request message          |
| LOCK_REQUEST | Request to acquire a distributed lock  |
| LOCK_GRANT   | Grant lock to requester                |
| LOCK_DENY    | Deny lock (already held)               |
| LOCK_RELEASE | Notify lock has been released          |

  • JOIN: Sent on startup to all known peers
  • HEARTBEAT: Sent every HeartbeatInterval to all peers
  • LEAVE: Sent on Stop() to notify peers of graceful shutdown
  • Timeout: Peers not seen within HeartbeatTimeout are removed

Message Ordering

Messages from the same peer are guaranteed to be processed in order:

  • Each peer has a dedicated message queue (per-peer channel)
  • A worker goroutine processes messages from each queue sequentially
  • This ensures DATA and REQUEST messages from the same peer are handled in the order received
  • Queue size is configurable via MessageQueueSize (default: 256)
  • When a queue is full, the listener blocks (backpressure)
  • Queues are automatically cleaned up when peers leave or timeout

Peer Event Ordering

Peer join/leave events are also processed in order:

  • A single event queue handles all OnPeerJoin and OnPeerLeave callbacks
  • Events are processed sequentially by a dedicated worker
  • When the queue is full, the listener blocks (backpressure)
  • This ensures handlers see events in the order they occurred

Security

When encryption is enabled:

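  • All message types are encrypted: membership (JOIN/LEAVE/HEARTBEAT), DATA, REQUEST/RESPONSE, and lock messages
  • Uses XChaCha20-Poly1305 (AEAD), the extended-nonce variant of ChaCha20-Poly1305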
  • Random 24-byte nonce per message
  • Wire format: [nonce:24][ciphertext+tag]

API Reference

Alan

| Method                              | Description                                          |
|-------------------------------------|------------------------------------------------------|
| New(Config)                         | Create new Alan instance                             |
| OnPeerJoin(handler)                 | Set callback for peer join events                    |
| OnPeerLeave(handler)                | Set callback for peer leave events                   |
| Start(ctx, handler)                 | Start the peer discovery system (blocking)           |
| Stop()                              | Gracefully stop and notify peers                     |
| Send(ctx, data)                     | Send data to all peers (waits for quorum if enabled) |
| SendTo(addr, data)                  | Send data to a specific peer (no quorum check)       |
| SendAndWaitReply(ctx, data)         | Send request to all peers and wait for responses     |
| SendToAndWaitReply(ctx, addr, data) | Send request to specific peer and wait for response  |
| Reply(msg, data)                    | Send response to a request message                   |
| Lock(ctx, key)                      | Acquire a distributed lock (blocking)                |
| TryLock(key)                        | Try to acquire a lock (non-blocking)                 |
| Unlock(key)                         | Release a distributed lock                           |
| HasQuorum()                         | Check if quorum is currently met                     |
| WaitForQuorum(ctx)                  | Block until quorum is reached                        |
| QuorumSize()                        | Get required peer count for quorum                   |
| Peers()                             | Get list of current peer addresses                   |
| PeerCount()                         | Get number of connected peers                        |
| Refresh()                           | Manually re-resolve DNS                              |
| Ready()                             | Returns channel closed when ready to send/receive    |
| LocalAddr()                         | Get local listening address                          |
| IsSecure()                          | Check if encryption is enabled                       |
| Config()                            | Get current configuration                            |

Types

// Message received from a peer
type Message struct {
    Data []byte       // Decrypted payload
    Addr *net.UDPAddr // Sender's address
}

// Check if message is a request expecting a reply
func (m Message) IsRequest() bool

// Reply received from a peer (for request-reply pattern)
type Reply struct {
    Data []byte       // Response payload
    Addr *net.UDPAddr // Responder's address
}

// Result of sending to a peer
type SendResult struct {
    Addr  *net.UDPAddr
    Sent  int
    Error error
}

// Callbacks
type PeerHandler func(addr *net.UDPAddr)
type MessageHandler func(ctx context.Context, msg Message)

// Errors
var ErrPeerDisconnected = errors.New("peer disconnected before responding")
var ErrNoQuorum = errors.New("quorum not reached")
var ErrLockNotHeld = errors.New("lock not held by this instance")

License

MIT License - see LICENSE for details.
