Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Enums/StreamEventType.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ enum StreamEventType: string
case Citation = 'citation';
case Error = 'error';
case StreamEnd = 'stream_end';
case StepStart = 'step_start';
case StepFinish = 'step_finish';
}
7 changes: 7 additions & 0 deletions src/Events/Broadcasting/StepFinishBroadcast.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

declare(strict_types=1);

namespace Prism\Prism\Events\Broadcasting;

class StepFinishBroadcast extends StreamEventBroadcast {}
7 changes: 7 additions & 0 deletions src/Events/Broadcasting/StepStartBroadcast.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

declare(strict_types=1);

namespace Prism\Prism\Events\Broadcasting;

class StepStartBroadcast extends StreamEventBroadcast {}
51 changes: 39 additions & 12 deletions src/Providers/Anthropic/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
use Prism\Prism\Streaming\Events\CitationEvent;
use Prism\Prism\Streaming\Events\ErrorEvent;
use Prism\Prism\Streaming\Events\ProviderToolEvent;
use Prism\Prism\Streaming\Events\StepFinishEvent;
use Prism\Prism\Streaming\Events\StepStartEvent;
use Prism\Prism\Streaming\Events\StreamEndEvent;
use Prism\Prism\Streaming\Events\StreamEvent;
use Prism\Prism\Streaming\Events\StreamStartEvent;
Expand Down Expand Up @@ -77,18 +79,28 @@ protected function processStream(Response $response, Request $request, int $dept
$streamEvent = $this->processEvent($event);

if ($streamEvent instanceof Generator) {
yield from $streamEvent;
foreach ($streamEvent as $event) {
yield $event;
}
} elseif ($streamEvent instanceof StreamEvent) {
yield $streamEvent;
}
}

if ($this->state->hasToolCalls()) {
yield from $this->handleToolCalls($request, $depth);
foreach ($this->handleToolCalls($request, $depth) as $item) {
yield $item;
}

return;
}

$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

yield $this->emitStreamEndEvent();
}

Expand All @@ -113,8 +125,9 @@ protected function processEvent(array $event): StreamEvent|Generator|null

/**
* @param array<string, mixed> $event
* @return Generator<StreamEvent>
*/
protected function handleMessageStart(array $event): ?StreamStartEvent
protected function handleMessageStart(array $event): Generator
{
$message = $event['message'] ?? [];
$this->state->withMessageId($message['id'] ?? EventID::generate());
Expand All @@ -130,18 +143,25 @@ protected function handleMessageStart(array $event): ?StreamStartEvent
}

// Only emit StreamStartEvent once per streaming session
if (! $this->state->shouldEmitStreamStart()) {
return null;
if ($this->state->shouldEmitStreamStart()) {
$this->state->markStreamStarted();

yield new StreamStartEvent(
id: EventID::generate(),
timestamp: time(),
model: $message['model'] ?? 'unknown',
provider: 'anthropic'
);
}

$this->state->markStreamStarted();
if ($this->state->shouldEmitStepStart()) {
$this->state->markStepStarted();

return new StreamStartEvent(
id: EventID::generate(),
timestamp: time(),
model: $message['model'] ?? 'unknown',
provider: 'anthropic'
);
yield new StepStartEvent(
id: EventID::generate(),
timestamp: time()
);
}
}

