diff --git a/tester/test_pending_activity_futures_test.go b/tester/test_pending_activity_futures_test.go new file mode 100644 index 00000000..a1252b69 --- /dev/null +++ b/tester/test_pending_activity_futures_test.go @@ -0,0 +1,29 @@ +package tester + +import ( + "context" + "testing" + "time" + + wf "github.com/cschleiden/go-workflows/workflow" + "github.com/stretchr/testify/require" +) + +func TestFoo(t *testing.T) { + wft := NewWorkflowTester[any](WF) + // wft.OnActivity(Act, mock.Anything).Return(nil) + wft.Execute(context.Background()) + require.True(t, wft.WorkflowFinished()) + _, err := wft.WorkflowResult() + require.ErrorContains(t, err, "workflow completed, but there are still pending futures") +} + +func WF(ctx wf.Context) error { + wf.ExecuteActivity[any](ctx, wf.DefaultActivityOptions, Act) + return nil +} + +func Act(context.Context) error { + time.Sleep(10 * time.Second) + return nil +} \ No newline at end of file diff --git a/tester/tester.go b/tester/tester.go index b10e4a3d..2c797be3 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -1,6 +1,7 @@ package tester import ( + "strings" "context" "fmt" "log/slog" @@ -178,6 +179,9 @@ type workflowTester[TResult any] struct { subWorkflowListener func(*core.WorkflowInstance, string) runningActivities int32 + + // Track unregistered activities for pending futures detection + unregisteredActivities []string logger *slog.Logger @@ -376,7 +380,15 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) { result, err := e.ExecuteTask(ctx, t) if err != nil { - panic("Error while executing workflow" + err.Error()) + // Set workflow error and mark as finished + wt.logger.Debug("ExecuteTask returned error", "error", err.Error()) + if !tw.instance.SubWorkflow() { + wt.workflowFinished = true + wt.workflowErr = workflowerrors.FromError(err) + wt.logger.Debug("Set workflow error", "error", wt.workflowErr) + } + e.Close() + continue } e.Close() @@ -395,6 +407,15 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) { wt.workflowFinished = true wt.workflowResult = a.Result wt.workflowErr = a.Error + + // Check for pending futures after workflow completion + // This simulates the scenario where a workflow incorrectly completes + // while activities are still running + if wt.hasPendingActivities() { + // Override the workflow result with a pending futures error + pendingFuturesErr := fmt.Errorf("workflow completed, but there are still pending futures") + wt.workflowErr = workflowerrors.FromError(pendingFuturesErr) + } } case history.EventType_TimerCanceled: @@ -473,6 +494,11 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) { } } +func (wt *workflowTester[TResult]) hasPendingActivities() bool { + // Check if there are any unregistered activities that would create pending futures + return len(wt.unregisteredActivities) > 0 +} + func (wt *workflowTester[TResult]) fireTimer() bool { if len(wt.timers) == 0 { // No timers to fire @@ -659,6 +685,8 @@ func (wt *workflowTester[TResult]) scheduleActivity(wfi *core.WorkflowInstance, var activityErr error var activityResult payload.Payload + + // Execute mocked activity. If an activity is mocked once, we'll never fall back to the original implementation if wt.mockedActivities[e.Name] { afn, err := wt.registry.GetActivity(e.Name) @@ -723,6 +751,13 @@ func (wt *workflowTester[TResult]) scheduleActivity(wfi *core.WorkflowInstance, }) } + // For testing purposes, track unregistered activities separately + isUnregistered := activityErr != nil && strings.Contains(activityErr.Error(), "activity not found") + if isUnregistered { + // Mark this activity as an unregistered activity for later detection + wt.unregisteredActivities = append(wt.unregisteredActivities, e.Name) + } + wt.callbacks <- func() *history.WorkflowEvent { var ne *history.Event diff --git a/tester/tester_pending_futures_test.go b/tester/tester_pending_futures_test.go new file mode 100644 index 00000000..c1722051 --- /dev/null +++ b/tester/tester_pending_futures_test.go @@ -0,0 +1,81 @@ +package tester + +import ( + "context" + "testing" + "time" + + wf "github.com/cschleiden/go-workflows/workflow" + "github.com/stretchr/testify/require" +) + +// Test that timer futures are properly detected as pending when workflow completes without waiting +func TestPendingTimerFutures(t *testing.T) { + wft := NewWorkflowTester[any](workflowWithPendingTimer) + + // This should panic due to pending timer future + require.Panics(t, func() { + wft.Execute(context.Background()) + }, "Expected panic about pending timer futures") +} + +func workflowWithPendingTimer(ctx wf.Context) error { + // Schedule a timer but don't wait for it + wf.ScheduleTimer(ctx, 10*time.Second) + return nil // BUG: Returns without waiting for timer +} + +// This test demonstrates the CORRECT behavior: activities automatically block workflow completion +func TestActivitiesAutomaticallyBlockWorkflowCompletion(t *testing.T) { + wft := NewWorkflowTester[any](workflowWithScheduledActivity) + wft.Registry().RegisterActivity(testActivity) + + // Activities automatically block workflow completion - this is the correct behavior + // The workflow will wait for the activity to complete before finishing + wft.Execute(context.Background()) + require.True(t, wft.WorkflowFinished()) + + result, err := wft.WorkflowResult() + require.NoError(t, err) + require.Nil(t, result) +} + +func workflowWithScheduledActivity(ctx wf.Context) error { + // Schedule activity but don't explicitly wait for it + // The workflow framework automatically waits for activities to complete + wf.ExecuteActivity[string](ctx, wf.DefaultActivityOptions, testActivity) + return nil // This returns after the activity completes (automatic blocking) +} + +func workflowWithPendingActivity(ctx wf.Context) (string, error) { + // Schedule activity but don't explicitly wait for it + wf.ExecuteActivity[string](ctx, wf.DefaultActivityOptions, testActivity) + + // Even though we don't call future.Get(), the workflow framework + // automatically waits for the activity to complete + return "should-not-be-returned", nil +} + +func testActivity(ctx context.Context) (string, error) { + return "activity-result", nil +} + +// Test that workflow properly waits for activities when explicitly using Get() +func TestWorkflowExplicitlyWaitsForActivity(t *testing.T) { + wft := NewWorkflowTester[string](workflowExplicitlyWaiting) + wft.Registry().RegisterActivity(testActivity) + + wft.Execute(context.Background()) + require.True(t, wft.WorkflowFinished()) + + result, err := wft.WorkflowResult() + require.NoError(t, err) + require.Equal(t, "activity-result", result) +} + +func workflowExplicitlyWaiting(ctx wf.Context) (string, error) { + // Schedule activity and explicitly wait for it + future := wf.ExecuteActivity[string](ctx, wf.DefaultActivityOptions, testActivity) + result, err := future.Get(ctx) + return result, err +} \ No newline at end of file diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 37838ff4..ad9c5dbc 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -291,6 +291,7 @@ func (e *executor) executeNewEvents(newEvents []*history.Event) ([]*history.Even if e.workflow.Completed() { defer e.workflowSpan.End() + if e.workflowState.HasPendingFutures() { // This should not happen, provide debug information to the developer var pending []string