Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
837 changes: 837 additions & 0 deletions Packages/OsaurusCore/Models/OpenResponsesAPI.swift

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,21 @@ public enum RemoteProviderAuthType: String, Codable, Sendable, CaseIterable {
public enum RemoteProviderType: String, Codable, Sendable, CaseIterable {
case openai = "openai" // OpenAI-compatible API (default)
case anthropic = "anthropic" // Anthropic Messages API
case openResponses = "openResponses" // Open Responses API

public var displayName: String {
switch self {
case .openai: return "OpenAI Compatible"
case .anthropic: return "Anthropic"
case .openResponses: return "Open Responses"
}
}

public var chatEndpoint: String {
switch self {
case .openai: return "/chat/completions"
case .anthropic: return "/messages"
case .openResponses: return "/responses"
}
}

Expand Down Expand Up @@ -228,7 +231,7 @@ public struct RemoteProvider: Codable, Identifiable, Sendable, Equatable {
if headers["anthropic-version"] == nil {
headers["anthropic-version"] = "2023-06-01"
}
case .openai:
case .openai, .openResponses:
headers["Authorization"] = "Bearer \(apiKey)"
}
}
Expand Down
297 changes: 297 additions & 0 deletions Packages/OsaurusCore/Models/ResponseWriters.swift
Original file line number Diff line number Diff line change
Expand Up @@ -554,3 +554,300 @@ final class AnthropicSSEResponseWriter {
}
}
}

// MARK: - Open Responses SSE Response Writer

