Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
35adf1a
implement valkey using glide valkey SDK
DerkSchooltink Nov 25, 2025
3cdb78e
add queue tests for valkey integration
DerkSchooltink Nov 25, 2025
08de4da
add workflow test for valkey integration
DerkSchooltink Nov 25, 2025
c487fc7
change type of NewValkeyBackend to be compatible with signal backend …
DerkSchooltink Nov 25, 2025
728e4e9
attempt to solve crossslot problems in valkey clusters when dequeueing
DerkSchooltink Nov 25, 2025
87708bc
swap glide SDK for valkey-go SDK
DerkSchooltink Nov 25, 2025
f6696cd
pass equal amount of ids as keys in xreadgroup
DerkSchooltink Nov 25, 2025
f6440ec
make keys cluster-safe (and keep compatibility with non-cluster-mode)
DerkSchooltink Nov 25, 2025
94aeb80
Revert "make keys cluster-safe (and keep compatibility with non-clust…
DerkSchooltink Nov 25, 2025
6ad7660
dont return completing workflow when removing canceled timers
DerkSchooltink Nov 25, 2025
549d44b
add valkey nil check to readInstance
DerkSchooltink Nov 25, 2025
611f099
add block to xreadgroup in dequeing, and add some debug in the error …
DerkSchooltink Nov 25, 2025
fb9cb3d
return internal errors in diag endpoints
DerkSchooltink Nov 25, 2025
f6519b4
modify GetWorkflowInstances to use BYSCORE limitting
DerkSchooltink Nov 25, 2025
bd6ead1
simplify dequeue
DerkSchooltink Nov 25, 2025
e0586d7
reverse zrange scoring of getworkflow instances
DerkSchooltink Nov 25, 2025
167baa5
use zmscore, and also use proper min values for zrange
DerkSchooltink Nov 25, 2025
f45557d
use prepare lua script instead of sdk directly
DerkSchooltink Nov 25, 2025
ac20609
remove hashtag (in favor of optional keyprefix hashing)
DerkSchooltink Nov 25, 2025
58b37bf
bring back lua scripts for queues
DerkSchooltink Nov 25, 2025
f4217bf
revert some minor changes to redis impl. that are unnecessary
DerkSchooltink Nov 25, 2025
f85e196
avoid shadowing keys name
DerkSchooltink Nov 25, 2025
9bcabf1
treat scores as float slice not a single float
DerkSchooltink Dec 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,46 @@ jobs:
${{ github.workspace }}/report.xml
if: always()

test_valkey:
runs-on: ubuntu-latest
needs: build

steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: 1.24
check-latest: true
cache: true

- name: Start Valkey (Docker)
run: |
docker run -d --name valkey -p 6379:6379 valkey/valkey:latest valkey-server --requirepass ValkeyPassw0rd

- name: Wait for Valkey readiness
run: |
for i in {1..60}; do
if docker exec valkey valkey-cli -a RedisPassw0rd PING | grep -q PONG; then
echo "Valkey is ready"; exit 0; fi;
sleep 1;
done
echo "Valkey did not become ready in time";
docker logs valkey || true
exit 1

- name: Tests (valkey backend, integration)
run: |
go test -tags=valkey_integration -timeout 240s -race -count 1 -v github.com/cschleiden/go-workflows/backend/valkey 2>&1 | go tool go-junit-report -set-exit-code -iocopy -out "${{ github.workspace }}/report.xml"

- name: Test Summary
uses: test-summary/action@v2
with:
paths: |
${{ github.workspace }}/report.xml
if: always()

test_sqlite:
runs-on: ubuntu-latest
needs: build
Expand Down
80 changes: 80 additions & 0 deletions backend/valkey/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package valkey

import (
"context"
"fmt"

"github.com/cschleiden/go-workflows/backend"
"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/workflow"
)

func (vb *valkeyBackend) PrepareActivityQueues(ctx context.Context, queues []workflow.Queue) error {
return vb.activityQueue.Prepare(ctx, vb.client, queues)
}

func (vb *valkeyBackend) GetActivityTask(ctx context.Context, queues []workflow.Queue) (*backend.ActivityTask, error) {
activityTask, err := vb.activityQueue.Dequeue(ctx, vb.client, queues, vb.options.ActivityLockTimeout, vb.options.BlockTimeout)
if err != nil {
return nil, err
}

if activityTask == nil {
return nil, nil
}

return &backend.ActivityTask{
WorkflowInstance: activityTask.Data.Instance,
Queue: workflow.Queue(activityTask.Data.Queue),
ID: activityTask.TaskID,
ActivityID: activityTask.Data.ID,
Event: activityTask.Data.Event,
}, nil
}