/**
Expand Down Expand Up @@ -526,6 +546,13 @@ protected function handleToolCalls(Request $request, int $depth): Generator

$request->addMessage(new ToolResultMessage($toolResults));

// Emit step finish after tool calls
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

// Continue streaming if within step limit
$depth++;
if ($depth < $request->maxSteps()) {
Expand Down
23 changes: 23 additions & 0 deletions src/Providers/DeepSeek/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
use Prism\Prism\Providers\DeepSeek\Maps\ToolChoiceMap;
use Prism\Prism\Providers\DeepSeek\Maps\ToolMap;
use Prism\Prism\Streaming\EventID;
use Prism\Prism\Streaming\Events\StepFinishEvent;
use Prism\Prism\Streaming\Events\StepStartEvent;
use Prism\Prism\Streaming\Events\StreamEndEvent;
use Prism\Prism\Streaming\Events\StreamEvent;
use Prism\Prism\Streaming\Events\StreamStartEvent;
Expand Down Expand Up @@ -95,6 +97,15 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

if ($this->state->shouldEmitStepStart()) {
$this->state->markStepStarted();

yield new StepStartEvent(
id: EventID::generate(),
timestamp: time()
);
}

if ($this->hasToolCalls($data)) {
$toolCalls = $this->extractToolCalls($data, $toolCalls);

Expand Down Expand Up @@ -213,6 +224,12 @@ protected function processStream(Response $response, Request $request, int $dept
return;
}

$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
Expand Down Expand Up @@ -369,6 +386,12 @@ protected function handleToolCalls(Request $request, string $text, array $toolCa
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
$request->addMessage(new ToolResultMessage($toolResults));

$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

$this->state->resetTextState();
$this->state->withMessageId(EventID::generate());

Expand Down
26 changes: 26 additions & 0 deletions src/Providers/Gemini/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
use Prism\Prism\Providers\Gemini\Maps\ToolChoiceMap;
use Prism\Prism\Providers\Gemini\Maps\ToolMap;
use Prism\Prism\Streaming\EventID;
use Prism\Prism\Streaming\Events\StepFinishEvent;
use Prism\Prism\Streaming\Events\StepStartEvent;
use Prism\Prism\Streaming\Events\StreamEndEvent;
use Prism\Prism\Streaming\Events\StreamEvent;
use Prism\Prism\Streaming\Events\StreamStartEvent;
Expand Down Expand Up @@ -98,6 +100,16 @@ protected function processStream(Response $response, Request $request, int $dept
$this->state->markStreamStarted();
}

// Emit step start event once per step
if ($this->state->shouldEmitStepStart()) {
$this->state->markStepStarted();

yield new StepStartEvent(
id: EventID::generate(),
timestamp: time()
);
}

// Update usage data from each chunk
$this->state->withUsage($this->extractUsage($data, $request));

Expand Down Expand Up @@ -217,6 +229,13 @@ protected function processStream(Response $response, Request $request, int $dept
return;
}

// Emit step finish before stream end
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
Expand Down Expand Up @@ -338,6 +357,13 @@ protected function handleToolCalls(
$request->addMessage(new AssistantMessage($this->state->currentText(), $mappedToolCalls));
$request->addMessage(new ToolResultMessage($toolResults));

// Emit step finish after tool calls
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

$depth++;
if ($depth < $request->maxSteps()) {
$previousUsage = $this->state->usage();
Expand Down
26 changes: 26 additions & 0 deletions src/Providers/Groq/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
use Prism\Prism\Providers\Groq\Maps\ToolMap;
use Prism\Prism\Streaming\EventID;
use Prism\Prism\Streaming\Events\ErrorEvent;
use Prism\Prism\Streaming\Events\StepFinishEvent;
use Prism\Prism\Streaming\Events\StepStartEvent;
use Prism\Prism\Streaming\Events\StreamEndEvent;
use Prism\Prism\Streaming\Events\StreamEvent;
use Prism\Prism\Streaming\Events\StreamStartEvent;
Expand Down Expand Up @@ -95,6 +97,16 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

// Emit step start event once per step
if ($this->state->shouldEmitStepStart()) {
$this->state->markStepStarted();

yield new StepStartEvent(
id: EventID::generate(),
timestamp: time()
);
}

if ($this->hasError($data)) {
yield from $this->handleErrors($data, $request);

Expand Down Expand Up @@ -167,6 +179,13 @@ protected function processStream(Response $response, Request $request, int $dept
}
}

// Emit step finish before stream end
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
Expand Down Expand Up @@ -263,6 +282,13 @@ protected function handleToolCalls(
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
$request->addMessage(new ToolResultMessage($toolResults));

// Emit step finish after tool calls
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

// Reset text state for next response
$this->state->resetTextState();
$this->state->withMessageId(EventID::generate());
Expand Down
23 changes: 23 additions & 0 deletions src/Providers/Mistral/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
use Prism\Prism\Providers\Mistral\Maps\ToolChoiceMap;
use Prism\Prism\Providers\Mistral\Maps\ToolMap;
use Prism\Prism\Streaming\EventID;
use Prism\Prism\Streaming\Events\StepFinishEvent;
use Prism\Prism\Streaming\Events\StepStartEvent;
use Prism\Prism\Streaming\Events\StreamEndEvent;
use Prism\Prism\Streaming\Events\StreamEvent;
use Prism\Prism\Streaming\Events\StreamStartEvent;
Expand Down Expand Up @@ -94,6 +96,15 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

if ($this->state->shouldEmitStepStart()) {
$this->state->markStepStarted();

yield new StepStartEvent(
id: EventID::generate(),
timestamp: time()
);
}

if ($this->hasToolCalls($data)) {
$toolCalls = $this->extractToolCalls($data, $toolCalls);

Expand Down Expand Up @@ -172,6 +183,12 @@ protected function processStream(Response $response, Request $request, int $dept
}
}

$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

yield new StreamEndEvent(
id: EventID::generate(),
timestamp: time(),
Expand Down Expand Up @@ -259,6 +276,12 @@ protected function handleToolCalls(
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
$request->addMessage(new ToolResultMessage($toolResults));

$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

$this->state->resetTextState();
$this->state->withMessageId(EventID::generate());

Expand Down
26 changes: 26 additions & 0 deletions src/Providers/Ollama/Handlers/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
use Prism\Prism\Providers\Ollama\Maps\ToolMap;
use Prism\Prism\Providers\Ollama\ValueObjects\OllamaStreamState;
use Prism\Prism\Streaming\EventID;
use Prism\Prism\Streaming\Events\StepFinishEvent;
use Prism\Prism\Streaming\Events\StepStartEvent;
use Prism\Prism\Streaming\Events\StreamEndEvent;
use Prism\Prism\Streaming\Events\StreamEvent;
use Prism\Prism\Streaming\Events\StreamStartEvent;
Expand Down Expand Up @@ -90,6 +92,16 @@ protected function processStream(Response $response, Request $request, int $dept
$this->state->markStreamStarted()->withMessageId(EventID::generate());
}

// Emit step start event once per step
if ($this->state->shouldEmitStepStart()) {
$this->state->markStepStarted();

yield new StepStartEvent(
id: EventID::generate(),
timestamp: time()
);
}

// Accumulate token counts
$this->state->addPromptTokens((int) data_get($data, 'prompt_eval_count', 0));
$this->state->addCompletionTokens((int) data_get($data, 'eval_count', 0));
Expand Down Expand Up @@ -185,6 +197,13 @@ protected function processStream(Response $response, Request $request, int $dept
);
}

// Emit step finish before stream end
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

// Emit stream end event with usage
yield new StreamEndEvent(
id: EventID::generate(),
Expand Down Expand Up @@ -280,6 +299,13 @@ protected function handleToolCalls(
$request->addMessage(new AssistantMessage($text, $mappedToolCalls));
$request->addMessage(new ToolResultMessage($toolResults));

// Emit step finish after tool calls
$this->state->markStepFinished();
yield new StepFinishEvent(
id: EventID::generate(),
timestamp: time()
);

// Continue streaming if within step limit
$depth++;
if ($depth < $request->maxSteps()) {
Expand Down
Loading