From 391bed44b1696d0ca041df87199b1e1d2124511a Mon Sep 17 00:00:00 2001 From: mapogolions Date: Sat, 25 Jan 2025 17:06:11 +0500 Subject: [PATCH] Use nproc for clarity and add a unit test to verify that the message causing the crash is dropped --- actor/process.go | 19 +++++++++---------- actor/process_test.go | 27 ++++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 13 deletions(-) diff --git a/actor/process.go b/actor/process.go index 3de42b79..98aec0fa 100644 --- a/actor/process.go +++ b/actor/process.go @@ -61,33 +61,32 @@ func (p *process) Invoke(msgs []Envelope) { nmsg = len(msgs) // numbers of msgs that are processed. nproc = 0 - // FIXME: We could use nrpoc here, but for some reason placing nproc++ on the - // bottom of the function it freezes some tests. Hence, I created a new counter - // for bookkeeping. - processed = 0 ) defer func() { // If we recovered, we buffer up all the messages that we could not process // so we can retry them on the next restart. if v := recover(); v != nil { + // Processed 'nproc' messages successfully, then encountered a crash on the next message. + // After restart, processing begins with the message following the one that caused the crash. + // 'nrecv' represents the total number of successfully received messages, including the one that caused the crash. + nrecv := nproc + 1 p.context.message = Stopped{} p.context.receiver.Receive(p.context) - p.mbuffer = make([]Envelope, nmsg-nproc) - for i := 0; i < nmsg-nproc; i++ { - p.mbuffer[i] = msgs[i+nproc] + p.mbuffer = make([]Envelope, nmsg-nrecv) + for i := 0; i < nmsg-nrecv; i++ { + p.mbuffer[i] = msgs[i+nrecv] } p.tryRestart(v) } }() for i := 0; i < len(msgs); i++ { - nproc++ msg := msgs[i] if pill, ok := msg.Msg.(poisonPill); ok { // If we need to gracefuly stop, we process all the messages // from the inbox, otherwise we ignore and cleanup. if pill.graceful { - msgsToProcess := msgs[processed:] + msgsToProcess := msgs[nproc:] for _, m := range msgsToProcess { p.invokeMsg(m) } @@ -96,7 +95,7 @@ func (p *process) Invoke(msgs []Envelope) { return } p.invokeMsg(msg) - processed++ + nproc++ } } diff --git a/actor/process_test.go b/actor/process_test.go index c4fcb9a4..7645fc8b 100644 --- a/actor/process_test.go +++ b/actor/process_test.go @@ -9,14 +9,35 @@ import ( "github.com/stretchr/testify/require" ) +type triggerPanic struct { + data int +} + +func Test_ProcessingStartsFromNextMessageAfterRestart(t *testing.T) { + e, err := NewEngine(NewEngineConfig()) + require.NoError(t, err) + done := make(chan struct{}) + pid := e.SpawnFunc(func(c *Context) { + fmt.Printf("Got message type %T\n", c.Message()) + if _, ok := c.Message().(triggerPanic); ok { + // message causing the failure is not processed again. + panicWrapper() + } + if s, ok := c.Message().(string); ok && s == "foo" { + close(done) + } + }, "kind", WithMaxRestarts(1)) + + e.Send(pid, triggerPanic{}) + e.Send(pid, "foo") + <-done +} + // Test_CleanTrace tests that the stack trace is cleaned up correctly and that the function // which triggers the panic is at the top of the stack trace. func Test_CleanTrace(t *testing.T) { e, err := NewEngine(NewEngineConfig()) require.NoError(t, err) - type triggerPanic struct { - data int - } stopCh := make(chan struct{}) pid := e.SpawnFunc(func(c *Context) { fmt.Printf("Got message type %T\n", c.Message())