From 79d2f8bd6cadf09acacc6c8f35baf1491ee3e3d0 Mon Sep 17 00:00:00 2001 From: "vinit.ka" Date: Wed, 31 Dec 2025 14:14:42 +0530 Subject: [PATCH 1/2] feat: add step start and finish events --- src/Enums/StreamEventType.php | 2 + .../Broadcasting/StepFinishBroadcast.php | 7 + .../Broadcasting/StepStartBroadcast.php | 7 + src/Providers/Anthropic/Handlers/Stream.php | 51 +++- src/Providers/DeepSeek/Handlers/Stream.php | 23 ++ src/Providers/Gemini/Handlers/Stream.php | 26 +++ src/Providers/Groq/Handlers/Stream.php | 26 +++ src/Providers/Mistral/Handlers/Stream.php | 23 ++ src/Providers/Ollama/Handlers/Stream.php | 26 +++ src/Providers/OpenAI/Handlers/Stream.php | 24 ++ src/Providers/OpenRouter/Handlers/Stream.php | 23 ++ src/Providers/XAI/Handlers/Stream.php | 23 ++ src/Streaming/Adapters/BroadcastAdapter.php | 6 + .../Adapters/DataProtocolAdapter.php | 24 ++ src/Streaming/Events/StepFinishEvent.php | 30 +++ src/Streaming/Events/StepStartEvent.php | 30 +++ src/Streaming/StreamState.php | 21 ++ src/Testing/PrismFake.php | 29 +++ tests/Providers/Anthropic/StreamTest.php | 77 ++++++ tests/Providers/DeepSeek/StreamTest.php | 76 ++++++ tests/Providers/Gemini/GeminiStreamTest.php | 72 ++++++ tests/Providers/Groq/StreamTest.php | 76 ++++++ tests/Providers/Mistral/StreamTest.php | 76 ++++++ tests/Providers/Ollama/StreamTest.php | 76 ++++++ tests/Providers/OpenAI/StreamTest.php | 74 ++++++ tests/Providers/OpenRouter/StreamTest.php | 109 ++++++++- tests/Providers/XAI/StreamTest.php | 72 ++++++ .../Streaming/PrismStreamIntegrationTest.php | 221 +++++++++++++++++- 28 files changed, 1307 insertions(+), 23 deletions(-) create mode 100644 src/Events/Broadcasting/StepFinishBroadcast.php create mode 100644 src/Events/Broadcasting/StepStartBroadcast.php create mode 100644 src/Streaming/Events/StepFinishEvent.php create mode 100644 src/Streaming/Events/StepStartEvent.php diff --git a/src/Enums/StreamEventType.php b/src/Enums/StreamEventType.php index b0af6d1c8..187d80dbe 100644 --- a/src/Enums/StreamEventType.php +++ b/src/Enums/StreamEventType.php @@ -21,4 +21,6 @@ enum StreamEventType: string case Artifact = 'artifact'; case Error = 'error'; case StreamEnd = 'stream_end'; + case StepStart = 'step_start'; + case StepFinish = 'step_finish'; } diff --git a/src/Events/Broadcasting/StepFinishBroadcast.php b/src/Events/Broadcasting/StepFinishBroadcast.php new file mode 100644 index 000000000..abc2c58b1 --- /dev/null +++ b/src/Events/Broadcasting/StepFinishBroadcast.php @@ -0,0 +1,7 @@ +processEvent($event); if ($streamEvent instanceof Generator) { - yield from $streamEvent; + foreach ($streamEvent as $event) { + yield $event; + } } elseif ($streamEvent instanceof StreamEvent) { yield $streamEvent; } } if ($this->state->hasToolCalls()) { - yield from $this->handleToolCalls($request, $depth); + foreach ($this->handleToolCalls($request, $depth) as $item) { + yield $item; + } return; } + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + yield $this->emitStreamEndEvent(); } @@ -115,8 +127,9 @@ protected function processEvent(array $event): StreamEvent|Generator|null /** * @param array $event + * @return Generator */ - protected function handleMessageStart(array $event): ?StreamStartEvent + protected function handleMessageStart(array $event): Generator { $message = $event['message'] ?? []; $this->state->withMessageId($message['id'] ?? EventID::generate()); @@ -132,18 +145,25 @@ protected function handleMessageStart(array $event): ?StreamStartEvent } // Only emit StreamStartEvent once per streaming session - if (! $this->state->shouldEmitStreamStart()) { - return null; + if ($this->state->shouldEmitStreamStart()) { + $this->state->markStreamStarted(); + + yield new StreamStartEvent( + id: EventID::generate(), + timestamp: time(), + model: $message['model'] ?? 'unknown', + provider: 'anthropic' + ); } - $this->state->markStreamStarted(); + if ($this->state->shouldEmitStepStart()) { + $this->state->markStepStarted(); - return new StreamStartEvent( - id: EventID::generate(), - timestamp: time(), - model: $message['model'] ?? 'unknown', - provider: 'anthropic' - ); + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + } } /** @@ -544,6 +564,13 @@ protected function handleToolCalls(Request $request, int $depth): Generator $request->addMessage(new ToolResultMessage($toolResults)); + // Emit step finish after tool calls + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + // Continue streaming if within step limit $depth++; if ($depth < $request->maxSteps()) { diff --git a/src/Providers/DeepSeek/Handlers/Stream.php b/src/Providers/DeepSeek/Handlers/Stream.php index 688efce04..2b8694bc6 100644 --- a/src/Providers/DeepSeek/Handlers/Stream.php +++ b/src/Providers/DeepSeek/Handlers/Stream.php @@ -21,6 +21,8 @@ use Prism\Prism\Providers\DeepSeek\Maps\ToolMap; use Prism\Prism\Streaming\EventID; use Prism\Prism\Streaming\Events\ArtifactEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -96,6 +98,15 @@ protected function processStream(Response $response, Request $request, int $dept ); } + if ($this->state->shouldEmitStepStart()) { + $this->state->markStepStarted(); + + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + } + if ($this->hasToolCalls($data)) { $toolCalls = $this->extractToolCalls($data, $toolCalls); @@ -214,6 +225,12 @@ protected function processStream(Response $response, Request $request, int $dept return; } + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + yield new StreamEndEvent( id: EventID::generate(), timestamp: time(), @@ -381,6 +398,12 @@ protected function handleToolCalls(Request $request, string $text, array $toolCa $request->addMessage(new AssistantMessage($text, $mappedToolCalls)); $request->addMessage(new ToolResultMessage($toolResults)); + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + $this->state->resetTextState(); $this->state->withMessageId(EventID::generate()); diff --git a/src/Providers/Gemini/Handlers/Stream.php b/src/Providers/Gemini/Handlers/Stream.php index 87a402d31..b36010b1a 100644 --- a/src/Providers/Gemini/Handlers/Stream.php +++ b/src/Providers/Gemini/Handlers/Stream.php @@ -18,6 +18,8 @@ use Prism\Prism\Providers\Gemini\Maps\ToolMap; use Prism\Prism\Streaming\EventID; use Prism\Prism\Streaming\Events\ArtifactEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -100,6 +102,16 @@ protected function processStream(Response $response, Request $request, int $dept $this->state->markStreamStarted(); } + // Emit step start event once per step + if ($this->state->shouldEmitStepStart()) { + $this->state->markStepStarted(); + + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + } + // Update usage data from each chunk $this->state->withUsage($this->extractUsage($data, $request)); @@ -219,6 +231,13 @@ protected function processStream(Response $response, Request $request, int $dept return; } + // Emit step finish before stream end + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + yield new StreamEndEvent( id: EventID::generate(), timestamp: time(), @@ -356,6 +375,13 @@ protected function handleToolCalls( $request->addMessage(new AssistantMessage($this->state->currentText(), $mappedToolCalls)); $request->addMessage(new ToolResultMessage($toolResults)); + // Emit step finish after tool calls + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + $depth++; if ($depth < $request->maxSteps()) { $previousUsage = $this->state->usage(); diff --git a/src/Providers/Groq/Handlers/Stream.php b/src/Providers/Groq/Handlers/Stream.php index e51777efe..964be0c2d 100644 --- a/src/Providers/Groq/Handlers/Stream.php +++ b/src/Providers/Groq/Handlers/Stream.php @@ -22,6 +22,8 @@ use Prism\Prism\Streaming\EventID; use Prism\Prism\Streaming\Events\ArtifactEvent; use Prism\Prism\Streaming\Events\ErrorEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -96,6 +98,16 @@ protected function processStream(Response $response, Request $request, int $dept ); } + // Emit step start event once per step + if ($this->state->shouldEmitStepStart()) { + $this->state->markStepStarted(); + + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + } + if ($this->hasError($data)) { yield from $this->handleErrors($data, $request); @@ -168,6 +180,13 @@ protected function processStream(Response $response, Request $request, int $dept } } + // Emit step finish before stream end + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + yield new StreamEndEvent( id: EventID::generate(), timestamp: time(), @@ -275,6 +294,13 @@ protected function handleToolCalls( $request->addMessage(new AssistantMessage($text, $mappedToolCalls)); $request->addMessage(new ToolResultMessage($toolResults)); + // Emit step finish after tool calls + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + // Reset text state for next response $this->state->resetTextState(); $this->state->withMessageId(EventID::generate()); diff --git a/src/Providers/Mistral/Handlers/Stream.php b/src/Providers/Mistral/Handlers/Stream.php index f206db6ae..b40199638 100644 --- a/src/Providers/Mistral/Handlers/Stream.php +++ b/src/Providers/Mistral/Handlers/Stream.php @@ -21,6 +21,8 @@ use Prism\Prism\Providers\Mistral\Maps\ToolMap; use Prism\Prism\Streaming\EventID; use Prism\Prism\Streaming\Events\ArtifactEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -95,6 +97,15 @@ protected function processStream(Response $response, Request $request, int $dept ); } + if ($this->state->shouldEmitStepStart()) { + $this->state->markStepStarted(); + + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + } + if ($this->hasToolCalls($data)) { $toolCalls = $this->extractToolCalls($data, $toolCalls); @@ -173,6 +184,12 @@ protected function processStream(Response $response, Request $request, int $dept } } + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + yield new StreamEndEvent( id: EventID::generate(), timestamp: time(), @@ -271,6 +288,12 @@ protected function handleToolCalls( $request->addMessage(new AssistantMessage($text, $mappedToolCalls)); $request->addMessage(new ToolResultMessage($toolResults)); + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + $this->state->resetTextState(); $this->state->withMessageId(EventID::generate()); diff --git a/src/Providers/Ollama/Handlers/Stream.php b/src/Providers/Ollama/Handlers/Stream.php index b481413a0..943f8324a 100644 --- a/src/Providers/Ollama/Handlers/Stream.php +++ b/src/Providers/Ollama/Handlers/Stream.php @@ -18,6 +18,8 @@ use Prism\Prism\Providers\Ollama\ValueObjects\OllamaStreamState; use Prism\Prism\Streaming\EventID; use Prism\Prism\Streaming\Events\ArtifactEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -91,6 +93,16 @@ protected function processStream(Response $response, Request $request, int $dept $this->state->markStreamStarted()->withMessageId(EventID::generate()); } + // Emit step start event once per step + if ($this->state->shouldEmitStepStart()) { + $this->state->markStepStarted(); + + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + } + // Accumulate token counts $this->state->addPromptTokens((int) data_get($data, 'prompt_eval_count', 0)); $this->state->addCompletionTokens((int) data_get($data, 'eval_count', 0)); @@ -186,6 +198,13 @@ protected function processStream(Response $response, Request $request, int $dept ); } + // Emit step finish before stream end + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + // Emit stream end event with usage yield new StreamEndEvent( id: EventID::generate(), @@ -292,6 +311,13 @@ protected function handleToolCalls( $request->addMessage(new AssistantMessage($text, $mappedToolCalls)); $request->addMessage(new ToolResultMessage($toolResults)); + // Emit step finish after tool calls + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + // Continue streaming if within step limit $depth++; if ($depth < $request->maxSteps()) { diff --git a/src/Providers/OpenAI/Handlers/Stream.php b/src/Providers/OpenAI/Handlers/Stream.php index 41bd80d52..02bf07595 100644 --- a/src/Providers/OpenAI/Handlers/Stream.php +++ b/src/Providers/OpenAI/Handlers/Stream.php @@ -22,6 +22,8 @@ use Prism\Prism\Streaming\EventID; use Prism\Prism\Streaming\Events\ArtifactEvent; use Prism\Prism\Streaming\Events\ProviderToolEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -110,6 +112,15 @@ protected function processStream(Response $response, Request $request, int $dept continue; } + if ($this->state->shouldEmitStepStart()) { + $this->state->markStepStarted(); + + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + } + if ($this->hasReasoningSummaryDelta($data)) { $reasoningDelta = $this->extractReasoningSummaryDelta($data); @@ -261,6 +272,12 @@ protected function processStream(Response $response, Request $request, int $dept return; } + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + yield new StreamEndEvent( id: EventID::generate(), timestamp: time(), @@ -398,6 +415,13 @@ protected function handleToolCalls(Request $request, int $depth): Generator $request->addMessage(new AssistantMessage($this->state->currentText(), $mappedToolCalls)); $request->addMessage(new ToolResultMessage($toolResults)); + // Emit step finish after tool calls + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + $depth++; if ($depth < $request->maxSteps()) { diff --git a/src/Providers/OpenRouter/Handlers/Stream.php b/src/Providers/OpenRouter/Handlers/Stream.php index fa52f68de..4d1a4b672 100644 --- a/src/Providers/OpenRouter/Handlers/Stream.php +++ b/src/Providers/OpenRouter/Handlers/Stream.php @@ -17,6 +17,8 @@ use Prism\Prism\Providers\OpenRouter\Maps\MessageMap; use Prism\Prism\Streaming\EventID; use Prism\Prism\Streaming\Events\ArtifactEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -93,6 +95,15 @@ protected function processStream(Response $response, Request $request, int $dept ); } + if ($this->state->shouldEmitStepStart()) { + $this->state->markStepStarted(); + + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + } + if ($this->hasToolCalls($data)) { $toolCalls = $this->extractToolCalls($data, $toolCalls); @@ -225,6 +236,12 @@ protected function processStream(Response $response, Request $request, int $dept } } + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + yield new StreamEndEvent( id: EventID::generate(), timestamp: time(), @@ -391,6 +408,12 @@ protected function handleToolCalls( $depth++; + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + if ($depth < $request->maxSteps()) { $nextResponse = $this->sendRequest($request); yield from $this->processStream($nextResponse, $request, $depth); diff --git a/src/Providers/XAI/Handlers/Stream.php b/src/Providers/XAI/Handlers/Stream.php index 672d1ca3c..6271c66eb 100644 --- a/src/Providers/XAI/Handlers/Stream.php +++ b/src/Providers/XAI/Handlers/Stream.php @@ -21,6 +21,8 @@ use Prism\Prism\Providers\XAI\Maps\ToolMap; use Prism\Prism\Streaming\EventID; use Prism\Prism\Streaming\Events\ArtifactEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -100,6 +102,15 @@ protected function processStream(Response $response, Request $request, int $dept ); } + if ($this->state->shouldEmitStepStart()) { + $this->state->markStepStarted(); + + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + } + $thinkingContent = $this->extractThinking($data, $request); if ($thinkingContent !== '' && $thinkingContent !== '0') { @@ -199,6 +210,12 @@ protected function processStream(Response $response, Request $request, int $dept ); } + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + yield new StreamEndEvent( id: EventID::generate(), timestamp: time(), @@ -368,6 +385,12 @@ protected function handleToolCalls( $request->addMessage(new AssistantMessage($text, $mappedToolCalls)); $request->addMessage(new ToolResultMessage($toolResults)); + $this->state->markStepFinished(); + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + $this->state ->resetTextState() ->withMessageId(EventID::generate()); diff --git a/src/Streaming/Adapters/BroadcastAdapter.php b/src/Streaming/Adapters/BroadcastAdapter.php index c07797033..511d19305 100644 --- a/src/Streaming/Adapters/BroadcastAdapter.php +++ b/src/Streaming/Adapters/BroadcastAdapter.php @@ -12,6 +12,8 @@ use Prism\Prism\Events\Broadcasting\ArtifactBroadcast; use Prism\Prism\Events\Broadcasting\ErrorBroadcast; use Prism\Prism\Events\Broadcasting\ProviderToolEventBroadcast; +use Prism\Prism\Events\Broadcasting\StepFinishBroadcast; +use Prism\Prism\Events\Broadcasting\StepStartBroadcast; use Prism\Prism\Events\Broadcasting\StreamEndBroadcast; use Prism\Prism\Events\Broadcasting\StreamStartBroadcast; use Prism\Prism\Events\Broadcasting\TextCompleteBroadcast; @@ -25,6 +27,8 @@ use Prism\Prism\Streaming\Events\ArtifactEvent; use Prism\Prism\Streaming\Events\ErrorEvent; use Prism\Prism\Streaming\Events\ProviderToolEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -69,6 +73,7 @@ protected function broadcastEvent(StreamEvent $event): ShouldBroadcast { return match ($event::class) { StreamStartEvent::class => new StreamStartBroadcast($event, $this->channels), + StepStartEvent::class => new StepStartBroadcast($event, $this->channels), TextStartEvent::class => new TextStartBroadcast($event, $this->channels), TextDeltaEvent::class => new TextDeltaBroadcast($event, $this->channels), TextCompleteEvent::class => new TextCompleteBroadcast($event, $this->channels), @@ -80,6 +85,7 @@ protected function broadcastEvent(StreamEvent $event): ShouldBroadcast ArtifactEvent::class => new ArtifactBroadcast($event, $this->channels), ProviderToolEvent::class => new ProviderToolEventBroadcast($event, $this->channels), ErrorEvent::class => new ErrorBroadcast($event, $this->channels), + StepFinishEvent::class => new StepFinishBroadcast($event, $this->channels), StreamEndEvent::class => new StreamEndBroadcast($event, $this->channels), default => throw new InvalidArgumentException('Unsupported event type for broadcasting: '.$event::class), }; diff --git a/src/Streaming/Adapters/DataProtocolAdapter.php b/src/Streaming/Adapters/DataProtocolAdapter.php index 022530210..a19109812 100644 --- a/src/Streaming/Adapters/DataProtocolAdapter.php +++ b/src/Streaming/Adapters/DataProtocolAdapter.php @@ -10,6 +10,8 @@ use Prism\Prism\Streaming\Events\ArtifactEvent; use Prism\Prism\Streaming\Events\ErrorEvent; use Prism\Prism\Streaming\Events\ProviderToolEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -126,6 +128,7 @@ protected function handleEventConversion(StreamEvent $event): ?string { $data = match ($event::class) { StreamStartEvent::class => $this->handleStreamStart($event), + StepStartEvent::class => $this->handleStepStart($event), TextStartEvent::class => $this->handleTextStart($event), TextDeltaEvent::class => $this->handleTextDelta($event), TextCompleteEvent::class => $this->handleTextComplete($event), @@ -136,6 +139,7 @@ protected function handleEventConversion(StreamEvent $event): ?string ToolResultEvent::class => $this->handleToolResult($event), ArtifactEvent::class => $this->handleArtifact($event), ProviderToolEvent::class => $this->handleProviderTool($event), + StepFinishEvent::class => $this->handleStepFinish($event), StreamEndEvent::class => $this->handleStreamEnd($event), ErrorEvent::class => $this->handleError($event), default => $this->handleDefault($event), @@ -387,4 +391,24 @@ protected function handleDefault(StreamEvent $event): array 'data' => $event->toArray(), ]; } + + /** + * @return array + */ + protected function handleStepStart(StepStartEvent $event): array + { + return [ + 'type' => 'start-step', + ]; + } + + /** + * @return array + */ + protected function handleStepFinish(StepFinishEvent $event): array + { + return [ + 'type' => 'finish-step', + ]; + } } diff --git a/src/Streaming/Events/StepFinishEvent.php b/src/Streaming/Events/StepFinishEvent.php new file mode 100644 index 000000000..18b6fb322 --- /dev/null +++ b/src/Streaming/Events/StepFinishEvent.php @@ -0,0 +1,30 @@ + $this->id, + 'timestamp' => $this->timestamp, + ]; + } +} diff --git a/src/Streaming/Events/StepStartEvent.php b/src/Streaming/Events/StepStartEvent.php new file mode 100644 index 000000000..aebf59a7b --- /dev/null +++ b/src/Streaming/Events/StepStartEvent.php @@ -0,0 +1,30 @@ + $this->id, + 'timestamp' => $this->timestamp, + ]; + } +} diff --git a/src/Streaming/StreamState.php b/src/Streaming/StreamState.php index abdaf2217..bbe320f97 100644 --- a/src/Streaming/StreamState.php +++ b/src/Streaming/StreamState.php @@ -16,6 +16,8 @@ class StreamState protected bool $streamStarted = false; + protected bool $stepStarted = false; + protected bool $textStarted = false; protected bool $thinkingStarted = false; @@ -95,6 +97,20 @@ public function markStreamStarted(): self return $this; } + public function markStepStarted(): self + { + $this->stepStarted = true; + + return $this; + } + + public function markStepFinished(): self + { + $this->stepStarted = false; + + return $this; + } + public function markTextStarted(): self { $this->textStarted = true; @@ -335,6 +351,11 @@ public function shouldEmitStreamStart(): bool return ! $this->streamStarted; } + public function shouldEmitStepStart(): bool + { + return ! $this->stepStarted; + } + public function shouldEmitTextStart(): bool { return ! $this->textStarted; diff --git a/src/Testing/PrismFake.php b/src/Testing/PrismFake.php index c9437cb8c..b3ba44a51 100644 --- a/src/Testing/PrismFake.php +++ b/src/Testing/PrismFake.php @@ -20,6 +20,8 @@ use Prism\Prism\Moderation\Response as ModerationResponse; use Prism\Prism\Providers\Provider; use Prism\Prism\Streaming\EventID; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -244,7 +246,15 @@ protected function streamEventsFromTextResponse(TextResponse $response, TextRequ provider: 'fake' ); + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + if ($response->steps->isNotEmpty()) { + $stepIndex = 0; + $totalSteps = $response->steps->count(); + foreach ($response->steps as $step) { if ($step->text !== '') { yield new TextStartEvent( @@ -287,6 +297,20 @@ protected function streamEventsFromTextResponse(TextResponse $response, TextRequ success: true ); } + + $stepIndex++; + + // If this step has tool calls/results and there are more steps, end current step and start new one + if (($step->toolCalls !== [] || $step->toolResults !== []) && $stepIndex < $totalSteps) { + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + yield new StepStartEvent( + id: EventID::generate(), + timestamp: time() + ); + } } } elseif ($response->text !== '') { yield new TextStartEvent( @@ -309,6 +333,11 @@ protected function streamEventsFromTextResponse(TextResponse $response, TextRequ ); } + yield new StepFinishEvent( + id: EventID::generate(), + timestamp: time() + ); + yield new StreamEndEvent( id: EventID::generate(), timestamp: time(), diff --git a/tests/Providers/Anthropic/StreamTest.php b/tests/Providers/Anthropic/StreamTest.php index 7391619cd..042df445b 100644 --- a/tests/Providers/Anthropic/StreamTest.php +++ b/tests/Providers/Anthropic/StreamTest.php @@ -17,6 +17,8 @@ use Prism\Prism\Facades\Tool; use Prism\Prism\Streaming\Events\CitationEvent; use Prism\Prism\Streaming\Events\ProviderToolEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; @@ -687,3 +689,78 @@ }); }); }); + +describe('step events', function (): void { + it('emits step start and step finish events', function (): void { + FixtureResponse::fakeStreamResponses('v1/messages', 'anthropic/stream-basic-text'); + + $response = Prism::text() + ->using('anthropic', 'claude-3-7-sonnet-20250219') + ->withPrompt('Who are you?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // Check for StepStartEvent after StreamStartEvent + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + expect($stepStartEvents)->toHaveCount(1); + + // Check for StepFinishEvent before StreamEndEvent + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + expect($stepFinishEvents)->toHaveCount(1); + + // Verify order: StreamStart -> StepStart -> ... -> StepFinish -> StreamEnd + $eventTypes = array_map(get_class(...), $events); + $streamStartIndex = array_search(StreamStartEvent::class, $eventTypes); + $stepStartIndex = array_search(StepStartEvent::class, $eventTypes); + $stepFinishIndex = array_search(StepFinishEvent::class, $eventTypes); + $streamEndIndex = array_search(StreamEndEvent::class, $eventTypes); + + expect($streamStartIndex)->toBeLessThan($stepStartIndex); + expect($stepStartIndex)->toBeLessThan($stepFinishIndex); + expect($stepFinishIndex)->toBeLessThan($streamEndIndex); + }); + + it('emits multiple step events with tool calls', function (): void { + FixtureResponse::fakeStreamResponses('v1/messages', 'anthropic/stream-with-tools'); + + $tools = [ + Tool::as('weather') + ->for('useful when you need to search for current weather conditions') + ->withStringParameter('city', 'The city that you want the weather for') + ->using(fn (string $city): string => "The weather will be 75° and sunny in {$city}"), + Tool::as('search') + ->for('useful for searching current events or data') + ->withStringParameter('query', 'The detailed search query') + ->using(fn (string $query): string => "Search results for: {$query}"), + ]; + + $response = Prism::text() + ->using(Provider::Anthropic, 'claude-3-7-sonnet-20250219') + ->withTools($tools) + ->withMaxSteps(3) + ->withPrompt('What is the weather in Detroit?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // With tool calls, we should have multiple step start/finish pairs + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + + // At least 2 steps: one for tool call, one for final response + expect(count($stepStartEvents))->toBeGreaterThanOrEqual(2); + expect(count($stepFinishEvents))->toBeGreaterThanOrEqual(2); + + // Verify step start/finish pairs are balanced + expect(count($stepStartEvents))->toBe(count($stepFinishEvents)); + }); +}); diff --git a/tests/Providers/DeepSeek/StreamTest.php b/tests/Providers/DeepSeek/StreamTest.php index 3137a5899..1d2a7f5c8 100644 --- a/tests/Providers/DeepSeek/StreamTest.php +++ b/tests/Providers/DeepSeek/StreamTest.php @@ -10,7 +10,10 @@ use Prism\Prism\Enums\Provider; use Prism\Prism\Facades\Prism; use Prism\Prism\Facades\Tool; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; +use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; use Prism\Prism\Streaming\Events\TextDeltaEvent; use Prism\Prism\Streaming\Events\ThinkingEvent; @@ -210,3 +213,76 @@ ->and($thinkingContent)->toContain('answer') ->and($regularContent)->toContain('32'); }); + +it('emits step start and step finish events', function (): void { + FixtureResponse::fakeStreamResponses('chat/completions', 'deepseek/stream-basic-text'); + + $response = Prism::text() + ->using(Provider::DeepSeek, 'deepseek-chat') + ->withPrompt('Who are you?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // Check for StepStartEvent after StreamStartEvent + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + expect($stepStartEvents)->toHaveCount(1); + + // Check for StepFinishEvent before StreamEndEvent + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + expect($stepFinishEvents)->toHaveCount(1); + + // Verify order: StreamStart -> StepStart -> ... -> StepFinish -> StreamEnd + $eventTypes = array_map(get_class(...), $events); + $streamStartIndex = array_search(StreamStartEvent::class, $eventTypes); + $stepStartIndex = array_search(StepStartEvent::class, $eventTypes); + $stepFinishIndex = array_search(StepFinishEvent::class, $eventTypes); + $streamEndIndex = array_search(StreamEndEvent::class, $eventTypes); + + expect($streamStartIndex)->toBeLessThan($stepStartIndex); + expect($stepStartIndex)->toBeLessThan($stepFinishIndex); + expect($stepFinishIndex)->toBeLessThan($streamEndIndex); +}); + +it('emits multiple step events with tool calls', function (): void { + FixtureResponse::fakeStreamResponses('chat/completions', 'deepseek/stream-with-tools'); + + $tools = [ + Tool::as('get_weather') + ->for('useful when you need to search for current weather conditions') + ->withStringParameter('city', 'The city that you want the weather for') + ->using(fn (string $city): string => "The weather will be 75° and sunny in {$city}"), + Tool::as('search') + ->for('useful for searching current events or data') + ->withStringParameter('query', 'The detailed search query') + ->using(fn (string $query): string => "Search results for: {$query}"), + ]; + + $response = Prism::text() + ->using(Provider::DeepSeek, 'deepseek-chat') + ->withTools($tools) + ->withMaxSteps(4) + ->withPrompt('What is the weather in Detroit?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // With tool calls, we should have multiple step start/finish pairs + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + + // At least 2 steps: one for tool call, one for final response + expect(count($stepStartEvents))->toBeGreaterThanOrEqual(2); + expect(count($stepFinishEvents))->toBeGreaterThanOrEqual(2); + + // Verify step start/finish pairs are balanced + expect(count($stepStartEvents))->toBe(count($stepFinishEvents)); +}); diff --git a/tests/Providers/Gemini/GeminiStreamTest.php b/tests/Providers/Gemini/GeminiStreamTest.php index 702538cd1..beb6076b1 100644 --- a/tests/Providers/Gemini/GeminiStreamTest.php +++ b/tests/Providers/Gemini/GeminiStreamTest.php @@ -10,7 +10,10 @@ use Prism\Prism\Enums\Provider; use Prism\Prism\Facades\Prism; use Prism\Prism\Facades\Tool; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; +use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; use Prism\Prism\Streaming\Events\TextDeltaEvent; use Prism\Prism\Streaming\Events\ToolCallEvent; @@ -287,3 +290,72 @@ $firstToolResultEvent = array_values($toolResultEvents)[0]; expect($firstToolResultEvent->toolResult)->not->toBeNull(); }); + +it('emits step start and step finish events', function (): void { + FixtureResponse::fakeResponseSequence('*', 'gemini/stream-basic-text'); + + $response = Prism::text() + ->using(Provider::Gemini, 'gemini-2.0-flash') + ->withPrompt('Explain how AI works') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // Check for StepStartEvent after StreamStartEvent + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + expect($stepStartEvents)->toHaveCount(1); + + // Check for StepFinishEvent before StreamEndEvent + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + expect($stepFinishEvents)->toHaveCount(1); + + // Verify order: StreamStart -> StepStart -> ... -> StepFinish -> StreamEnd + $eventTypes = array_map(get_class(...), $events); + $streamStartIndex = array_search(StreamStartEvent::class, $eventTypes); + $stepStartIndex = array_search(StepStartEvent::class, $eventTypes); + $stepFinishIndex = array_search(StepFinishEvent::class, $eventTypes); + $streamEndIndex = array_search(StreamEndEvent::class, $eventTypes); + + expect($streamStartIndex)->toBeLessThan($stepStartIndex); + expect($stepStartIndex)->toBeLessThan($stepFinishIndex); + expect($stepFinishIndex)->toBeLessThan($streamEndIndex); +}); + +it('emits multiple step events with tool calls', function (): void { + FixtureResponse::fakeResponseSequence('*', 'gemini/stream-with-tools'); + + $tools = [ + Tool::as('weather') + ->for('useful when you need to search for current weather conditions') + ->withStringParameter('city', 'The city that you want the weather for') + ->using(fn (string $city): string => "The weather will be 75° and sunny in {$city}"), + ]; + + $response = Prism::text() + ->using(Provider::Gemini, 'gemini-2.5-flash') + ->withTools($tools) + ->withMaxSteps(3) + ->withPrompt('What is the weather in San Francisco?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // With tool calls, we should have multiple step start/finish pairs + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + + // At least 2 steps: one for tool call, one for final response + expect(count($stepStartEvents))->toBeGreaterThanOrEqual(2); + expect(count($stepFinishEvents))->toBeGreaterThanOrEqual(2); + + // Verify step start/finish pairs are balanced + expect(count($stepStartEvents))->toBe(count($stepFinishEvents)); +}); diff --git a/tests/Providers/Groq/StreamTest.php b/tests/Providers/Groq/StreamTest.php index 9336cab2b..20f187af8 100644 --- a/tests/Providers/Groq/StreamTest.php +++ b/tests/Providers/Groq/StreamTest.php @@ -11,7 +11,10 @@ use Prism\Prism\Exceptions\PrismStreamDecodeException; use Prism\Prism\Facades\Prism; use Prism\Prism\Facades\Tool; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; +use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; use Prism\Prism\Streaming\Events\TextDeltaEvent; use Prism\Prism\Streaming\Events\ToolCallEvent; @@ -366,3 +369,76 @@ expect($streamEndEvent->usage->promptTokens)->toBe(7); expect($streamEndEvent->usage->completionTokens)->toBe(50); }); + +it('emits step start and step finish events', function (): void { + FixtureResponse::fakeStreamResponses('openai/v1/chat/completions', 'groq/stream-basic-text'); + + $response = Prism::text() + ->using(Provider::Groq, 'llama-3.1-70b-versatile') + ->withPrompt('Who are you?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // Check for StepStartEvent after StreamStartEvent + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + expect($stepStartEvents)->toHaveCount(1); + + // Check for StepFinishEvent before StreamEndEvent + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + expect($stepFinishEvents)->toHaveCount(1); + + // Verify order: StreamStart -> StepStart -> ... -> StepFinish -> StreamEnd + $eventTypes = array_map(get_class(...), $events); + $streamStartIndex = array_search(StreamStartEvent::class, $eventTypes); + $stepStartIndex = array_search(StepStartEvent::class, $eventTypes); + $stepFinishIndex = array_search(StepFinishEvent::class, $eventTypes); + $streamEndIndex = array_search(StreamEndEvent::class, $eventTypes); + + expect($streamStartIndex)->toBeLessThan($stepStartIndex); + expect($stepStartIndex)->toBeLessThan($stepFinishIndex); + expect($stepFinishIndex)->toBeLessThan($streamEndIndex); +}); + +it('emits multiple step events with tool calls', function (): void { + FixtureResponse::fakeStreamResponses('openai/v1/chat/completions', 'groq/stream-with-tools'); + + $tools = [ + Tool::as('weather') + ->for('useful when you need to search for current weather conditions') + ->withStringParameter('city', 'The city that you want the weather for') + ->using(fn (string $city): string => "The weather will be 75° and sunny in {$city}"), + Tool::as('search') + ->for('useful for searching current events or data') + ->withStringParameter('query', 'The detailed search query') + ->using(fn (string $query): string => "Search results for: {$query}"), + ]; + + $response = Prism::text() + ->using(Provider::Groq, 'llama-3.1-70b-versatile') + ->withTools($tools) + ->withMaxSteps(4) + ->withPrompt('What is the weather in Detroit?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // With tool calls, we should have multiple step start/finish pairs + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + + // At least 2 steps: one for tool call, one for final response + expect(count($stepStartEvents))->toBeGreaterThanOrEqual(2); + expect(count($stepFinishEvents))->toBeGreaterThanOrEqual(2); + + // Verify step start/finish pairs are balanced + expect(count($stepStartEvents))->toBe(count($stepFinishEvents)); +}); diff --git a/tests/Providers/Mistral/StreamTest.php b/tests/Providers/Mistral/StreamTest.php index a504bc111..12947c1ad 100644 --- a/tests/Providers/Mistral/StreamTest.php +++ b/tests/Providers/Mistral/StreamTest.php @@ -11,7 +11,10 @@ use Prism\Prism\Exceptions\PrismStreamDecodeException; use Prism\Prism\Facades\Prism; use Prism\Prism\Facades\Tool; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; +use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; use Prism\Prism\Streaming\Events\TextDeltaEvent; use Prism\Prism\Streaming\Events\ToolCallEvent; @@ -353,3 +356,76 @@ expect($streamEndEvent->usage->promptTokens)->toBe(7); expect($streamEndEvent->usage->completionTokens)->toBe(13); }); + +it('emits step start and step finish events', function (): void { + FixtureResponse::fakeStreamResponses('v1/chat/completions', 'mistral/stream-basic-text-1'); + + $response = Prism::text() + ->using(Provider::Mistral, 'mistral-small-latest') + ->withPrompt('Who are you?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // Check for StepStartEvent after StreamStartEvent + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + expect($stepStartEvents)->toHaveCount(1); + + // Check for StepFinishEvent before StreamEndEvent + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + expect($stepFinishEvents)->toHaveCount(1); + + // Verify order: StreamStart -> StepStart -> ... -> StepFinish -> StreamEnd + $eventTypes = array_map(get_class(...), $events); + $streamStartIndex = array_search(StreamStartEvent::class, $eventTypes); + $stepStartIndex = array_search(StepStartEvent::class, $eventTypes); + $stepFinishIndex = array_search(StepFinishEvent::class, $eventTypes); + $streamEndIndex = array_search(StreamEndEvent::class, $eventTypes); + + expect($streamStartIndex)->toBeLessThan($stepStartIndex); + expect($stepStartIndex)->toBeLessThan($stepFinishIndex); + expect($stepFinishIndex)->toBeLessThan($streamEndIndex); +}); + +it('emits multiple step events with tool calls', function (): void { + FixtureResponse::fakeStreamResponses('v1/chat/completions', 'mistral/stream-with-tools-1'); + + $tools = [ + Tool::as('weather') + ->for('useful when you need to search for current weather conditions') + ->withStringParameter('city', 'The city that you want the weather for') + ->using(fn (string $city): string => "The weather will be 75° and sunny in {$city}"), + Tool::as('search') + ->for('useful for searching current events or data') + ->withStringParameter('query', 'The detailed search query') + ->using(fn (string $query): string => "Search results for: {$query}"), + ]; + + $response = Prism::text() + ->using(Provider::Mistral, 'mistral-large-latest') + ->withTools($tools) + ->withMaxSteps(4) + ->withPrompt('What is the weather in Detroit?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // With tool calls, we should have multiple step start/finish pairs + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + + // At least 2 steps: one for tool call, one for final response + expect(count($stepStartEvents))->toBeGreaterThanOrEqual(2); + expect(count($stepFinishEvents))->toBeGreaterThanOrEqual(2); + + // Verify step start/finish pairs are balanced + expect(count($stepStartEvents))->toBe(count($stepFinishEvents)); +}); diff --git a/tests/Providers/Ollama/StreamTest.php b/tests/Providers/Ollama/StreamTest.php index ce4dfed74..9163d45ed 100644 --- a/tests/Providers/Ollama/StreamTest.php +++ b/tests/Providers/Ollama/StreamTest.php @@ -10,7 +10,10 @@ use Prism\Prism\Exceptions\PrismRateLimitedException; use Prism\Prism\Facades\Prism; use Prism\Prism\Facades\Tool; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; +use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; use Prism\Prism\Streaming\Events\TextDeltaEvent; use Prism\Prism\Streaming\Events\ThinkingEvent; @@ -287,3 +290,76 @@ expect($finalText)->toContain('Here is the answer:'); expect($lastFinishReason)->toBe(FinishReason::Stop); }); + +it('emits step start and step finish events', function (): void { + FixtureResponse::fakeStreamResponses('api/chat', 'ollama/stream-basic-text'); + + $response = Prism::text() + ->using('ollama', 'granite3-dense:8b') + ->withPrompt('Who are you?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // Check for StepStartEvent after StreamStartEvent + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + expect($stepStartEvents)->toHaveCount(1); + + // Check for StepFinishEvent before StreamEndEvent + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + expect($stepFinishEvents)->toHaveCount(1); + + // Verify order: StreamStart -> StepStart -> ... -> StepFinish -> StreamEnd + $eventTypes = array_map(get_class(...), $events); + $streamStartIndex = array_search(StreamStartEvent::class, $eventTypes); + $stepStartIndex = array_search(StepStartEvent::class, $eventTypes); + $stepFinishIndex = array_search(StepFinishEvent::class, $eventTypes); + $streamEndIndex = array_search(StreamEndEvent::class, $eventTypes); + + expect($streamStartIndex)->toBeLessThan($stepStartIndex); + expect($stepStartIndex)->toBeLessThan($stepFinishIndex); + expect($stepFinishIndex)->toBeLessThan($streamEndIndex); +}); + +it('emits multiple step events with tool calls', function (): void { + FixtureResponse::fakeStreamResponses('api/chat', 'ollama/stream-with-tools'); + + $tools = [ + Tool::as('weather') + ->for('useful when you need to search for current weather conditions') + ->withStringParameter('city', 'The city that you want the weather for') + ->using(fn (string $city): string => "The weather will be 75° and sunny in {$city}"), + Tool::as('search') + ->for('useful for searching current events or data') + ->withStringParameter('query', 'The detailed search query') + ->using(fn (string $query): string => 'The tigers game is at 3pm today'), + ]; + + $response = Prism::text() + ->using('ollama', 'qwen3:14b') + ->withTools($tools) + ->withMaxSteps(6) + ->withPrompt('What is the weather in Detroit?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // With tool calls, we should have multiple step start/finish pairs + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + + // At least 2 steps: one for tool call, one for final response + expect(count($stepStartEvents))->toBeGreaterThanOrEqual(2); + expect(count($stepFinishEvents))->toBeGreaterThanOrEqual(2); + + // Verify step start/finish pairs are balanced + expect(count($stepStartEvents))->toBe(count($stepFinishEvents)); +}); diff --git a/tests/Providers/OpenAI/StreamTest.php b/tests/Providers/OpenAI/StreamTest.php index 222150ebe..5a73448c1 100644 --- a/tests/Providers/OpenAI/StreamTest.php +++ b/tests/Providers/OpenAI/StreamTest.php @@ -11,7 +11,10 @@ use Prism\Prism\Facades\Prism; use Prism\Prism\Facades\Tool; use Prism\Prism\Streaming\Events\ProviderToolEvent; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; +use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; use Prism\Prism\Streaming\Events\TextDeltaEvent; use Prism\Prism\Streaming\Events\ThinkingEvent; @@ -648,3 +651,74 @@ return true; }); }); + +it('emits step start and step finish events', function (): void { + FixtureResponse::fakeResponseSequence('v1/responses', 'openai/stream-basic-text-responses'); + + $response = Prism::text() + ->using('openai', 'gpt-4o') + ->withPrompt('Who are you?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // Check for StepStartEvent after StreamStartEvent + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + expect($stepStartEvents)->toHaveCount(1); + + // Check for StepFinishEvent before StreamEndEvent + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + expect($stepFinishEvents)->toHaveCount(1); + + // Verify order: StreamStart -> StepStart -> ... -> StepFinish -> StreamEnd + $eventTypes = array_map(get_class(...), $events); + $streamStartIndex = array_search(StreamStartEvent::class, $eventTypes); + $stepStartIndex = array_search(StepStartEvent::class, $eventTypes); + $stepFinishIndex = array_search(StepFinishEvent::class, $eventTypes); + $streamEndIndex = array_search(StreamEndEvent::class, $eventTypes); + + expect($streamStartIndex)->toBeLessThan($stepStartIndex); + expect($stepStartIndex)->toBeLessThan($stepFinishIndex); + expect($stepFinishIndex)->toBeLessThan($streamEndIndex); +}); + +it('emits multiple step events with tool calls', function (): void { + FixtureResponse::fakeResponseSequence('v1/responses', 'openai/stream-with-tools-responses'); + + $tools = [ + Tool::as('weather') + ->for('useful when you need to search for current weather conditions') + ->withStringParameter('city', 'The city that you want the weather for') + ->using(fn (string $city): string => "The weather will be 75° and sunny in {$city}"), + Tool::as('search') + ->for('useful for searching current events or data') + ->withStringParameter('query', 'The detailed search query') + ->using(fn (string $query): string => "Search results for: {$query}"), + ]; + + $response = Prism::text() + ->using('openai', 'gpt-4o') + ->withTools($tools) + ->withMaxSteps(4) + ->withPrompt('What is the weather in Detroit?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // With tool calls, we should have multiple step start/finish pairs + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + + // At least 2 steps: one for tool call, one for final response + expect(count($stepStartEvents))->toBeGreaterThanOrEqual(2) + ->and(count($stepFinishEvents))->toBeGreaterThanOrEqual(2) + ->and(count($stepStartEvents))->toBe(count($stepFinishEvents)); +}); diff --git a/tests/Providers/OpenRouter/StreamTest.php b/tests/Providers/OpenRouter/StreamTest.php index 0556091f4..12143bb56 100644 --- a/tests/Providers/OpenRouter/StreamTest.php +++ b/tests/Providers/OpenRouter/StreamTest.php @@ -8,7 +8,10 @@ use Prism\Prism\Enums\Provider; use Prism\Prism\Facades\Prism; use Prism\Prism\Facades\Tool; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; +use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; use Prism\Prism\Streaming\Events\TextCompleteEvent; use Prism\Prism\Streaming\Events\TextDeltaEvent; @@ -50,15 +53,15 @@ expect($events[0]->provider)->toBe('openrouter'); // Check we have TextStartEvent - $textStartEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof TextStartEvent); + $textStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof TextStartEvent); expect($textStartEvents)->toHaveCount(1); // Check we have TextDeltaEvents - $textDeltaEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof TextDeltaEvent); + $textDeltaEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof TextDeltaEvent); expect($textDeltaEvents)->not->toBeEmpty(); // Check we have TextCompleteEvent - $textCompleteEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof TextCompleteEvent); + $textCompleteEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof TextCompleteEvent); expect($textCompleteEvents)->toHaveCount(1); // Check last event is StreamEndEvent @@ -241,7 +244,7 @@ expect($toolResultEvents)->toHaveCount(1); expect($text)->toContain('The current time is '.$currentTime); - $streamEndEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof StreamEndEvent); + $streamEndEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StreamEndEvent); expect($streamEndEvents)->not->toBeEmpty(); }); @@ -272,7 +275,7 @@ expect($events)->not->toBeEmpty(); // Check for ThinkingStartEvent - $thinkingStartEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof ThinkingStartEvent); + $thinkingStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof ThinkingStartEvent); expect($thinkingStartEvents)->toHaveCount(1); // Check for ThinkingEvent @@ -283,10 +286,104 @@ expect($text)->toBe('The answer to 2 + 2 is 4.'); // Check for usage with reasoning tokens - $streamEndEvents = array_filter($events, fn (\Prism\Prism\Streaming\Events\StreamEvent $e): bool => $e instanceof StreamEndEvent); + $streamEndEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StreamEndEvent); expect($streamEndEvents)->toHaveCount(1); $streamEndEvent = array_values($streamEndEvents)[0]; expect($streamEndEvent->usage)->not->toBeNull(); expect($streamEndEvent->usage->thoughtTokens)->toBe(12); }); + +it('emits step start and step finish events', function (): void { + FixtureResponse::fakeStreamResponses('v1/chat/completions', 'openrouter/stream-text-with-a-prompt'); + + $response = Prism::text() + ->using(Provider::OpenRouter, 'openai/gpt-4-turbo') + ->withPrompt('Who are you?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // Check for StepStartEvent after StreamStartEvent + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + expect($stepStartEvents)->toHaveCount(1); + + // Check for StepFinishEvent before StreamEndEvent + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + expect($stepFinishEvents)->toHaveCount(1); + + // Verify order: StreamStart -> StepStart -> ... -> StepFinish -> StreamEnd + $eventTypes = array_map(get_class(...), $events); + $streamStartIndex = array_search(StreamStartEvent::class, $eventTypes); + $stepStartIndex = array_search(StepStartEvent::class, $eventTypes); + $stepFinishIndex = array_search(StepFinishEvent::class, $eventTypes); + $streamEndIndex = array_search(StreamEndEvent::class, $eventTypes); + + expect($streamStartIndex)->toBeLessThan($stepStartIndex); + expect($stepStartIndex)->toBeLessThan($stepFinishIndex); + expect($stepFinishIndex)->toBeLessThan($streamEndIndex); +}); + +it('emits multiple step events with tool calls', function (): void { + FixtureResponse::fakeStreamResponses('v1/chat/completions', 'openrouter/stream-text-with-tools'); + + $weatherTool = Tool::as('weather') + ->for('Get weather for a city') + ->withStringParameter('city', 'The city name') + ->using(fn (string $city): string => "The weather in {$city} is 75°F and sunny"); + + $response = Prism::text() + ->using(Provider::OpenRouter, 'openai/gpt-4-turbo') + ->withTools([$weatherTool]) + ->withMaxSteps(3) + ->withPrompt('What is the weather in San Francisco?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // Extract step events + $stepStartEvents = array_values(array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent)); + $stepFinishEvents = array_values(array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent)); + + // Should have 2 steps: tool call step + final response step + expect($stepStartEvents)->toHaveCount(2); + expect($stepFinishEvents)->toHaveCount(2); + + // Verify step start/finish pairs are balanced + expect(count($stepStartEvents))->toBe(count($stepFinishEvents)); + + // Verify event ordering using indices + $getIndices = fn (string $class): array => array_keys(array_filter($events, fn (StreamEvent $e): bool => $e instanceof $class)); + + $streamStartIdx = $getIndices(StreamStartEvent::class)[0]; + $stepStartIndices = $getIndices(StepStartEvent::class); + $stepFinishIndices = $getIndices(StepFinishEvent::class); + $toolCallIndices = $getIndices(ToolCallEvent::class); + $toolResultIndices = $getIndices(ToolResultEvent::class); + $streamEndIdx = $getIndices(StreamEndEvent::class)[0]; + + // Verify overall structure: StreamStart -> Steps -> StreamEnd + expect($streamStartIdx)->toBeLessThan($stepStartIndices[0]); + expect($stepFinishIndices[count($stepFinishIndices) - 1])->toBeLessThan($streamEndIdx); + + // Verify each step has proper start/finish ordering + foreach ($stepStartIndices as $i => $startIdx) { + expect($startIdx)->toBeLessThan($stepFinishIndices[$i], "Step $i: start should come before finish"); + } + + // Verify tool call happens within first step (before first step finish) + expect($toolCallIndices[0])->toBeGreaterThan($stepStartIndices[0]); + expect($toolCallIndices[0])->toBeLessThan($stepFinishIndices[0]); + + // Verify tool result happens after tool call but before second step starts + expect($toolResultIndices[0])->toBeGreaterThan($toolCallIndices[0]); + expect($toolResultIndices[0])->toBeLessThan($stepStartIndices[1]); +}); diff --git a/tests/Providers/XAI/StreamTest.php b/tests/Providers/XAI/StreamTest.php index 15d7f8bb6..9da285f96 100644 --- a/tests/Providers/XAI/StreamTest.php +++ b/tests/Providers/XAI/StreamTest.php @@ -9,7 +9,10 @@ use Prism\Prism\Enums\FinishReason; use Prism\Prism\Facades\Prism; use Prism\Prism\Facades\Tool; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; +use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; use Prism\Prism\Streaming\Events\TextDeltaEvent; use Prism\Prism\Streaming\Events\ThinkingEvent; @@ -447,3 +450,72 @@ && $body['model'] === 'grok-4'; }); }); + +it('emits step start and step finish events', function (): void { + FixtureResponse::fakeResponseSequence('v1/chat/completions', 'xai/stream-basic-text-responses'); + + $response = Prism::text() + ->using('xai', 'grok-4') + ->withPrompt('Who are you?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // Check for StepStartEvent after StreamStartEvent + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + expect($stepStartEvents)->toHaveCount(1); + + // Check for StepFinishEvent before StreamEndEvent + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + expect($stepFinishEvents)->toHaveCount(1); + + // Verify order: StreamStart -> StepStart -> ... -> StepFinish -> StreamEnd + $eventTypes = array_map(get_class(...), $events); + $streamStartIndex = array_search(StreamStartEvent::class, $eventTypes); + $stepStartIndex = array_search(StepStartEvent::class, $eventTypes); + $stepFinishIndex = array_search(StepFinishEvent::class, $eventTypes); + $streamEndIndex = array_search(StreamEndEvent::class, $eventTypes); + + expect($streamStartIndex)->toBeLessThan($stepStartIndex); + expect($stepStartIndex)->toBeLessThan($stepFinishIndex); + expect($stepFinishIndex)->toBeLessThan($streamEndIndex); +}); + +it('emits multiple step events with tool calls', function (): void { + FixtureResponse::fakeResponseSequence('v1/chat/completions', 'xai/stream-with-tools-responses'); + + $tools = [ + Tool::as('get_weather') + ->for('useful when you need to search for current weather conditions') + ->withStringParameter('city', 'The city that you want the weather for') + ->using(fn (string $city): string => "The weather will be 75° and sunny in {$city}"), + ]; + + $response = Prism::text() + ->using('xai', 'grok-4') + ->withTools($tools) + ->withMaxSteps(4) + ->withPrompt('What is the weather in Detroit?') + ->asStream(); + + $events = []; + + foreach ($response as $event) { + $events[] = $event; + } + + // With tool calls, we should have multiple step start/finish pairs + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + + // At least 2 steps: one for tool call, one for final response + expect(count($stepStartEvents))->toBeGreaterThanOrEqual(2); + expect(count($stepFinishEvents))->toBeGreaterThanOrEqual(2); + + // Verify step start/finish pairs are balanced + expect(count($stepStartEvents))->toBe(count($stepFinishEvents)); +}); diff --git a/tests/Streaming/PrismStreamIntegrationTest.php b/tests/Streaming/PrismStreamIntegrationTest.php index 21a73b0e9..191bb7898 100644 --- a/tests/Streaming/PrismStreamIntegrationTest.php +++ b/tests/Streaming/PrismStreamIntegrationTest.php @@ -5,7 +5,10 @@ use Illuminate\Broadcasting\PrivateChannel; use Prism\Prism\Enums\FinishReason; use Prism\Prism\Facades\Prism; +use Prism\Prism\Streaming\Events\StepFinishEvent; +use Prism\Prism\Streaming\Events\StepStartEvent; use Prism\Prism\Streaming\Events\StreamEndEvent; +use Prism\Prism\Streaming\Events\StreamEvent; use Prism\Prism\Streaming\Events\StreamStartEvent; use Prism\Prism\Streaming\Events\TextCompleteEvent; use Prism\Prism\Streaming\Events\TextDeltaEvent; @@ -33,13 +36,15 @@ $eventArray = iterator_to_array($events); - expect(count($eventArray))->toBeGreaterThanOrEqual(5); + expect(count($eventArray))->toBeGreaterThanOrEqual(7); // StreamStart, StepStart, TextStart, TextDelta(s), TextComplete, StepFinish, StreamEnd expect($eventArray[0])->toBeInstanceOf(StreamStartEvent::class); - expect($eventArray[1])->toBeInstanceOf(TextStartEvent::class); + expect($eventArray[1])->toBeInstanceOf(StepStartEvent::class); + expect($eventArray[2])->toBeInstanceOf(TextStartEvent::class); $lastIndex = count($eventArray) - 1; expect($eventArray[$lastIndex])->toBeInstanceOf(StreamEndEvent::class); - expect($eventArray[$lastIndex - 1])->toBeInstanceOf(TextCompleteEvent::class); + expect($eventArray[$lastIndex - 1])->toBeInstanceOf(StepFinishEvent::class); + expect($eventArray[$lastIndex - 2])->toBeInstanceOf(TextCompleteEvent::class); }); it('asStream yields text delta events with chunked content', function (): void { @@ -181,9 +186,12 @@ $eventArray = iterator_to_array($events); - expect($eventArray)->toHaveCount(2); + // StreamStart, StepStart, StepFinish, StreamEnd (no text events for empty response) + expect($eventArray)->toHaveCount(4); expect($eventArray[0])->toBeInstanceOf(StreamStartEvent::class); - expect($eventArray[1])->toBeInstanceOf(StreamEndEvent::class); + expect($eventArray[1])->toBeInstanceOf(StepStartEvent::class); + expect($eventArray[2])->toBeInstanceOf(StepFinishEvent::class); + expect($eventArray[3])->toBeInstanceOf(StreamEndEvent::class); }); it('asStream handles multi-step responses with text and tool calls', function (): void { @@ -801,3 +809,206 @@ expect($response1)->toBeInstanceOf(StreamedResponse::class); expect($response2)->toBeInstanceOf(StreamedResponse::class); }); + +describe('step events', function (): void { + it('emits step start and finish events for simple text response', function (): void { + Prism::fake([ + TextResponseFake::make()->withText('Hello World'), + ]); + + $events = iterator_to_array( + Prism::text() + ->using('openai', 'gpt-4') + ->withPrompt('Test') + ->asStream() + ); + + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + + // Single response = 1 step + expect($stepStartEvents)->toHaveCount(1); + expect($stepFinishEvents)->toHaveCount(1); + }); + + it('emits step events in correct order relative to stream events', function (): void { + Prism::fake([ + TextResponseFake::make()->withText('Test message'), + ]); + + $events = iterator_to_array( + Prism::text() + ->using('anthropic', 'claude-3-sonnet') + ->withPrompt('Test') + ->asStream() + ); + + $eventTypes = array_map(fn (StreamEvent $e): string => $e::class, $events); + + $streamStartIdx = array_search(StreamStartEvent::class, $eventTypes); + $stepStartIdx = array_search(StepStartEvent::class, $eventTypes); + $stepFinishIdx = array_search(StepFinishEvent::class, $eventTypes); + $streamEndIdx = array_search(StreamEndEvent::class, $eventTypes); + + // Order: StreamStart -> StepStart -> ... -> StepFinish -> StreamEnd + expect($streamStartIdx)->toBeLessThan($stepStartIdx); + expect($stepStartIdx)->toBeLessThan($stepFinishIdx); + expect($stepFinishIdx)->toBeLessThan($streamEndIdx); + }); + + it('emits multiple step events for multi-step tool call conversation', function (): void { + $toolCall = new ToolCall('tool-1', 'calculator', ['a' => 1, 'b' => 2]); + $toolResult = new ToolResult('tool-1', 'calculator', ['a' => 1, 'b' => 2], ['result' => 3]); + + Prism::fake([ + TextResponseFake::make()->withSteps(collect([ + TextStepFake::make() + ->withText('Let me calculate') + ->withToolCalls([$toolCall]), + TextStepFake::make() + ->withToolResults([$toolResult]), + TextStepFake::make() + ->withText('The answer is 3'), + ])), + ]); + + $events = iterator_to_array( + Prism::text() + ->using('openai', 'gpt-4') + ->withPrompt('What is 1 + 2?') + ->asStream() + ); + + $stepStartEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent); + $stepFinishEvents = array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent); + + // Multiple steps = multiple step events + expect(count($stepStartEvents))->toBeGreaterThanOrEqual(2); + expect(count($stepFinishEvents))->toBeGreaterThanOrEqual(2); + + // Start and finish counts should match + expect(count($stepStartEvents))->toBe(count($stepFinishEvents)); + }); + + it('maintains step start/finish pairing for each step', function (): void { + $toolCall = new ToolCall('tool-1', 'search', ['q' => 'test']); + $toolResult = new ToolResult('tool-1', 'search', ['q' => 'test'], ['found' => true]); + + Prism::fake([ + TextResponseFake::make()->withSteps(collect([ + TextStepFake::make()->withToolCalls([$toolCall]), + TextStepFake::make()->withToolResults([$toolResult]), + TextStepFake::make()->withText('Done'), + ])), + ]); + + $events = iterator_to_array( + Prism::text() + ->using('anthropic', 'claude-3-sonnet') + ->withPrompt('Search') + ->asStream() + ); + + // Get indices of step events + $stepStartIndices = array_keys(array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepStartEvent)); + $stepFinishIndices = array_keys(array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent)); + + // Re-index + $stepStartIndices = array_values($stepStartIndices); + $stepFinishIndices = array_values($stepFinishIndices); + + // Each step start should be followed by its corresponding finish + foreach ($stepStartIndices as $i => $startIdx) { + if (isset($stepFinishIndices[$i])) { + expect($startIdx)->toBeLessThan($stepFinishIndices[$i], + "Step $i: start index ($startIdx) should be less than finish index ({$stepFinishIndices[$i]})"); + } + } + }); + + it('places tool events within step boundaries', function (): void { + $toolCall = new ToolCall('tool-1', 'weather', ['city' => 'NYC']); + $toolResult = new ToolResult('tool-1', 'weather', ['city' => 'NYC'], ['temp' => 72]); + + Prism::fake([ + TextResponseFake::make()->withSteps(collect([ + TextStepFake::make() + ->withText('Checking weather') + ->withToolCalls([$toolCall]), + TextStepFake::make() + ->withToolResults([$toolResult]), + TextStepFake::make() + ->withText('It is 72 degrees'), + ])), + ]); + + $events = iterator_to_array( + Prism::text() + ->using('openai', 'gpt-4') + ->withPrompt('Weather in NYC?') + ->asStream() + ); + + $getFirstIndex = fn (string $class): ?int => array_key_first( + array_filter($events, fn (StreamEvent $e): bool => $e instanceof $class) + ); + + $stepStartIdx = $getFirstIndex(StepStartEvent::class); + $toolCallIdx = $getFirstIndex(ToolCallEvent::class); + $stepFinishIndices = array_keys(array_filter($events, fn (StreamEvent $e): bool => $e instanceof StepFinishEvent)); + + // Tool call should occur after first step start + expect($toolCallIdx)->toBeGreaterThan($stepStartIdx); + + // Tool call should occur before first step finish + expect($toolCallIdx)->toBeLessThan($stepFinishIndices[0]); + }); + + it('step events have valid id and timestamp', function (): void { + Prism::fake([ + TextResponseFake::make()->withText('Test'), + ]); + + $events = iterator_to_array( + Prism::text() + ->using('openai', 'gpt-4') + ->withPrompt('Test') + ->asStream() + ); + + $stepEvents = array_filter( + $events, + fn (StreamEvent $e): bool => $e instanceof StepStartEvent || $e instanceof StepFinishEvent + ); + + foreach ($stepEvents as $event) { + expect($event->id)->toBeString()->not->toBeEmpty(); + expect($event->timestamp)->toBeInt()->toBeGreaterThan(0); + } + }); + + it('step events can be converted to array', function (): void { + Prism::fake([ + TextResponseFake::make()->withText('Test'), + ]); + + $events = iterator_to_array( + Prism::text() + ->using('openai', 'gpt-4') + ->withPrompt('Test') + ->asStream() + ); + + $stepEvents = array_filter( + $events, + fn (StreamEvent $e): bool => $e instanceof StepStartEvent || $e instanceof StepFinishEvent + ); + + foreach ($stepEvents as $event) { + $array = $event->toArray(); + expect($array)->toBeArray(); + expect($array)->toHaveKey('id'); + expect($array)->toHaveKey('timestamp'); + } + }); +}); From 73f6a7bc4c050163ea4f9c4370034011fcdbf457 Mon Sep 17 00:00:00 2001 From: "vinit.ka" Date: Fri, 9 Jan 2026 00:41:39 +0530 Subject: [PATCH 2/2] chore: [ streaming-output.md ] Add step events docs --- docs/core-concepts/streaming-output.md | 31 ++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/docs/core-concepts/streaming-output.md b/docs/core-concepts/streaming-output.md index e24fdbde7..0067f5332 100644 --- a/docs/core-concepts/streaming-output.md +++ b/docs/core-concepts/streaming-output.md @@ -179,6 +179,11 @@ function ChatComponent() { setIsComplete(false); }, + '.step_start': (data) => { + console.log('Step started:', data); + // A new generation cycle is beginning + }, + '.text_start': (data) => { console.log('Text start event received:', data); setCurrentMessage(''); @@ -202,6 +207,11 @@ function ChatComponent() { console.log('Tool result:', data.result); }, + '.step_finish': (data) => { + console.log('Step finished:', data); + // Generation cycle complete, may be followed by another step + }, + '.stream_end': (data) => { console.log('Stream ended:', data.finish_reason); setIsComplete(true); @@ -246,6 +256,7 @@ All streaming approaches emit the same core events with consistent data structur ### Available Events - **`stream_start`** - Stream initialization with model and provider info +- **`step_start`** - Beginning of a generation step (emitted before each AI response cycle) - **`text_start`** - Beginning of a text message - **`text_delta`** - Incremental text chunks as they're generated - **`text_complete`** - End of a complete text message @@ -257,9 +268,13 @@ All streaming approaches emit the same core events with consistent data structur - **`tool_call_delta`** - Incremental tool call params chunks as they're generated - **`artifact`** - Binary artifacts produced by tools (images, audio, files) - **`provider_tool_event`** - Provider-specific tool events (e.g., image generation, web search) +- **`step_finish`** - End of a generation step (emitted after tool calls or before stream end) - **`error`** - Error handling with recovery information - **`stream_end`** - Stream completion with usage statistics +> [!TIP] +> **Understanding Steps**: A "step" represents one cycle of AI generation. In a simple request without tools, there's typically one step. When using tools, each cycle of "AI generates → tools execute → AI continues" creates a new step. Use `step_start` and `step_finish` events to track these cycles in multi-turn tool interactions. + ### Event Data Examples Based on actual streaming output: @@ -277,6 +292,12 @@ Based on actual streaming output: } } +// step_start event +{ + "id": "anthropic_evt_abc123step", + "timestamp": 1756412888 +} + // text_start event { "id": "anthropic_evt_8YI9ULcftpFtHzh3", @@ -338,6 +359,12 @@ Based on actual streaming output: } } +// step_finish event +{ + "id": "anthropic_evt_def456step", + "timestamp": 1756412895 +} + // stream_end event { "id": "anthropic_evt_BZ3rqDYyprnywNyL", @@ -673,12 +700,16 @@ The Vercel AI SDK format provides structured streaming data: ``` data: {"type":"start","messageId":"anthropic_evt_NPbGJs7D0oQhvz2K"} +data: {"type":"start-step"} + data: {"type":"text-start","id":"msg_013P3F8KkVG3Qasjeay3NUmY"} data: {"type":"text-delta","id":"msg_013P3F8KkVG3Qasjeay3NUmY","delta":"Hello"} data: {"type":"text-end","id":"msg_013P3F8KkVG3Qasjeay3NUmY"} +data: {"type":"finish-step"} + data: {"type":"finish","messageMetadata":{"finishReason":"stop","usage":{"promptTokens":1998,"completionTokens":288}}} data: [DONE]