Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions backend/postgres/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,36 @@
# PostgreSQL backend

## Features

### LISTEN/NOTIFY for Reactive Task Polling

The PostgreSQL backend supports reactive task polling using PostgreSQL's LISTEN/NOTIFY feature. When enabled, workers are notified immediately when new tasks become available, instead of polling at regular intervals.

**Benefits:**
- **Faster task execution**: Workers respond immediately to new work
- **Reduced database load**: Fewer polling queries
- **Better scalability**: More efficient use of database connections

**Usage:**
```go
import (
"github.com/cschleiden/go-workflows/backend/postgres"
)

b := postgres.NewPostgresBackend(
"localhost", 5432, "user", "password", "database",
postgres.WithNotifications(true), // Enable LISTEN/NOTIFY
)
```

**How it works:**
1. Database triggers on `pending_events` and `activities` tables send NOTIFY messages when new tasks are inserted
2. The backend maintains dedicated LISTEN connections to PostgreSQL
3. When a notification is received, waiting workers immediately check for new tasks
4. Falls back to traditional polling if no notification is received

See the [postgres-notify sample](../../samples/postgres-notify) for a working example.

## Adding a migration

1. Install [golang-migrate/migrate](https://github.com/golang-migrate/migrate)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Remove LISTEN/NOTIFY triggers

DROP TRIGGER IF EXISTS pending_events_notify ON pending_events;
DROP FUNCTION IF EXISTS notify_pending_event();

DROP TRIGGER IF EXISTS activities_notify ON activities;
DROP FUNCTION IF EXISTS notify_activity();
34 changes: 34 additions & 0 deletions backend/postgres/db/migrations/000002_add_notify_triggers.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- Add triggers for LISTEN/NOTIFY support on workflow tasks

-- Create function to notify when pending events are inserted
CREATE OR REPLACE FUNCTION notify_pending_event() RETURNS TRIGGER AS $$
BEGIN
-- Notify on the workflow_tasks channel with the queue name as payload
-- This SELECT is efficient due to the unique index on (instance_id, execution_id)
PERFORM pg_notify('workflow_tasks', (
SELECT queue FROM instances WHERE instance_id = NEW.instance_id AND execution_id = NEW.execution_id LIMIT 1
)::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create trigger on pending_events table
CREATE TRIGGER pending_events_notify
AFTER INSERT ON pending_events
FOR EACH ROW
EXECUTE FUNCTION notify_pending_event();

-- Create function to notify when activities are inserted
CREATE OR REPLACE FUNCTION notify_activity() RETURNS TRIGGER AS $$
BEGIN
-- Notify on the activity_tasks channel with the queue name as payload
PERFORM pg_notify('activity_tasks', NEW.queue);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create trigger on activities table
CREATE TRIGGER activities_notify
AFTER INSERT ON activities
FOR EACH ROW
EXECUTE FUNCTION notify_activity();
214 changes: 214 additions & 0 deletions backend/postgres/notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package postgres

import (
"context"
"fmt"
"log/slog"
"sync"
"time"

"github.com/lib/pq" // Required for LISTEN/NOTIFY support
)

const (
workflowTasksChannel = "workflow_tasks"
activityTasksChannel = "activity_tasks"

// Connection parameters for pq.Listener
listenerMinReconnectInterval = 10 * time.Second
listenerMaxReconnectInterval = time.Minute

// Ping interval to keep LISTEN connection alive
listenerPingInterval = 90 * time.Second
)

// notificationListener manages LISTEN/NOTIFY connections for reactive task polling
type notificationListener struct {
dsn string
logger *slog.Logger

workflowListener *pq.Listener
activityListener *pq.Listener

workflowNotify chan struct{}
activityNotify chan struct{}

mu sync.Mutex
started bool
closed bool
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

func newNotificationListener(dsn string, logger *slog.Logger) *notificationListener {
return &notificationListener{
dsn: dsn,
logger: logger,
workflowNotify: make(chan struct{}, 1),
activityNotify: make(chan struct{}, 1),
}
}

// Start begins listening for notifications
func (nl *notificationListener) Start(ctx context.Context) error {
nl.mu.Lock()
defer nl.mu.Unlock()

if nl.started {
return nil
}

// Create a cancellable context for the handler goroutines
nl.ctx, nl.cancel = context.WithCancel(context.Background())

// Create listener for workflow tasks
nl.workflowListener = pq.NewListener(nl.dsn, listenerMinReconnectInterval, listenerMaxReconnectInterval, func(ev pq.ListenerEventType, err error) {
if err != nil {
nl.logger.Error("workflow listener event", "event", ev, "error", err)
}
})

if err := nl.workflowListener.Listen(workflowTasksChannel); err != nil {
return fmt.Errorf("listening to workflow tasks channel: %w", err)
}

// Create listener for activity tasks
nl.activityListener = pq.NewListener(nl.dsn, listenerMinReconnectInterval, listenerMaxReconnectInterval, func(ev pq.ListenerEventType, err error) {
if err != nil {
nl.logger.Error("activity listener event", "event", ev, "error", err)
}
})

if err := nl.activityListener.Listen(activityTasksChannel); err != nil {
nl.workflowListener.Close()
return fmt.Errorf("listening to activity tasks channel: %w", err)
}

nl.started = true

// Start goroutines to handle notifications
nl.wg.Add(2)
go nl.handleWorkflowNotifications()
go nl.handleActivityNotifications()

return nil
}

// Close stops the listeners
func (nl *notificationListener) Close() error {
nl.mu.Lock()
if nl.closed {
nl.mu.Unlock()
return nil
}
nl.closed = true

// Cancel the context to stop handler goroutines
if nl.cancel != nil {
nl.cancel()
}
nl.mu.Unlock()

// Wait for handler goroutines to finish
nl.wg.Wait()

// Now safe to close listeners and channels
var errs []error
if nl.workflowListener != nil {
if err := nl.workflowListener.Close(); err != nil {
errs = append(errs, fmt.Errorf("closing workflow listener: %w", err))
}
}

if nl.activityListener != nil {
if err := nl.activityListener.Close(); err != nil {
errs = append(errs, fmt.Errorf("closing activity listener: %w", err))
}
}

close(nl.workflowNotify)
close(nl.activityNotify)

if len(errs) > 0 {
return fmt.Errorf("errors closing listeners: %v", errs)
}

return nil
}

func (nl *notificationListener) handleWorkflowNotifications() {
defer nl.wg.Done()

for {
select {
case <-nl.ctx.Done():
return
case notification, ok := <-nl.workflowListener.Notify:
if !ok {
return
}
if notification != nil {
// Non-blocking send to notify channel
select {
case nl.workflowNotify <- struct{}{}:
default:
// Channel already has a pending notification
}
}
case <-time.After(listenerPingInterval):
// Periodic ping to keep connection alive
if err := nl.workflowListener.Ping(); err != nil {
nl.logger.Error("workflow listener ping failed", "error", err)
}
}
}
}

func (nl *notificationListener) handleActivityNotifications() {
defer nl.wg.Done()

for {
select {
case <-nl.ctx.Done():
return
case notification, ok := <-nl.activityListener.Notify:
if !ok {
return
}
if notification != nil {
// Non-blocking send to notify channel
select {
case nl.activityNotify <- struct{}{}:
default:
// Channel already has a pending notification
}
}
case <-time.After(listenerPingInterval):
// Periodic ping to keep connection alive
if err := nl.activityListener.Ping(); err != nil {
nl.logger.Error("activity listener ping failed", "error", err)
}
}
}
}

// WaitForWorkflowTask waits for a workflow task notification or timeout
func (nl *notificationListener) WaitForWorkflowTask(ctx context.Context) bool {
select {
case <-ctx.Done():
return false
case <-nl.workflowNotify:
return true
}
}

// WaitForActivityTask waits for an activity task notification or timeout
func (nl *notificationListener) WaitForActivityTask(ctx context.Context) bool {
select {
case <-ctx.Done():
return false
case <-nl.activityNotify:
return true
}
}
Loading