func (vb *valkeyBackend) ExtendActivityTask(ctx context.Context, task *backend.ActivityTask) error {
if err := vb.activityQueue.Extend(ctx, vb.client, task.Queue, task.ID); err != nil {
return err
}

return nil
}

func (vb *valkeyBackend) CompleteActivityTask(ctx context.Context, task *backend.ActivityTask, result *history.Event) error {
instance, err := readInstance(ctx, vb.client, vb.keys.instanceKey(task.WorkflowInstance))
if err != nil {
return err
}

eventData, payload, err := marshalEvent(result)
if err != nil {
return err
}

activityQueueKeys := vb.activityQueue.Keys(task.Queue)
workflowQueueKeys := vb.workflowQueue.Keys(workflow.Queue(instance.Queue))

err = completeActivityTaskScript.Exec(ctx, vb.client, []string{
activityQueueKeys.SetKey,
activityQueueKeys.StreamKey,
vb.keys.pendingEventsKey(task.WorkflowInstance),
vb.keys.payloadKey(task.WorkflowInstance),
vb.workflowQueue.queueSetKey,
workflowQueueKeys.SetKey,
workflowQueueKeys.StreamKey,
}, []string{
task.ID,
vb.activityQueue.groupName,
result.ID,
eventData,
payload,
vb.workflowQueue.groupName,
instanceSegment(task.WorkflowInstance),
}).Error()

if err != nil {
return fmt.Errorf("completing activity task: %w", err)
}

return nil
}
31 changes: 31 additions & 0 deletions backend/valkey/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package valkey

import (
"context"
"fmt"

"github.com/cschleiden/go-workflows/core"
)

// deleteInstance deletes an instance from Valkey. It does not attempt to remove any future events or pending
// workflow tasks. It's assumed that the instance is in the finished state.
//
// Note: might want to revisit this in the future if we want to support removing hung instances.
func (vb *valkeyBackend) deleteInstance(ctx context.Context, instance *core.WorkflowInstance) error {
err := deleteInstanceScript.Exec(ctx, vb.client, []string{
vb.keys.instanceKey(instance),
vb.keys.pendingEventsKey(instance),
vb.keys.historyKey(instance),
vb.keys.payloadKey(instance),
vb.keys.activeInstanceExecutionKey(instance.InstanceID),
vb.keys.instancesByCreation(),
}, []string{
instanceSegment(instance),
}).Error()

if err != nil {
return fmt.Errorf("failed to delete instance: %w", err)
}

return nil
}
100 changes: 100 additions & 0 deletions backend/valkey/diagnostics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package valkey

import (
"context"
"encoding/json"
"fmt"

"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/diag"
"github.com/cschleiden/go-workflows/internal/log"
)

var _ diag.Backend = (*valkeyBackend)(nil)

func (vb *valkeyBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) {
zrangeCmd := vb.client.B().Zrange().Key(vb.keys.instancesByCreation()).Min("+inf").Max("-inf").Byscore().Rev().Limit(0, int64(count))
if afterInstanceID != "" {
afterSegmentID := instanceSegment(core.NewWorkflowInstance(afterInstanceID, afterExecutionID))
scores, err := vb.client.Do(ctx, vb.client.B().Zmscore().Key(vb.keys.instancesByCreation()).Member(afterSegmentID).Build()).AsFloatSlice()
if err != nil {
return nil, fmt.Errorf("getting instance score for %v: %w", afterSegmentID, err)
}

if len(scores) == 0 || scores[0] == 0 {
vb.Options().Logger.Error("could not find instance %v",
log.NamespaceKey+".valkey.afterInstanceID", afterInstanceID,
log.NamespaceKey+".valkey.afterExecutionID", afterExecutionID,
)
return nil, nil
}

zrangeCmd = vb.client.B().Zrange().Key(vb.keys.instancesByCreation()).Min("+inf").Max(fmt.Sprintf("(%f", scores[0])).Byscore().Rev().Limit(0, int64(count))
}

instanceSegments, err := vb.client.Do(ctx, zrangeCmd.Build()).AsStrSlice()
if err != nil {
return nil, fmt.Errorf("getting instances: %w", err)
}

if len(instanceSegments) == 0 {
return nil, nil
}

instanceKeys := make([]string, 0)
for _, r := range instanceSegments {
instanceKeys = append(instanceKeys, vb.keys.instanceKeyFromSegment(r))
}

cmd := vb.client.B().Mget().Key(instanceKeys...)
instances, err := vb.client.Do(ctx, cmd.Build()).AsStrSlice()
if err != nil {
return nil, fmt.Errorf("getting instances: %w", err)
}

instanceRefs := make([]*diag.WorkflowInstanceRef, 0, len(instances))
for _, instance := range instances {
if instance == "" {
continue
}

var state instanceState
if err := json.Unmarshal([]byte(instance), &state); err != nil {
return nil, fmt.Errorf("unmarshaling instance state: %w", err)
}

instanceRefs = append(instanceRefs, &diag.WorkflowInstanceRef{
Instance: state.Instance,
CreatedAt: state.CreatedAt,
CompletedAt: state.CompletedAt,
State: state.State,
Queue: state.Queue,
})
}

return instanceRefs, nil
}

