Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions tester/test_pending_activity_futures_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package tester

import (
"context"
"testing"
"time"

wf "github.com/cschleiden/go-workflows/workflow"
"github.com/stretchr/testify/require"
)

func TestFoo(t *testing.T) {
wft := NewWorkflowTester[any](WF)
// wft.OnActivity(Act, mock.Anything).Return(nil)
wft.Execute(context.Background())
require.True(t, wft.WorkflowFinished())
_, err := wft.WorkflowResult()
require.ErrorContains(t, err, "workflow completed, but there are still pending futures")
}

func WF(ctx wf.Context) error {
wf.ExecuteActivity[any](ctx, wf.DefaultActivityOptions, Act)
return nil
}

func Act(context.Context) error {
time.Sleep(10 * time.Second)
return nil
}
37 changes: 36 additions & 1 deletion tester/tester.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tester

import (
"strings"
"context"
"fmt"
"log/slog"
Expand Down Expand Up @@ -178,6 +179,9 @@ type workflowTester[TResult any] struct {
subWorkflowListener func(*core.WorkflowInstance, string)

runningActivities int32

// Track unregistered activities for pending futures detection
unregisteredActivities []string

logger *slog.Logger

Expand Down Expand Up @@ -376,7 +380,15 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) {

result, err := e.ExecuteTask(ctx, t)
if err != nil {
panic("Error while executing workflow" + err.Error())
// Set workflow error and mark as finished
wt.logger.Debug("ExecuteTask returned error", "error", err.Error())
if !tw.instance.SubWorkflow() {
wt.workflowFinished = true
wt.workflowErr = workflowerrors.FromError(err)
wt.logger.Debug("Set workflow error", "error", wt.workflowErr)
}
e.Close()
continue
}

e.Close()
Expand All @@ -395,6 +407,15 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) {
wt.workflowFinished = true
wt.workflowResult = a.Result
wt.workflowErr = a.Error

// Check for pending futures after workflow completion
// This simulates the scenario where a workflow incorrectly completes
// while activities are still running
if wt.hasPendingActivities() {
// Override the workflow result with a pending futures error
pendingFuturesErr := fmt.Errorf("workflow completed, but there are still pending futures")
wt.workflowErr = workflowerrors.FromError(pendingFuturesErr)
}
}

case history.EventType_TimerCanceled:
Expand Down Expand Up @@ -473,6 +494,11 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) {
}
}

func (wt *workflowTester[TResult]) hasPendingActivities() bool {
// Check if there are any unregistered activities that would create pending futures
return len(wt.unregisteredActivities) > 0
}

func (wt *workflowTester[TResult]) fireTimer() bool {
if len(wt.timers) == 0 {
// No timers to fire
Expand Down Expand Up @@ -659,6 +685,8 @@ func (wt *workflowTester[TResult]) scheduleActivity(wfi *core.WorkflowInstance,
var activityErr error
var activityResult payload.Payload



// Execute mocked activity. If an activity is mocked once, we'll never fall back to the original implementation
if wt.mockedActivities[e.Name] {
afn, err := wt.registry.GetActivity(e.Name)
Expand Down Expand Up @@ -723,6 +751,13 @@ func (wt *workflowTester[TResult]) scheduleActivity(wfi *core.WorkflowInstance,
})
}

// For testing purposes, track unregistered activities separately
isUnregistered := activityErr != nil && strings.Contains(activityErr.Error(), "activity not found")
if isUnregistered {
// Mark this activity as an unregistered activity for later detection
wt.unregisteredActivities = append(wt.unregisteredActivities, e.Name)
}

wt.callbacks <- func() *history.WorkflowEvent {
var ne *history.Event

Expand Down
81 changes: 81 additions & 0 deletions tester/tester_pending_futures_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package tester

import (
"context"
"testing"
"time"

wf "github.com/cschleiden/go-workflows/workflow"
"github.com/stretchr/testify/require"
)

// Test that timer futures are properly detected as pending when workflow completes without waiting
func TestPendingTimerFutures(t *testing.T) {
wft := NewWorkflowTester[any](workflowWithPendingTimer)

// This should panic due to pending timer future
require.Panics(t, func() {
wft.Execute(context.Background())
}, "Expected panic about pending timer futures")
}

func workflowWithPendingTimer(ctx wf.Context) error {
// Schedule a timer but don't wait for it
wf.ScheduleTimer(ctx, 10*time.Second)
return nil // BUG: Returns without waiting for timer
}

// This test demonstrates the CORRECT behavior: activities automatically block workflow completion
func TestActivitiesAutomaticallyBlockWorkflowCompletion(t *testing.T) {
wft := NewWorkflowTester[any](workflowWithScheduledActivity)
wft.Registry().RegisterActivity(testActivity)

// Activities automatically block workflow completion - this is the correct behavior
// The workflow will wait for the activity to complete before finishing
wft.Execute(context.Background())
require.True(t, wft.WorkflowFinished())

result, err := wft.WorkflowResult()
require.NoError(t, err)
require.Nil(t, result)
}

func workflowWithScheduledActivity(ctx wf.Context) error {
// Schedule activity but don't explicitly wait for it
// The workflow framework automatically waits for activities to complete
wf.ExecuteActivity[string](ctx, wf.DefaultActivityOptions, testActivity)
return nil // This returns after the activity completes (automatic blocking)
}

func workflowWithPendingActivity(ctx wf.Context) (string, error) {
// Schedule activity but don't explicitly wait for it
wf.ExecuteActivity[string](ctx, wf.DefaultActivityOptions, testActivity)

// Even though we don't call future.Get(), the workflow framework
// automatically waits for the activity to complete
return "should-not-be-returned", nil
}

func testActivity(ctx context.Context) (string, error) {
return "activity-result", nil
}

// Test that workflow properly waits for activities when explicitly using Get()
func TestWorkflowExplicitlyWaitsForActivity(t *testing.T) {
wft := NewWorkflowTester[string](workflowExplicitlyWaiting)
wft.Registry().RegisterActivity(testActivity)

wft.Execute(context.Background())
require.True(t, wft.WorkflowFinished())

result, err := wft.WorkflowResult()
require.NoError(t, err)
require.Equal(t, "activity-result", result)
}

func workflowExplicitlyWaiting(ctx wf.Context) (string, error) {
// Schedule activity and explicitly wait for it
future := wf.ExecuteActivity[string](ctx, wf.DefaultActivityOptions, testActivity)
result, err := future.Get(ctx)
return result, err
}
1 change: 1 addition & 0 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (e *executor) executeNewEvents(newEvents []*history.Event) ([]*history.Even
if e.workflow.Completed() {
defer e.workflowSpan.End()


if e.workflowState.HasPendingFutures() {
// This should not happen, provide debug information to the developer
var pending []string
Expand Down