diff --git a/action.go b/action.go index 0b838b9..0c4dbc1 100644 --- a/action.go +++ b/action.go @@ -345,7 +345,8 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha inst.CurrentStep(true) } } - + // set cancelcontext + inst.SetCancelContext(ctx) go func() { defer handler.Done() @@ -381,7 +382,7 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha fa.applyAssertionInterceptor(inst) handler.HandleResult(returnData, err) - } else if inst.Status() == model.FlowStatusFailed { + } else if inst.Status() == model.FlowStatusFailed || inst.Status() == model.FlowStatusCancelled { if inst.TracingContext() != nil { _ = trace.GetTracer().FinishTrace(inst.TracingContext(), inst.GetError()) } @@ -394,6 +395,8 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha logger.Infof("Flow Instance [%s] for event id [%s] completed in %s", inst.ID(), trigger.GetHandlerEventIdFromContext(ctx), inst.ExecutionTime().String()) } else if inst.Status() == model.FlowStatusFailed { logger.Infof("Flow Instance [%s] for event id [%s] failed in %s", inst.ID(), trigger.GetHandlerEventIdFromContext(ctx), inst.ExecutionTime().String()) + } else if inst.Status() == model.FlowStatusCancelled { + logger.Infof("Flow Instance [%s] for event id [%s] cancelled in %s", inst.ID(), trigger.GetHandlerEventIdFromContext(ctx), inst.ExecutionTime().String()) } if stateRecorder != nil { diff --git a/instance/ind_instance.go b/instance/ind_instance.go index 55e439d..34c29a7 100755 --- a/instance/ind_instance.go +++ b/instance/ind_instance.go @@ -1,6 +1,7 @@ package instance import ( + "context" "errors" "fmt" "strconv" @@ -36,6 +37,8 @@ type IndependentInstance struct { startTime time.Time //Instance recorder instRecorder *stateInstanceRecorder + // context with cancel + canctx context.Context } const ( @@ -122,6 +125,13 @@ func (inst *IndependentInstance) ExecutionTime() time.Duration { return time.Since(inst.startTime) } +func (inst *IndependentInstance) GetCancelContext() context.Context { + return inst.canctx +} +func (inst *IndependentInstance) SetCancelContext(ctx context.Context) { + inst.canctx = ctx +} + func (inst *IndependentInstance) GetFlowState(inputs map[string]interface{}) *state.FlowState { retData, _ := inst.GetReturnData() @@ -298,7 +308,8 @@ func (inst *IndependentInstance) DoStep() bool { // track the fact that the work item was removed from the queue inst.changeTracker.WorkItemRemoved(workItem) - + // injecting cancelcontext further + workItem.taskInst.cancelContext = inst.GetCancelContext() inst.execTask(behavior, workItem.taskInst) hasNext = true @@ -523,13 +534,15 @@ func (inst *IndependentInstance) propagateSkip(taskEntries []*model.TaskEntry, a // handleTaskError handles the completion of a task in the Flow Instance func (inst *IndependentInstance) handleTaskError(taskBehavior model.TaskBehavior, taskInst *TaskInst, err error) { - if taskInst.traceContext != nil { _ = trace.GetTracer().FinishTrace(taskInst.traceContext, err) } // Set task status to failed for subflow activity - taskInst.SetStatus(model.TaskStatusFailed) - + if errors.Is(err, context.Canceled) { + taskInst.SetStatus(model.TaskStatusCancelled) + } else { + taskInst.SetStatus(model.TaskStatusFailed) + } handled, taskEntries := taskBehavior.Error(taskInst, err) containerInst := taskInst.flowInst @@ -615,8 +628,11 @@ func (inst *IndependentInstance) HandleGlobalError(containerInst *Instance, err } else { // Print error message if no error handler inst.logger.Error(err) - containerInst.SetStatus(model.FlowStatusFailed) - + if errors.Is(err, context.Canceled) { + containerInst.SetStatus(model.FlowStatusCancelled) + } else { + containerInst.SetStatus(model.FlowStatusFailed) + } if containerInst != inst.Instance { // Complete SubflowCreated trace diff --git a/instance/taskevents.go b/instance/taskevents.go index 46a30f8..f43b960 100644 --- a/instance/taskevents.go +++ b/instance/taskevents.go @@ -84,6 +84,8 @@ func convertTaskStatus(code model.TaskStatus) event.Status { return event.STARTED case model.TaskStatusFailed: return event.FAILED + case model.TaskStatusCancelled: + return event.CANCELLED case model.TaskStatusDone: return event.COMPLETED case model.TaskStatusWaiting: diff --git a/instance/taskinst.go b/instance/taskinst.go index 60d2273..2e464e4 100644 --- a/instance/taskinst.go +++ b/instance/taskinst.go @@ -1,8 +1,8 @@ package instance import ( + "context" "fmt" - "github.com/project-flogo/flow/support" "runtime/debug" "time" @@ -14,6 +14,7 @@ import ( "github.com/project-flogo/core/support/trace" "github.com/project-flogo/flow/definition" "github.com/project-flogo/flow/model" + "github.com/project-flogo/flow/support" ) func NewTaskInst(flowInst *Instance, task *definition.Task) *TaskInst { @@ -73,6 +74,8 @@ type TaskInst struct { returnError error traceContext trace.TracingContext + cancelContext context.Context + //needed for serialization taskID string } @@ -149,6 +152,10 @@ func (ti *TaskInst) GetTracingContext() trace.TracingContext { return ti.traceContext } +func (ti *TaskInst) GetCancelContext() context.Context { + return ti.cancelContext +} + func (ti *TaskInst) GetSharedTempData() map[string]interface{} { //todo implement return nil @@ -361,7 +368,27 @@ func (ti *TaskInst) EvalActivity() (done bool, evalErr error) { // If output interceptor is there then the activity should be mocked and activity evaluation should be skipped. // In the applyOutputInterceptor step the mock data will be applied to the activity if !hasOutputInterceptor(ti) { - done, evalErr = actCfg.Activity.Eval(ctx) + type evalReturnObject struct { + done bool + evalerror error + } + + evalchan := make(chan evalReturnObject) + + go func(evchan chan evalReturnObject) { + done, evalErr = actCfg.Activity.Eval(ctx) + evalchan <- evalReturnObject{done: done, evalerror: evalErr} + }(evalchan) + + select { + case evalresult := <-evalchan: + done = evalresult.done + evalErr = evalresult.evalerror + + case <-ctx.GetCancelContext().Done(): + done = false + evalErr = ctx.GetCancelContext().Err() + } if evalErr != nil { e, ok := evalErr.(*activity.Error) @@ -606,11 +633,15 @@ func NewErrorObj(taskId string, msg string) map[string]interface{} { return map[string]interface{}{"activity": taskId, "message": msg, "type": "unknown", "code": "", "data": nil} } -//DEPRECATED +// DEPRECATED type LegacyCtx struct { task *TaskInst } +func (l *LegacyCtx) GetCancelContext() context.Context { + return l.task.cancelContext +} + func (l *LegacyCtx) GetOutput(name string) interface{} { val, ok := l.task.outputs[name] if ok { diff --git a/model/status.go b/model/status.go index c63d38e..310d4a1 100644 --- a/model/status.go +++ b/model/status.go @@ -38,6 +38,9 @@ const ( // TaskStatusSkipped indicates that the Task was skipped TaskStatusSkipped TaskStatus = 50 + // TaskStatusFailed indicates that the Task failed + TaskStatusCancelled TaskStatus = 70 + // TaskStatusFailed indicates that the Task failed TaskStatusFailed TaskStatus = 100