func (vb *valkeyBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
instanceState, err := readInstance(ctx, vb.client, vb.keys.instanceKey(instance))
if err != nil {
return nil, err
}

return mapWorkflowInstance(instanceState), nil
}

func (vb *valkeyBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) {
itb := diag.NewInstanceTreeBuilder(vb)
return itb.BuildWorkflowInstanceTree(ctx, instance)
}

func mapWorkflowInstance(instance *instanceState) *diag.WorkflowInstanceRef {
return &diag.WorkflowInstanceRef{
Instance: instance.Instance,
CreatedAt: instance.CreatedAt,
CompletedAt: instance.CompletedAt,
State: instance.State,
Queue: instance.Queue,
}
}
117 changes: 117 additions & 0 deletions backend/valkey/diagnostics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package valkey

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

"github.com/cschleiden/go-workflows/backend"
"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/backend/test"
"github.com/cschleiden/go-workflows/client"
"github.com/cschleiden/go-workflows/diag"
"github.com/stretchr/testify/require"
"github.com/valkey-io/valkey-go"
)

func getClient() valkey.Client {
newClient, _ := valkey.NewClient(valkey.ClientOption{
InitAddress: []string{"localhost:6379"},
Password: "ValkeyPassw0rd",
SelectDB: 0,
})
return newClient
}

func getCreateBackend(client valkey.Client, additionalOptions ...BackendOption) func(options ...backend.BackendOption) test.TestBackend {
return func(options ...backend.BackendOption) test.TestBackend {
// Flush database
if err := client.Do(context.Background(), client.B().Flushdb().Build()).Error(); err != nil {
panic(err)
}

r, err := client.Do(context.Background(), client.B().Keys().Pattern("*").Build()).AsStrSlice()
if err != nil {
panic(err)
}

if len(r) > 0 {
panic("Keys should've been empty" + strings.Join(r, ", "))
}

redisOptions := []BackendOption{
WithBlockTimeout(time.Millisecond * 10),
WithBackendOptions(options...),
}

redisOptions = append(redisOptions, additionalOptions...)

b, err := NewValkeyBackend(client, redisOptions...)
if err != nil {
panic(err)
}

return b
}
}

var _ test.TestBackend = (*valkeyBackend)(nil)

// GetFutureEvents
func (vb *valkeyBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
r, err := vb.client.Do(ctx, vb.client.B().Zrangebyscore().Key(vb.keys.futureEventsKey()).Min("-inf").Max("+inf").Build()).AsStrSlice()
if err != nil {
return nil, fmt.Errorf("getting future events: %w", err)
}

events := make([]*history.Event, 0)

for _, eventID := range r {
eventStr, err := vb.client.Do(ctx, vb.client.B().Hget().Key(eventID).Field("event").Build()).AsBytes()
if err != nil {
return nil, fmt.Errorf("getting event %v: %w", eventID, err)
}

var event *history.Event
if err := json.Unmarshal(eventStr, &event); err != nil {
return nil, fmt.Errorf("unmarshaling event %v: %w", eventID, err)
}

events = append(events, event)
}

return events, nil
}

func Test_Diag_GetWorkflowInstances(t *testing.T) {
if testing.Short() {
t.Skip()
}

c := getClient()
t.Cleanup(func() { c.Close() })

vc := getCreateBackend(c)()

bd := vc.(diag.Backend)

ctx := context.Background()
instances, err := bd.GetWorkflowInstances(ctx, "", "", 5)
require.NoError(t, err)
require.Empty(t, instances)

cl := client.New(bd)

_, err = cl.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
InstanceID: "ex1",
}, "some-workflow")
require.NoError(t, err)

instances, err = bd.GetWorkflowInstances(ctx, "", "", 5)
require.NoError(t, err)
require.Len(t, instances, 1)
require.Equal(t, "ex1", instances[0].Instance.InstanceID)
}
Loading