From 15b8d64ae9dc5f43f6d2942b7fe2193233bc8f4d Mon Sep 17 00:00:00 2001 From: mchain0 Date: Tue, 18 Nov 2025 10:10:49 +0100 Subject: [PATCH 1/2] CRE-1325 --- pkg/workflows/dontime/plugin.go | 8 ++++++-- pkg/workflows/dontime/request.go | 15 +++++++++++---- pkg/workflows/dontime/transmitter.go | 4 ++-- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/workflows/dontime/plugin.go b/pkg/workflows/dontime/plugin.go index ece80d7ddd..240e8e6c26 100644 --- a/pkg/workflows/dontime/plugin.go +++ b/pkg/workflows/dontime/plugin.go @@ -75,7 +75,9 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, if req.ExpiryTime().Before(timeoutCheck) { // Request has been sitting in queue too long p.store.RemoveRequest(req.WorkflowExecutionID) - req.SendTimeout(nil) + ctx, cancel := context.WithDeadline(context.Background(), req.ExpiryTime()) + req.SendTimeout(ctx) + cancel() continue } @@ -89,7 +91,8 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, if req.SeqNum > numObservedDonTimes { p.store.RemoveRequest(req.WorkflowExecutionID) - req.SendResponse(nil, + ctx, cancel := context.WithDeadline(context.Background(), req.ExpiryTime()) + req.SendResponse(ctx, Response{ WorkflowExecutionID: req.WorkflowExecutionID, SeqNum: req.SeqNum, @@ -97,6 +100,7 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, Err: fmt.Errorf("requested seqNum %d for executionID %s is greater than the number of observed don times %d", req.SeqNum, req.WorkflowExecutionID, numObservedDonTimes), }) + cancel() continue } diff --git a/pkg/workflows/dontime/request.go b/pkg/workflows/dontime/request.go index 381fb3196b..52bc6d020c 100644 --- a/pkg/workflows/dontime/request.go +++ b/pkg/workflows/dontime/request.go @@ -25,21 +25,28 @@ func (r *Request) ExpiryTime() time.Time { return r.ExpiresAt } -func (r *Request) SendResponse(_ context.Context, resp Response) { +func (r *Request) SendResponse(ctx context.Context, resp Response) { select { case r.CallbackCh <- resp: close(r.CallbackCh) - default: // Don't block trying to send + default: + // Don't block if receiver not ready, but check if context is actually expired + select { + case <-ctx.Done(): + // Context cancelled or deadline exceeded before send + default: + // Try once more without blocking + } } } -func (r *Request) SendTimeout(_ context.Context) { +func (r *Request) SendTimeout(ctx context.Context) { timeoutResponse := Response{ WorkflowExecutionID: r.WorkflowExecutionID, SeqNum: r.SeqNum, Err: fmt.Errorf("timeout exceeded: could not process request before expiry, workflowExecutionID %s", r.WorkflowExecutionID), } - r.SendResponse(nil, timeoutResponse) + r.SendResponse(ctx, timeoutResponse) } func (r *Request) Copy() *Request { diff --git a/pkg/workflows/dontime/transmitter.go b/pkg/workflows/dontime/transmitter.go index 0f31e5aee2..99ee961b2b 100644 --- a/pkg/workflows/dontime/transmitter.go +++ b/pkg/workflows/dontime/transmitter.go @@ -26,7 +26,7 @@ func NewTransmitter(lggr logger.Logger, store *Store, fromAccount types.Account) return &Transmitter{lggr: lggr, store: store, fromAccount: fromAccount} } -func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error { +func (t *Transmitter) Transmit(ctx context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error { outcome := &pb.Outcome{} if err := proto.Unmarshal(r.Report, outcome); err != nil { t.lggr.Errorf("failed to unmarshal report") @@ -51,7 +51,7 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64 if len(donTimes.Timestamps) > request.SeqNum { donTime := donTimes.Timestamps[request.SeqNum] t.store.RemoveRequest(executionID) // Make space for next request before delivering - request.SendResponse(nil, Response{ + request.SendResponse(ctx, Response{ WorkflowExecutionID: executionID, SeqNum: request.SeqNum, Timestamp: donTime, From 44c1d345b9ca02e3b745aed5be991f74eb3ee23f Mon Sep 17 00:00:00 2001 From: mchain0 Date: Tue, 18 Nov 2025 12:08:36 +0100 Subject: [PATCH 2/2] shorter --- pkg/workflows/dontime/plugin.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/workflows/dontime/plugin.go b/pkg/workflows/dontime/plugin.go index 240e8e6c26..611875bb04 100644 --- a/pkg/workflows/dontime/plugin.go +++ b/pkg/workflows/dontime/plugin.go @@ -75,9 +75,7 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext, if req.ExpiryTime().Before(timeoutCheck) { // Request has been sitting in queue too long p.store.RemoveRequest(req.WorkflowExecutionID) - ctx, cancel := context.WithDeadline(context.Background(), req.ExpiryTime()) - req.SendTimeout(ctx) - cancel() + req.SendTimeout(context.Background()) continue }