From d964beed606b22e274e411a83e3c4c1f06760642 Mon Sep 17 00:00:00 2001 From: vanilcha Date: Tue, 21 Mar 2023 11:07:34 +0530 Subject: [PATCH 1/3] cancel context for activities when app is stopped --- action.go | 7 +++++-- instance/ind_instance.go | 23 ++++++++++++++++++----- instance/taskinst.go | 37 ++++++++++++++++++++++++++++++++++--- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/action.go b/action.go index 8ecc8d68..398410df 100644 --- a/action.go +++ b/action.go @@ -342,7 +342,8 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha inst.CurrentStep(true) } } - + // set cancelcontext + inst.SetCancelContext(ctx) go func() { defer handler.Done() @@ -378,7 +379,7 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha fa.applyAssertionInterceptor(inst) handler.HandleResult(returnData, err) - } else if inst.Status() == model.FlowStatusFailed { + } else if inst.Status() == model.FlowStatusFailed || inst.Status() == model.FlowStatusCancelled { if inst.TracingContext() != nil { _ = trace.GetTracer().FinishTrace(inst.TracingContext(), inst.GetError()) } @@ -391,6 +392,8 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha logger.Infof("Flow Instance [%s] for event id [%s] completed in %s", inst.ID(), trigger.GetHandlerEventIdFromContext(ctx), inst.ExecutionTime().String()) } else if inst.Status() == model.FlowStatusFailed { logger.Infof("Flow Instance [%s] for event id [%s] failed in %s", inst.ID(), trigger.GetHandlerEventIdFromContext(ctx), inst.ExecutionTime().String()) + } else if inst.Status() == model.FlowStatusCancelled { + logger.Infof("Flow Instance [%s] for event id [%s] cancelled in %s", inst.ID(), trigger.GetHandlerEventIdFromContext(ctx), inst.ExecutionTime().String()) } if stateRecorder != nil { diff --git a/instance/ind_instance.go b/instance/ind_instance.go index b2105e65..9bfd7caa 100755 --- a/instance/ind_instance.go +++ b/instance/ind_instance.go @@ -1,6 +1,7 @@ package instance import ( + "context" "errors" "fmt" "strconv" @@ -36,6 +37,8 @@ type IndependentInstance struct { startTime time.Time //Instance recorder instRecorder *stateInstanceRecorder + // context with cancel + canctx context.Context } const ( @@ -122,6 +125,13 @@ func (inst *IndependentInstance) ExecutionTime() time.Duration { return time.Since(inst.startTime) } +func (inst *IndependentInstance) GetCancelContext() context.Context { + return inst.canctx +} +func (inst *IndependentInstance) SetCancelContext(ctx context.Context) { + inst.canctx = ctx +} + func (inst *IndependentInstance) GetFlowState(inputs map[string]interface{}) *state.FlowState { return &state.FlowState{ UserId: flowsupport.GetUserName(), @@ -294,7 +304,8 @@ func (inst *IndependentInstance) DoStep() bool { // track the fact that the work item was removed from the queue inst.changeTracker.WorkItemRemoved(workItem) - + // injecting cancelcontext further + workItem.taskInst.cancelContext = inst.GetCancelContext() inst.execTask(behavior, workItem.taskInst) hasNext = true @@ -519,7 +530,6 @@ func (inst *IndependentInstance) propagateSkip(taskEntries []*model.TaskEntry, a // handleTaskError handles the completion of a task in the Flow Instance func (inst *IndependentInstance) handleTaskError(taskBehavior model.TaskBehavior, taskInst *TaskInst, err error) { - if taskInst.traceContext != nil { _ = trace.GetTracer().FinishTrace(taskInst.traceContext, err) } @@ -611,8 +621,11 @@ func (inst *IndependentInstance) HandleGlobalError(containerInst *Instance, err } else { // Print error message if no error handler inst.logger.Error(err) - containerInst.SetStatus(model.FlowStatusFailed) - + if errors.Is(err, context.Canceled) { + containerInst.SetStatus(model.FlowStatusCancelled) + } else { + containerInst.SetStatus(model.FlowStatusFailed) + } if containerInst != inst.Instance { // Complete SubflowCreated trace @@ -727,7 +740,7 @@ func getFlowModel(flow *definition.Definition) (*model.FlowModel, error) { } -//// Restart indicates that this FlowInstance was restarted +// // Restart indicates that this FlowInstance was restarted func (inst *IndependentInstance) Restart(logger log.Logger, id string, initStepId int) error { inst.id = id inst.logger = logger diff --git a/instance/taskinst.go b/instance/taskinst.go index 60d2273d..2e464e4a 100644 --- a/instance/taskinst.go +++ b/instance/taskinst.go @@ -1,8 +1,8 @@ package instance import ( + "context" "fmt" - "github.com/project-flogo/flow/support" "runtime/debug" "time" @@ -14,6 +14,7 @@ import ( "github.com/project-flogo/core/support/trace" "github.com/project-flogo/flow/definition" "github.com/project-flogo/flow/model" + "github.com/project-flogo/flow/support" ) func NewTaskInst(flowInst *Instance, task *definition.Task) *TaskInst { @@ -73,6 +74,8 @@ type TaskInst struct { returnError error traceContext trace.TracingContext + cancelContext context.Context + //needed for serialization taskID string } @@ -149,6 +152,10 @@ func (ti *TaskInst) GetTracingContext() trace.TracingContext { return ti.traceContext } +func (ti *TaskInst) GetCancelContext() context.Context { + return ti.cancelContext +} + func (ti *TaskInst) GetSharedTempData() map[string]interface{} { //todo implement return nil @@ -361,7 +368,27 @@ func (ti *TaskInst) EvalActivity() (done bool, evalErr error) { // If output interceptor is there then the activity should be mocked and activity evaluation should be skipped. // In the applyOutputInterceptor step the mock data will be applied to the activity if !hasOutputInterceptor(ti) { - done, evalErr = actCfg.Activity.Eval(ctx) + type evalReturnObject struct { + done bool + evalerror error + } + + evalchan := make(chan evalReturnObject) + + go func(evchan chan evalReturnObject) { + done, evalErr = actCfg.Activity.Eval(ctx) + evalchan <- evalReturnObject{done: done, evalerror: evalErr} + }(evalchan) + + select { + case evalresult := <-evalchan: + done = evalresult.done + evalErr = evalresult.evalerror + + case <-ctx.GetCancelContext().Done(): + done = false + evalErr = ctx.GetCancelContext().Err() + } if evalErr != nil { e, ok := evalErr.(*activity.Error) @@ -606,11 +633,15 @@ func NewErrorObj(taskId string, msg string) map[string]interface{} { return map[string]interface{}{"activity": taskId, "message": msg, "type": "unknown", "code": "", "data": nil} } -//DEPRECATED +// DEPRECATED type LegacyCtx struct { task *TaskInst } +func (l *LegacyCtx) GetCancelContext() context.Context { + return l.task.cancelContext +} + func (l *LegacyCtx) GetOutput(name string) interface{} { val, ok := l.task.outputs[name] if ok { From 106d4a0bad3da5dad4c4e2f4d0407c0defe220d4 Mon Sep 17 00:00:00 2001 From: vanilcha Date: Mon, 3 Jul 2023 17:49:52 +0530 Subject: [PATCH 2/3] added cancel task status code. --- instance/ind_instance.go | 7 +++++-- model/status.go | 3 +++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/instance/ind_instance.go b/instance/ind_instance.go index bd90afcb..34c29a79 100755 --- a/instance/ind_instance.go +++ b/instance/ind_instance.go @@ -538,8 +538,11 @@ func (inst *IndependentInstance) handleTaskError(taskBehavior model.TaskBehavior _ = trace.GetTracer().FinishTrace(taskInst.traceContext, err) } // Set task status to failed for subflow activity - taskInst.SetStatus(model.TaskStatusFailed) - + if errors.Is(err, context.Canceled) { + taskInst.SetStatus(model.TaskStatusCancelled) + } else { + taskInst.SetStatus(model.TaskStatusFailed) + } handled, taskEntries := taskBehavior.Error(taskInst, err) containerInst := taskInst.flowInst diff --git a/model/status.go b/model/status.go index c63d38e6..05c9caed 100644 --- a/model/status.go +++ b/model/status.go @@ -41,6 +41,9 @@ const ( // TaskStatusFailed indicates that the Task failed TaskStatusFailed TaskStatus = 100 + // TaskStatusFailed indicates that the Task failed + TaskStatusCancelled TaskStatus = 150 + // LinkStatusFalse indicates that the Link evaluated to false LinkStatusFalse LinkStatus = 1 From a7764552626d626cda62a81f2db8a00b817e6547 Mon Sep 17 00:00:00 2001 From: vanilcha Date: Thu, 6 Jul 2023 16:01:00 +0530 Subject: [PATCH 3/3] convert cancelled context status for event --- instance/taskevents.go | 2 ++ model/status.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/instance/taskevents.go b/instance/taskevents.go index 46a30f88..f43b960f 100644 --- a/instance/taskevents.go +++ b/instance/taskevents.go @@ -84,6 +84,8 @@ func convertTaskStatus(code model.TaskStatus) event.Status { return event.STARTED case model.TaskStatusFailed: return event.FAILED + case model.TaskStatusCancelled: + return event.CANCELLED case model.TaskStatusDone: return event.COMPLETED case model.TaskStatusWaiting: diff --git a/model/status.go b/model/status.go index 05c9caed..310d4a18 100644 --- a/model/status.go +++ b/model/status.go @@ -39,10 +39,10 @@ const ( TaskStatusSkipped TaskStatus = 50 // TaskStatusFailed indicates that the Task failed - TaskStatusFailed TaskStatus = 100 + TaskStatusCancelled TaskStatus = 70 // TaskStatusFailed indicates that the Task failed - TaskStatusCancelled TaskStatus = 150 + TaskStatusFailed TaskStatus = 100 // LinkStatusFalse indicates that the Link evaluated to false LinkStatusFalse LinkStatus = 1