/// SSE Response Writer for Open Responses API format
/// Emits semantic events: response.created, response.output_item.added, response.output_text.delta, etc.
final class OpenResponsesSSEWriter {
private var responseId: String = ""
private var model: String = ""
private var inputTokens: Int = 0
private var outputTokens: Int = 0
private var sequenceNumber: Int = 0
private var currentItemId: String = ""
private var currentOutputIndex: Int = 0
private var accumulatedText: String = ""

func writeHeaders(_ context: ChannelHandlerContext, extraHeaders: [(String, String)]? = nil) {
var head = HTTPResponseHead(version: .http1_1, status: .ok)
var headers = HTTPHeaders()
headers.add(name: "Content-Type", value: "text/event-stream")
headers.add(name: "Cache-Control", value: "no-cache, no-transform")
headers.add(name: "Connection", value: "keep-alive")
headers.add(name: "X-Accel-Buffering", value: "no")
headers.add(name: "Transfer-Encoding", value: "chunked")
if let extraHeaders {
for (n, v) in extraHeaders { headers.add(name: n, value: v) }
}
head.headers = headers
context.write(NIOAny(HTTPServerResponsePart.head(head)), promise: nil)
context.flush()
}

/// Generate the next sequence number
private func nextSequenceNumber() -> Int {
sequenceNumber += 1
return sequenceNumber
}

/// Write response.created event to start the response
func writeResponseCreated(
responseId: String,
model: String,
inputTokens: Int,
context: ChannelHandlerContext
) {
self.responseId = responseId
self.model = model
self.inputTokens = inputTokens
self.outputTokens = 0
self.sequenceNumber = 0
self.currentOutputIndex = 0
self.accumulatedText = ""

let response = OpenResponsesResponse(
id: responseId,
createdAt: Int(Date().timeIntervalSince1970),
status: .inProgress,
model: model,
output: [],
usage: nil
)
let event = ResponseCreatedEvent(sequenceNumber: nextSequenceNumber(), response: response)
writeSSEEvent("response.created", payload: event, context: context)
}

/// Write response.in_progress event
func writeResponseInProgress(context: ChannelHandlerContext) {
let response = OpenResponsesResponse(
id: responseId,
createdAt: Int(Date().timeIntervalSince1970),
status: .inProgress,
model: model,
output: [],
usage: nil
)
let event = ResponseInProgressEvent(sequenceNumber: nextSequenceNumber(), response: response)
writeSSEEvent("response.in_progress", payload: event, context: context)
}

/// Write response.output_item.added event for a new message item
func writeMessageItemAdded(itemId: String, context: ChannelHandlerContext) {
self.currentItemId = itemId

let messageItem = OpenResponsesOutputMessage(
id: itemId,
status: .inProgress,
content: []
)
let event = OutputItemAddedEvent(
sequenceNumber: nextSequenceNumber(),
outputIndex: currentOutputIndex,
item: .message(messageItem)
)
writeSSEEvent("response.output_item.added", payload: event, context: context)
}

/// Write response.content_part.added event
func writeContentPartAdded(context: ChannelHandlerContext) {
let part = OpenResponsesOutputContent.outputText(OpenResponsesOutputText(text: ""))
let event = ContentPartAddedEvent(
sequenceNumber: nextSequenceNumber(),
itemId: currentItemId,
outputIndex: currentOutputIndex,
contentIndex: 0,
part: part
)
writeSSEEvent("response.content_part.added", payload: event, context: context)
}

/// Write response.output_text.delta event
@inline(__always)
func writeTextDelta(_ text: String, context: ChannelHandlerContext) {
guard !text.isEmpty else { return }

accumulatedText += text
outputTokens += max(1, text.count / 4)

let event = OutputTextDeltaEvent(
sequenceNumber: nextSequenceNumber(),
itemId: currentItemId,
outputIndex: currentOutputIndex,
contentIndex: 0,
delta: text
)
writeSSEEvent("response.output_text.delta", payload: event, context: context)
}

/// Write response.output_text.done event
func writeTextDone(context: ChannelHandlerContext) {
let event = OutputTextDoneEvent(
sequenceNumber: nextSequenceNumber(),
itemId: currentItemId,
outputIndex: currentOutputIndex,
contentIndex: 0,
text: accumulatedText
)
writeSSEEvent("response.output_text.done", payload: event, context: context)
}

/// Write response.output_item.done event for a completed message
func writeMessageItemDone(context: ChannelHandlerContext) {
let messageItem = OpenResponsesOutputMessage(
id: currentItemId,
status: .completed,
content: [.outputText(OpenResponsesOutputText(text: accumulatedText))]
)
let event = OutputItemDoneEvent(
sequenceNumber: nextSequenceNumber(),
outputIndex: currentOutputIndex,
item: .message(messageItem)
)
writeSSEEvent("response.output_item.done", payload: event, context: context)
currentOutputIndex += 1
}

/// Write response.output_item.added event for a function call
func writeFunctionCallItemAdded(
itemId: String,
callId: String,
name: String,
context: ChannelHandlerContext
) {
self.currentItemId = itemId

let functionCall = OpenResponsesFunctionCall(
id: itemId,
status: .inProgress,
callId: callId,
name: name,
arguments: ""
)
let event = OutputItemAddedEvent(
sequenceNumber: nextSequenceNumber(),
outputIndex: currentOutputIndex,
item: .functionCall(functionCall)
)
writeSSEEvent("response.output_item.added", payload: event, context: context)
}

/// Write response.function_call_arguments.delta event
@inline(__always)
func writeFunctionCallArgumentsDelta(
callId: String,
delta: String,
context: ChannelHandlerContext
) {
guard !delta.isEmpty else { return }

accumulatedText += delta

let event = FunctionCallArgumentsDeltaEvent(
sequenceNumber: nextSequenceNumber(),
itemId: currentItemId,
outputIndex: currentOutputIndex,
callId: callId,
delta: delta
)
writeSSEEvent("response.function_call_arguments.delta", payload: event, context: context)
}

/// Write response.function_call_arguments.done event
func writeFunctionCallArgumentsDone(
callId: String,
context: ChannelHandlerContext
) {
let event = FunctionCallArgumentsDoneEvent(
sequenceNumber: nextSequenceNumber(),
itemId: currentItemId,
outputIndex: currentOutputIndex,
callId: callId,
arguments: accumulatedText
)
writeSSEEvent("response.function_call_arguments.done", payload: event, context: context)
}

/// Write response.output_item.done event for a function call
func writeFunctionCallItemDone(
callId: String,
name: String,
context: ChannelHandlerContext
) {
let functionCall = OpenResponsesFunctionCall(
id: currentItemId,
status: .completed,
callId: callId,
name: name,
arguments: accumulatedText
)
let event = OutputItemDoneEvent(
sequenceNumber: nextSequenceNumber(),
outputIndex: currentOutputIndex,
item: .functionCall(functionCall)
)
writeSSEEvent("response.output_item.done", payload: event, context: context)
currentOutputIndex += 1
}

/// Write response.completed event
func writeResponseCompleted(context: ChannelHandlerContext) {
let response = OpenResponsesResponse(
id: responseId,
createdAt: Int(Date().timeIntervalSince1970),
status: .completed,
model: model,
output: [],
usage: OpenResponsesUsage(inputTokens: inputTokens, outputTokens: outputTokens)
)
let event = ResponseCompletedEvent(sequenceNumber: nextSequenceNumber(), response: response)
writeSSEEvent("response.completed", payload: event, context: context)
}

/// Write error event
func writeError(_ message: String, context: ChannelHandlerContext) {
let response = OpenResponsesResponse(
id: responseId,
createdAt: Int(Date().timeIntervalSince1970),
status: .failed,
model: model,
output: [],
usage: nil
)
let error = OpenResponsesError(code: "internal_error", message: message)
let event = ResponseFailedEvent(sequenceNumber: nextSequenceNumber(), response: response, error: error)
writeSSEEvent("response.failed", payload: event, context: context)
}

/// End the stream with [DONE] marker and close connection
func writeEnd(_ context: ChannelHandlerContext) {
var tail = context.channel.allocator.buffer(capacity: 16)
tail.writeString("data: [DONE]\n\n")
context.write(NIOAny(HTTPServerResponsePart.body(.byteBuffer(tail))), promise: nil)
let ctx = NIOLoopBound(context, eventLoop: context.eventLoop)
context.writeAndFlush(NIOAny(HTTPServerResponsePart.end(nil as HTTPHeaders?))).whenComplete {
_ in
ctx.value.close(promise: nil)
}
}

// MARK: - Private Helpers

@inline(__always)
private func writeSSEEvent<T: Encodable>(_ eventType: String, payload: T, context: ChannelHandlerContext) {
let encoder = IkigaJSONEncoder()
var buffer = context.channel.allocator.buffer(capacity: 512)
buffer.writeString("event: ")
buffer.writeString(eventType)
buffer.writeString("\ndata: ")
do {
try encoder.encodeAndWrite(payload, into: &buffer)
buffer.writeString("\n\n")
context.write(NIOAny(HTTPServerResponsePart.body(.byteBuffer(buffer))), promise: nil)
context.flush()
} catch {
print("Error encoding Open Responses SSE event: \(error)")
context.close(promise: nil)
}
}
}
Loading
Loading