diff --git a/src/platform/src/Bridge/Anthropic/ResultConverter.php b/src/platform/src/Bridge/Anthropic/ResultConverter.php index 415d7a54c..e4298d060 100644 --- a/src/platform/src/Bridge/Anthropic/ResultConverter.php +++ b/src/platform/src/Bridge/Anthropic/ResultConverter.php @@ -22,9 +22,6 @@ use Symfony\AI\Platform\Result\ToolCall; use Symfony\AI\Platform\Result\ToolCallResult; use Symfony\AI\Platform\ResultConverterInterface; -use Symfony\Component\HttpClient\Chunk\ServerSentEvent; -use Symfony\Component\HttpClient\EventSourceHttpClient; -use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse; /** * @author Christopher Hertel @@ -47,7 +44,7 @@ public function convert(RawHttpResult|RawResultInterface $result, array $options } if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($response)); + return new StreamResult($this->convertStream($result)); } $data = $result->getData(); @@ -74,15 +71,9 @@ public function convert(RawHttpResult|RawResultInterface $result, array $options return new TextResult($data['content'][0]['text']); } - private function convertStream(HttpResponse $result): \Generator + private function convertStream(RawResultInterface $result): \Generator { - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) { - continue; - } - - $data = $chunk->getArrayData(); - + foreach ($result->getDataStream() as $data) { if ('content_block_delta' != $data['type'] || !isset($data['delta']['text'])) { continue; } diff --git a/src/platform/src/Bridge/Bedrock/RawBedrockResult.php b/src/platform/src/Bridge/Bedrock/RawBedrockResult.php index 7e3fad831..c87243fcf 100644 --- a/src/platform/src/Bridge/Bedrock/RawBedrockResult.php +++ b/src/platform/src/Bridge/Bedrock/RawBedrockResult.php @@ -12,6 +12,7 @@ namespace Symfony\AI\Platform\Bridge\Bedrock; use AsyncAws\BedrockRuntime\Result\InvokeModelResponse; +use Symfony\AI\Platform\Exception\RuntimeException; use Symfony\AI\Platform\Result\RawResultInterface; /** @@ -29,6 +30,11 @@ public function getData(): array return json_decode($this->invokeModelResponse->getBody(), true, 512, \JSON_THROW_ON_ERROR); } + public function getDataStream(): iterable + { + throw new RuntimeException('Streaming is not implemented yet.'); + } + public function getObject(): InvokeModelResponse { return $this->invokeModelResponse; diff --git a/src/platform/src/Bridge/Cerebras/ResultConverter.php b/src/platform/src/Bridge/Cerebras/ResultConverter.php index b11f7576b..bfaa831e4 100644 --- a/src/platform/src/Bridge/Cerebras/ResultConverter.php +++ b/src/platform/src/Bridge/Cerebras/ResultConverter.php @@ -13,16 +13,11 @@ use Symfony\AI\Platform\Exception\RuntimeException; use Symfony\AI\Platform\Model as BaseModel; -use Symfony\AI\Platform\Result\RawHttpResult; use Symfony\AI\Platform\Result\RawResultInterface; use Symfony\AI\Platform\Result\ResultInterface; use Symfony\AI\Platform\Result\StreamResult; use Symfony\AI\Platform\Result\TextResult; use Symfony\AI\Platform\ResultConverterInterface; -use Symfony\Component\HttpClient\Chunk\ServerSentEvent; -use Symfony\Component\HttpClient\EventSourceHttpClient; -use Symfony\Component\HttpClient\Exception\JsonException; -use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse; /** * @author Junaid Farooq @@ -34,10 +29,10 @@ public function supports(BaseModel $model): bool return $model instanceof Model; } - public function convert(RawHttpResult|RawResultInterface $result, array $options = []): ResultInterface + public function convert(RawResultInterface $result, array $options = []): ResultInterface { if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($result->getObject())); + return new StreamResult($this->convertStream($result)); } $data = $result->getData(); @@ -53,19 +48,9 @@ public function convert(RawHttpResult|RawResultInterface $result, array $options return new TextResult($data['choices'][0]['message']['content']); } - private function convertStream(HttpResponse $result): \Generator + private function convertStream(RawResultInterface $result): \Generator { - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) { - continue; - } - - try { - $data = $chunk->getArrayData(); - } catch (JsonException) { - continue; - } - + foreach ($result->getDataStream() as $data) { if (!isset($data['choices'][0]['delta']['content'])) { continue; } diff --git a/src/platform/src/Bridge/DeepSeek/ResultConverter.php b/src/platform/src/Bridge/DeepSeek/ResultConverter.php index b7499344b..5c477694d 100644 --- a/src/platform/src/Bridge/DeepSeek/ResultConverter.php +++ b/src/platform/src/Bridge/DeepSeek/ResultConverter.php @@ -16,7 +16,6 @@ use Symfony\AI\Platform\Exception\RuntimeException; use Symfony\AI\Platform\Model; use Symfony\AI\Platform\Result\ChoiceResult; -use Symfony\AI\Platform\Result\RawHttpResult; use Symfony\AI\Platform\Result\RawResultInterface; use Symfony\AI\Platform\Result\ResultInterface; use Symfony\AI\Platform\Result\StreamResult; @@ -24,9 +23,6 @@ use Symfony\AI\Platform\Result\ToolCall; use Symfony\AI\Platform\Result\ToolCallResult; use Symfony\AI\Platform\ResultConverterInterface; -use Symfony\Component\HttpClient\Chunk\ServerSentEvent; -use Symfony\Component\HttpClient\EventSourceHttpClient; -use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse; /** * @author Oskar Stark @@ -38,10 +34,10 @@ public function supports(Model $model): bool return $model instanceof DeepSeek; } - public function convert(RawResultInterface|RawHttpResult $result, array $options = []): ResultInterface + public function convert(RawResultInterface $result, array $options = []): ResultInterface { if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($result->getObject())); + return new StreamResult($this->convertStream($result)); } $data = $result->getData(); @@ -63,16 +59,10 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options return 1 === \count($choices) ? $choices[0] : new ChoiceResult(...$choices); } - private function convertStream(HttpResponse $result): \Generator + private function convertStream(RawResultInterface $result): \Generator { $toolCalls = []; - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) { - continue; - } - - $data = $chunk->getArrayData(); - + foreach ($result->getDataStream() as $data) { if ($this->streamIsToolCall($data)) { $toolCalls = $this->convertStreamToToolCalls($toolCalls, $data); } diff --git a/src/platform/src/Bridge/DockerModelRunner/Completions/ResultConverter.php b/src/platform/src/Bridge/DockerModelRunner/Completions/ResultConverter.php index 98c841e01..fc0dcc48b 100644 --- a/src/platform/src/Bridge/DockerModelRunner/Completions/ResultConverter.php +++ b/src/platform/src/Bridge/DockerModelRunner/Completions/ResultConverter.php @@ -26,9 +26,6 @@ use Symfony\AI\Platform\Result\ToolCall; use Symfony\AI\Platform\Result\ToolCallResult; use Symfony\AI\Platform\ResultConverterInterface; -use Symfony\Component\HttpClient\Chunk\ServerSentEvent; -use Symfony\Component\HttpClient\EventSourceHttpClient; -use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse; /** * @author Mathieu Santostefano @@ -43,7 +40,7 @@ public function supports(Model $model): bool public function convert(RawResultInterface|RawHttpResult $result, array $options = []): ResultInterface { if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($result->getObject())); + return new StreamResult($this->convertStream($result)); } if (404 === $result->getObject()->getStatusCode() @@ -70,16 +67,10 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options return 1 === \count($choices) ? $choices[0] : new ChoiceResult(...$choices); } - private function convertStream(HttpResponse $result): \Generator + private function convertStream(RawResultInterface $result): \Generator { $toolCalls = []; - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) { - continue; - } - - $data = $chunk->getArrayData(); - + foreach ($result->getDataStream() as $data) { if ($this->streamIsToolCall($data)) { $toolCalls = $this->convertStreamToToolCalls($toolCalls, $data); } diff --git a/src/platform/src/Bridge/ElevenLabs/ElevenLabsResultConverter.php b/src/platform/src/Bridge/ElevenLabs/ElevenLabsResultConverter.php index 7dbe8a604..d5e60d931 100644 --- a/src/platform/src/Bridge/ElevenLabs/ElevenLabsResultConverter.php +++ b/src/platform/src/Bridge/ElevenLabs/ElevenLabsResultConverter.php @@ -14,6 +14,7 @@ use Symfony\AI\Platform\Exception\RuntimeException; use Symfony\AI\Platform\Model; use Symfony\AI\Platform\Result\BinaryResult; +use Symfony\AI\Platform\Result\RawHttpResult; use Symfony\AI\Platform\Result\RawResultInterface; use Symfony\AI\Platform\Result\ResultInterface; use Symfony\AI\Platform\Result\StreamResult; @@ -37,9 +38,8 @@ public function supports(Model $model): bool return $model instanceof ElevenLabs; } - public function convert(RawResultInterface $result, array $options = []): ResultInterface + public function convert(RawHttpResult|RawResultInterface $result, array $options = []): ResultInterface { - /** @var ResponseInterface $response */ $response = $result->getObject(); return match (true) { diff --git a/src/platform/src/Bridge/Gemini/Gemini/ResultConverter.php b/src/platform/src/Bridge/Gemini/Gemini/ResultConverter.php index 755cb2590..4b81624d5 100644 --- a/src/platform/src/Bridge/Gemini/Gemini/ResultConverter.php +++ b/src/platform/src/Bridge/Gemini/Gemini/ResultConverter.php @@ -25,8 +25,6 @@ use Symfony\AI\Platform\Result\ToolCall; use Symfony\AI\Platform\Result\ToolCallResult; use Symfony\AI\Platform\ResultConverterInterface; -use Symfony\Component\HttpClient\EventSourceHttpClient; -use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse; /** * @author Roy Garrido @@ -51,7 +49,7 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options } if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($response)); + return new StreamResult($this->convertStream($result)); } $data = $result->getData(); @@ -69,50 +67,21 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options return 1 === \count($choices) ? $choices[0] : new ChoiceResult(...$choices); } - private function convertStream(HttpResponse $result): \Generator + private function convertStream(RawResultInterface $result): \Generator { - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if ($chunk->isFirst() || $chunk->isLast()) { + foreach ($result->getDataStream() as $data) { + $choices = array_map($this->convertChoice(...), $data['candidates'] ?? []); + + if (!$choices) { continue; } - $jsonDelta = trim($chunk->getContent()); - - // Remove leading/trailing brackets - if (str_starts_with($jsonDelta, '[') || str_starts_with($jsonDelta, ',')) { - $jsonDelta = substr($jsonDelta, 1); - } - if (str_ends_with($jsonDelta, ']')) { - $jsonDelta = substr($jsonDelta, 0, -1); + if (1 !== \count($choices)) { + yield new ChoiceResult(...$choices); + continue; } - // Split in case of multiple JSON objects - $deltas = explode(",\r\n", $jsonDelta); - - foreach ($deltas as $delta) { - if ('' === $delta) { - continue; - } - - try { - $data = json_decode($delta, true, 512, \JSON_THROW_ON_ERROR); - } catch (\JsonException $e) { - throw new RuntimeException('Failed to decode JSON response.', previous: $e); - } - - $choices = array_map($this->convertChoice(...), $data['candidates'] ?? []); - - if (!$choices) { - continue; - } - - if (1 !== \count($choices)) { - yield new ChoiceResult(...$choices); - continue; - } - - yield $choices[0]->getContent(); - } + yield $choices[0]->getContent(); } } diff --git a/src/platform/src/Bridge/Mistral/Llm/ResultConverter.php b/src/platform/src/Bridge/Mistral/Llm/ResultConverter.php index 0374b6b2d..881745a05 100644 --- a/src/platform/src/Bridge/Mistral/Llm/ResultConverter.php +++ b/src/platform/src/Bridge/Mistral/Llm/ResultConverter.php @@ -23,9 +23,6 @@ use Symfony\AI\Platform\Result\ToolCall; use Symfony\AI\Platform\Result\ToolCallResult; use Symfony\AI\Platform\ResultConverterInterface; -use Symfony\Component\HttpClient\Chunk\ServerSentEvent; -use Symfony\Component\HttpClient\EventSourceHttpClient; -use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse; /** * @author Christopher Hertel @@ -45,7 +42,7 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options $httpResponse = $result->getObject(); if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($httpResponse)); + return new StreamResult($this->convertStream($result)); } if (200 !== $code = $httpResponse->getStatusCode()) { @@ -63,16 +60,10 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options return 1 === \count($choices) ? $choices[0] : new ChoiceResult(...$choices); } - private function convertStream(HttpResponse $result): \Generator + private function convertStream(RawResultInterface $result): \Generator { $toolCalls = []; - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) { - continue; - } - - $data = $chunk->getArrayData(); - + foreach ($result->getDataStream() as $data) { if ($this->streamIsToolCall($data)) { $toolCalls = $this->convertStreamToToolCalls($toolCalls, $data); } diff --git a/src/platform/src/Bridge/Ollama/OllamaResultConverter.php b/src/platform/src/Bridge/Ollama/OllamaResultConverter.php index 9870f7f4c..bc41e768d 100644 --- a/src/platform/src/Bridge/Ollama/OllamaResultConverter.php +++ b/src/platform/src/Bridge/Ollama/OllamaResultConverter.php @@ -22,10 +22,6 @@ use Symfony\AI\Platform\Result\VectorResult; use Symfony\AI\Platform\ResultConverterInterface; use Symfony\AI\Platform\Vector\Vector; -use Symfony\Component\HttpClient\Chunk\FirstChunk; -use Symfony\Component\HttpClient\Chunk\LastChunk; -use Symfony\Component\HttpClient\EventSourceHttpClient; -use Symfony\Contracts\HttpClient\ResponseInterface; /** * @author Christopher Hertel @@ -40,7 +36,7 @@ public function supports(Model $model): bool public function convert(RawResultInterface $result, array $options = []): ResultInterface { if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($result->getObject())); + return new StreamResult($this->convertStream($result)); } $data = $result->getData(); @@ -93,20 +89,10 @@ public function doConvertEmbeddings(array $data): ResultInterface ); } - private function convertStream(ResponseInterface $result): \Generator + private function convertStream(RawResultInterface $result): \Generator { $toolCalls = []; - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if ($chunk instanceof FirstChunk || $chunk instanceof LastChunk) { - continue; - } - - try { - $data = json_decode($chunk->getContent(), true, 512, \JSON_THROW_ON_ERROR); - } catch (\JsonException $e) { - throw new RuntimeException('Failed to decode JSON: '.$e->getMessage()); - } - + foreach ($result->getDataStream() as $data) { if ($this->streamIsToolCall($data)) { $toolCalls = $this->convertStreamToToolCalls($toolCalls, $data); } diff --git a/src/platform/src/Bridge/OpenAi/Gpt/ResultConverter.php b/src/platform/src/Bridge/OpenAi/Gpt/ResultConverter.php index 02c6b611e..d9ceb0038 100644 --- a/src/platform/src/Bridge/OpenAi/Gpt/ResultConverter.php +++ b/src/platform/src/Bridge/OpenAi/Gpt/ResultConverter.php @@ -27,9 +27,6 @@ use Symfony\AI\Platform\Result\ToolCall; use Symfony\AI\Platform\Result\ToolCallResult; use Symfony\AI\Platform\ResultConverterInterface; -use Symfony\Component\HttpClient\Chunk\ServerSentEvent; -use Symfony\Component\HttpClient\EventSourceHttpClient; -use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse; /** * @author Christopher Hertel @@ -66,7 +63,7 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options } if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($response)); + return new StreamResult($this->convertStream($result)); } $data = $result->getData(); @@ -88,16 +85,10 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options return 1 === \count($choices) ? $choices[0] : new ChoiceResult(...$choices); } - private function convertStream(HttpResponse $result): \Generator + private function convertStream(RawResultInterface|RawHttpResult $result): \Generator { $toolCalls = []; - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) { - continue; - } - - $data = $chunk->getArrayData(); - + foreach ($result->getDataStream() as $data) { if ($this->streamIsToolCall($data)) { $toolCalls = $this->convertStreamToToolCalls($toolCalls, $data); } diff --git a/src/platform/src/Bridge/Perplexity/ResultConverter.php b/src/platform/src/Bridge/Perplexity/ResultConverter.php index 3d0243fe9..147886d7f 100644 --- a/src/platform/src/Bridge/Perplexity/ResultConverter.php +++ b/src/platform/src/Bridge/Perplexity/ResultConverter.php @@ -15,15 +15,11 @@ use Symfony\AI\Platform\Metadata\Metadata; use Symfony\AI\Platform\Model; use Symfony\AI\Platform\Result\ChoiceResult; -use Symfony\AI\Platform\Result\RawHttpResult; use Symfony\AI\Platform\Result\RawResultInterface; use Symfony\AI\Platform\Result\ResultInterface; use Symfony\AI\Platform\Result\StreamResult; use Symfony\AI\Platform\Result\TextResult; use Symfony\AI\Platform\ResultConverterInterface; -use Symfony\Component\HttpClient\Chunk\ServerSentEvent; -use Symfony\Component\HttpClient\EventSourceHttpClient; -use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse; /** * @author Mathieu Santostefano @@ -35,10 +31,10 @@ public function supports(Model $model): bool return $model instanceof Perplexity; } - public function convert(RawResultInterface|RawHttpResult $result, array $options = []): ResultInterface + public function convert(RawResultInterface $result, array $options = []): ResultInterface { if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($result->getObject())); + return new StreamResult($this->convertStream($result)); } $data = $result->getData(); @@ -54,19 +50,13 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options return $result; } - private function convertStream(HttpResponse $result): \Generator + private function convertStream(RawResultInterface $result): \Generator { $searchResults = $citations = []; /** @var Metadata $metadata */ $metadata = yield; - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) { - continue; - } - - $data = $chunk->getArrayData(); - + foreach ($result->getDataStream() as $data) { if (isset($data['choices'][0]['delta']['content'])) { yield $data['choices'][0]['delta']['content']; } diff --git a/src/platform/src/Bridge/Scaleway/Llm/ResultConverter.php b/src/platform/src/Bridge/Scaleway/Llm/ResultConverter.php index f2e8fb8a3..165a75cff 100644 --- a/src/platform/src/Bridge/Scaleway/Llm/ResultConverter.php +++ b/src/platform/src/Bridge/Scaleway/Llm/ResultConverter.php @@ -16,7 +16,6 @@ use Symfony\AI\Platform\Exception\RuntimeException; use Symfony\AI\Platform\Model; use Symfony\AI\Platform\Result\ChoiceResult; -use Symfony\AI\Platform\Result\RawHttpResult; use Symfony\AI\Platform\Result\RawResultInterface; use Symfony\AI\Platform\Result\ResultInterface; use Symfony\AI\Platform\Result\StreamResult; @@ -24,9 +23,6 @@ use Symfony\AI\Platform\Result\ToolCall; use Symfony\AI\Platform\Result\ToolCallResult; use Symfony\AI\Platform\ResultConverterInterface; -use Symfony\Component\HttpClient\Chunk\ServerSentEvent; -use Symfony\Component\HttpClient\EventSourceHttpClient; -use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse; /** * @author Marcus Stöhr @@ -38,10 +34,10 @@ public function supports(Model $model): bool return $model instanceof Scaleway; } - public function convert(RawResultInterface|RawHttpResult $result, array $options = []): ResultInterface + public function convert(RawResultInterface $result, array $options = []): ResultInterface { if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($result->getObject())); + return new StreamResult($this->convertStream($result)); } $data = $result->getData(); @@ -58,16 +54,10 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options return 1 === \count($choices) ? $choices[0] : new ChoiceResult(...$choices); } - private function convertStream(HttpResponse $result): \Generator + private function convertStream(RawResultInterface $result): \Generator { $toolCalls = []; - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if (!$chunk instanceof ServerSentEvent || '[DONE]' === $chunk->getData()) { - continue; - } - - $data = $chunk->getArrayData(); - + foreach ($result->getDataStream() as $data) { if ($this->streamIsToolCall($data)) { $toolCalls = $this->convertStreamToToolCalls($toolCalls, $data); } diff --git a/src/platform/src/Bridge/TransformersPhp/RawPipelineResult.php b/src/platform/src/Bridge/TransformersPhp/RawPipelineResult.php index 900e73887..d785b13ae 100644 --- a/src/platform/src/Bridge/TransformersPhp/RawPipelineResult.php +++ b/src/platform/src/Bridge/TransformersPhp/RawPipelineResult.php @@ -11,6 +11,7 @@ namespace Symfony\AI\Platform\Bridge\TransformersPhp; +use Symfony\AI\Platform\Exception\RuntimeException; use Symfony\AI\Platform\Result\RawResultInterface; /** @@ -28,6 +29,11 @@ public function getData(): array return $this->pipelineExecution->getResult(); } + public function getDataStream(): iterable + { + throw new RuntimeException('Streaming is not implemented yet.'); + } + public function getObject(): PipelineExecution { return $this->pipelineExecution; diff --git a/src/platform/src/Bridge/VertexAi/Gemini/ResultConverter.php b/src/platform/src/Bridge/VertexAi/Gemini/ResultConverter.php index 44c81b383..a02fa5836 100644 --- a/src/platform/src/Bridge/VertexAi/Gemini/ResultConverter.php +++ b/src/platform/src/Bridge/VertexAi/Gemini/ResultConverter.php @@ -24,9 +24,7 @@ use Symfony\AI\Platform\Result\ToolCall; use Symfony\AI\Platform\Result\ToolCallResult; use Symfony\AI\Platform\ResultConverterInterface; -use Symfony\Component\HttpClient\EventSourceHttpClient; use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; -use Symfony\Contracts\HttpClient\ResponseInterface as HttpResponse; /** * @author Junaid Farooq @@ -51,7 +49,7 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options } if ($options['stream'] ?? false) { - return new StreamResult($this->convertStream($response)); + return new StreamResult($this->convertStream($result)); } $data = $result->getData(); @@ -72,49 +70,21 @@ public function convert(RawResultInterface|RawHttpResult $result, array $options /** * @throws TransportExceptionInterface */ - private function convertStream(HttpResponse $result): \Generator + private function convertStream(RawResultInterface $result): \Generator { - foreach ((new EventSourceHttpClient())->stream($result) as $chunk) { - if ($chunk->isFirst() || $chunk->isLast()) { - continue; - } - - $jsonDelta = trim($chunk->getContent()); + foreach ($result->getDataStream() as $data) { + $choices = array_map($this->convertChoice(...), $data['candidates'] ?? []); - if (str_starts_with($jsonDelta, '[') || str_starts_with($jsonDelta, ',')) { - $jsonDelta = substr($jsonDelta, 1); + if (!$choices) { + continue; } - if (str_ends_with($jsonDelta, ']')) { - $jsonDelta = substr($jsonDelta, 0, -1); + if (1 !== \count($choices)) { + yield new ChoiceResult(...$choices); + continue; } - $deltas = explode(",\r\n", $jsonDelta); - - foreach ($deltas as $delta) { - if ('' === $delta) { - continue; - } - - try { - $data = json_decode($delta, true, 512, \JSON_THROW_ON_ERROR); - } catch (\JsonException $e) { - throw new RuntimeException('Failed to decode JSON response.', previous: $e); - } - - $choices = array_map($this->convertChoice(...), $data['candidates'] ?? []); - - if (!$choices) { - continue; - } - - if (1 !== \count($choices)) { - yield new ChoiceResult(...$choices); - continue; - } - - yield $choices[0]->getContent(); - } + yield $choices[0]->getContent(); } } diff --git a/src/platform/src/Result/InMemoryRawResult.php b/src/platform/src/Result/InMemoryRawResult.php index 82bc3b500..a597757e6 100644 --- a/src/platform/src/Result/InMemoryRawResult.php +++ b/src/platform/src/Result/InMemoryRawResult.php @@ -19,10 +19,12 @@ final readonly class InMemoryRawResult implements RawResultInterface { /** - * @param array $data + * @param array $data + * @param iterable> $dataStream */ public function __construct( private array $data = [], + private iterable $dataStream = [], private object $object = new \stdClass(), ) { } @@ -32,6 +34,11 @@ public function getData(): array return $this->data; } + public function getDataStream(): iterable + { + yield from $this->dataStream; + } + public function getObject(): object { return $this->object; diff --git a/src/platform/src/Result/RawHttpResult.php b/src/platform/src/Result/RawHttpResult.php index 15509cac2..8459862fd 100644 --- a/src/platform/src/Result/RawHttpResult.php +++ b/src/platform/src/Result/RawHttpResult.php @@ -11,6 +11,8 @@ namespace Symfony\AI\Platform\Result; +use Symfony\Component\HttpClient\Chunk\ServerSentEvent; +use Symfony\Component\HttpClient\EventSourceHttpClient; use Symfony\Contracts\HttpClient\ResponseInterface; /** @@ -28,6 +30,36 @@ public function getData(): array return $this->response->toArray(false); } + public function getDataStream(): iterable + { + foreach ((new EventSourceHttpClient())->stream($this->response) as $chunk) { + if ($chunk->isFirst() || $chunk->isLast() || ($chunk instanceof ServerSentEvent && '[DONE]' === $chunk->getData())) { + continue; + } + + $jsonDelta = $chunk instanceof ServerSentEvent ? $chunk->getData() : $chunk->getContent(); + + // Remove leading/trailing brackets + if (str_starts_with($jsonDelta, '[') || str_starts_with($jsonDelta, ',')) { + $jsonDelta = substr($jsonDelta, 1); + } + if (str_ends_with($jsonDelta, ']')) { + $jsonDelta = substr($jsonDelta, 0, -1); + } + + // Split in case of multiple JSON objects + $deltas = explode(",\r\n", $jsonDelta); + + foreach ($deltas as $delta) { + if ('' === trim($delta)) { + continue; + } + + yield json_decode($delta, true, flags: \JSON_THROW_ON_ERROR); + } + } + } + public function getObject(): ResponseInterface { return $this->response; diff --git a/src/platform/src/Result/RawResultInterface.php b/src/platform/src/Result/RawResultInterface.php index 6c000ef0f..89dfeddd7 100644 --- a/src/platform/src/Result/RawResultInterface.php +++ b/src/platform/src/Result/RawResultInterface.php @@ -25,5 +25,10 @@ interface RawResultInterface */ public function getData(): array; + /** + * @return iterable> + */ + public function getDataStream(): iterable; + public function getObject(): object; } diff --git a/src/platform/src/Test/InMemoryPlatform.php b/src/platform/src/Test/InMemoryPlatform.php index d107a8a2e..1586c0f20 100644 --- a/src/platform/src/Test/InMemoryPlatform.php +++ b/src/platform/src/Test/InMemoryPlatform.php @@ -51,10 +51,10 @@ public function __construct(string $name) $result = \is_string($this->mockResult) ? $this->mockResult : ($this->mockResult)($model, $input, $options); if ($result instanceof ResultInterface) { - return $this->createPromise($result, $options); + return $this->createDeferredResult($result, $options); } - return $this->createPromise(new TextResult($result), $options); + return $this->createDeferredResult(new TextResult($result), $options); } public function getModelCatalog(): ModelCatalogInterface @@ -68,10 +68,11 @@ public function getModelCatalog(): ModelCatalogInterface * @param ResultInterface $result The result to wrap in a promise * @param array $options Additional options for the promise */ - private function createPromise(ResultInterface $result, array $options): DeferredResult + private function createDeferredResult(ResultInterface $result, array $options): DeferredResult { $rawResult = $result->getRawResult() ?? new InMemoryRawResult( ['text' => $result->getContent()], + [], (object) ['text' => $result->getContent()], ); diff --git a/src/platform/tests/Bridge/ElevenLabs/ElevenLabsConverterTest.php b/src/platform/tests/Bridge/ElevenLabs/ElevenLabsConverterTest.php index 2339335aa..4821bed16 100644 --- a/src/platform/tests/Bridge/ElevenLabs/ElevenLabsConverterTest.php +++ b/src/platform/tests/Bridge/ElevenLabs/ElevenLabsConverterTest.php @@ -35,7 +35,7 @@ public function testConvertSpeechToTextResponse() $converter = new ElevenLabsResultConverter(new MockHttpClient()); $rawResult = new InMemoryRawResult([ 'text' => 'Hello there', - ], new class { + ], [], new class { public function getInfo(): string { return 'speech-to-text'; @@ -51,7 +51,7 @@ public function getInfo(): string public function testConvertTextToSpeechResponse() { $converter = new ElevenLabsResultConverter(new MockHttpClient()); - $rawResult = new InMemoryRawResult([], new class { + $rawResult = new InMemoryRawResult([], [], new class { public function getInfo(): string { return 'text-to-speech'; diff --git a/src/platform/tests/Result/BaseResultTest.php b/src/platform/tests/Result/BaseResultTest.php index f30d1c090..b8911d554 100644 --- a/src/platform/tests/Result/BaseResultTest.php +++ b/src/platform/tests/Result/BaseResultTest.php @@ -64,6 +64,11 @@ public function getData(): array return ['key' => 'value']; } + public function getDataStream(): iterable + { + return $this->getData(); + } + public function getObject(): object { return new \stdClass();