From d6c3d4bf571bcbd9fffd02d5965216e658333bba Mon Sep 17 00:00:00 2001 From: mapogolions Date: Sun, 2 Feb 2025 15:15:21 +0500 Subject: [PATCH 1/3] reproduce actor blocking on duplicate Respond call --- actor/engine_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/actor/engine_test.go b/actor/engine_test.go index a58f36e9..3ec05c94 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -480,3 +480,25 @@ func TestMultipleStops(t *testing.T) { <-done } } + +func TestShouldNotBlockActorOnAccidentalDuplicateRespond(t *testing.T) { + e, err := NewEngine(NewEngineConfig()) + require.NoError(t, err) + pid := e.SpawnFunc(func(ctx *Context) { + msg := ctx.Message() + if s, ok := msg.(string); ok { + if s == "foo" { + ctx.Respond(len(s)) + // missing `return` statement, causing an unintended second response + } + ctx.Respond(len(s)) // sends a duplicate response + } + }, "str", WithID("len")) + + _ = e.Request(pid, "foo", 2*time.Second) // response will be consumed later + resp := e.Request(pid, "barbaz", 2*time.Second) + + r, err := resp.Result() + require.NoError(t, err) + require.Equal(t, 6, r) +} From 1c7caf3851f21fa37dd43504f673ea2e93bfc15a Mon Sep 17 00:00:00 2001 From: mapogolions Date: Sun, 2 Feb 2025 15:22:06 +0500 Subject: [PATCH 2/3] reproduce receiving accidentally sent second result --- actor/engine_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/actor/engine_test.go b/actor/engine_test.go index 3ec05c94..698bb37e 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -502,3 +502,46 @@ func TestShouldNotBlockActorOnAccidentalDuplicateRespond(t *testing.T) { require.NoError(t, err) require.Equal(t, 6, r) } + +func TestShouldNotReceiveAccidentallySentSecondResult(t *testing.T) { + e, err := NewEngine(NewEngineConfig()) + require.NoError(t, err) + done := make(chan struct{}) + pid := e.SpawnFunc(func(ctx *Context) { + msg := ctx.Message() + if s, ok := msg.(string); ok && s == "foo" { + defer close(done) + if s == "foo" { + ctx.Respond(1) + // missing `return` statement, causing an unintended second response + } + select { // sends a duplicate response + case <-time.After(100 * time.Millisecond): + case <-runAsync(func() { + ctx.Respond(2) + }): + } + } + }, "kind") + + resp := e.Request(pid, "foo", 200*time.Millisecond) + <-done + + r, err := resp.Result() + require.NoError(t, err) + require.Equal(t, 1, r) + require.Nil(t, e.Registry.get(resp.pid)) + + r, err = resp.Result() + require.Error(t, err) + require.Nil(t, r) +} + +func runAsync(f func()) <-chan struct{} { + ch := make(chan struct{}) + go func() { + defer close(ch) + f() + }() + return ch +} From 47b938c26076c5c31ec296b3b63aa3ce1f788b1d Mon Sep 17 00:00:00 2001 From: mapogolions Date: Sun, 2 Feb 2025 15:25:36 +0500 Subject: [PATCH 3/3] fix issues caused by duplicate response --- actor/registry.go | 10 ++++++++++ actor/response.go | 6 +++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/actor/registry.go b/actor/registry.go index 8f578b81..f5418773 100644 --- a/actor/registry.go +++ b/actor/registry.go @@ -69,3 +69,13 @@ func (r *Registry) add(proc Processer) { r.mu.Unlock() proc.Start() } + +func (r *Registry) remove(pid *PID) bool { + r.mu.Lock() + defer r.mu.Unlock() + _, ok := r.lookup[pid.ID] + if ok { + delete(r.lookup, pid.ID) + } + return ok +} diff --git a/actor/response.go b/actor/response.go index c6493f79..68d4aff3 100644 --- a/actor/response.go +++ b/actor/response.go @@ -40,7 +40,11 @@ func (r *Response) Result() (any, error) { } func (r *Response) Send(_ *PID, msg any, _ *PID) { - r.result <- msg + // Under normal conditions, the method is expected to be called only once. + // To prevent accidental duplicate responses, we promptly remove the process from the registry + if r.engine.Registry.remove(r.pid) { + r.result <- msg + } } func (r *Response) PID() *PID { return r.pid }