diff --git a/packages/pi-ai/src/providers/azure-openai-responses.ts b/packages/pi-ai/src/providers/azure-openai-responses.ts index 9a08b57f2..42646f533 100644 --- a/packages/pi-ai/src/providers/azure-openai-responses.ts +++ b/packages/pi-ai/src/providers/azure-openai-responses.ts @@ -5,8 +5,6 @@ import type { ResponseCreateParamsStreaming } from "openai/resources/responses/r import { getEnvApiKey } from "../env-api-keys.js"; import { supportsXhigh } from "../models.js"; import type { - Api, - AssistantMessage, Context, Model, SimpleStreamOptions, @@ -15,6 +13,13 @@ import type { } from "../types.js"; import { AssistantMessageEventStream } from "../utils/event-stream.js"; import { convertResponsesMessages, convertResponsesTools, processResponsesStream } from "./openai-responses-shared.js"; +import { + assertStreamSuccess, + buildInitialOutput, + clampReasoningForModel, + finalizeStream, + handleStreamError, +} from "./openai-shared.js"; import { buildBaseOptions, clampReasoning } from "./simple-options.js"; let _AzureOpenAIClass: typeof AzureOpenAI | undefined; @@ -26,16 +31,6 @@ async function getAzureOpenAIClass(): Promise { return _AzureOpenAIClass; } -/** - * Clamp reasoning effort for models that don't support all levels. - * gpt-5.x models don't support "minimal" — map to "low". - */ -function clampReasoningForModel(modelName: string, effort: string): string { - const name = modelName.includes("/") ? modelName.split("/").pop()! : modelName; - if (name.startsWith("gpt-5") && effort === "minimal") return "low"; - return effort; -} - const DEFAULT_AZURE_API_VERSION = "v1"; const AZURE_TOOL_CALL_PROVIDERS = new Set(["openai", "openai-codex", "opencode", "azure-openai-responses"]); @@ -83,24 +78,7 @@ export const streamAzureOpenAIResponses: StreamFunction<"azure-openai-responses" // Start async processing (async () => { const deploymentName = resolveDeploymentName(model, options); - - const output: AssistantMessage = { - role: "assistant", - content: [], - api: "azure-openai-responses" as Api, - provider: model.provider, - model: model.id, - usage: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }, - stopReason: "stop", - timestamp: Date.now(), - }; + const output = buildInitialOutput(model); try { // Create Azure OpenAI client @@ -119,22 +97,10 @@ export const streamAzureOpenAIResponses: StreamFunction<"azure-openai-responses" await processResponsesStream(openaiStream, output, stream, model); - if (options?.signal?.aborted) { - throw new Error("Request was aborted"); - } - - if (output.stopReason === "aborted" || output.stopReason === "error") { - throw new Error("An unknown error occurred"); - } - - stream.push({ type: "done", reason: output.stopReason, message: output }); - stream.end(); + assertStreamSuccess(output, options?.signal); + finalizeStream(stream, output); } catch (error) { - for (const block of output.content) delete (block as { index?: number }).index; - output.stopReason = options?.signal?.aborted ? "aborted" : "error"; - output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error); - stream.push({ type: "error", reason: output.stopReason, error: output }); - stream.end(); + handleStreamError(stream, output, error, options?.signal); } })(); diff --git a/packages/pi-ai/src/providers/openai-completions.ts b/packages/pi-ai/src/providers/openai-completions.ts index 7372d6880..4d6e1a3cf 100644 --- a/packages/pi-ai/src/providers/openai-completions.ts +++ b/packages/pi-ai/src/providers/openai-completions.ts @@ -31,19 +31,16 @@ import type { import { AssistantMessageEventStream } from "../utils/event-stream.js"; import { parseStreamingJson } from "../utils/json-parse.js"; import { sanitizeSurrogates } from "../utils/sanitize-unicode.js"; -import { buildCopilotDynamicHeaders, hasCopilotVisionInput } from "./github-copilot-headers.js"; import { buildBaseOptions, clampReasoning } from "./simple-options.js"; +import { + assertStreamSuccess, + buildInitialOutput, + createOpenAIClient, + finalizeStream, + handleStreamError, +} from "./openai-shared.js"; import { transformMessages } from "./transform-messages.js"; -let _OpenAICompletionsClass: typeof OpenAI | undefined; -async function getOpenAICompletionsClass(): Promise { - if (!_OpenAICompletionsClass) { - const mod = await import("openai"); - _OpenAICompletionsClass = mod.default; - } - return _OpenAICompletionsClass; -} - /** * Check if conversation messages contain tool calls or tool results. * This is needed because Anthropic (via proxy) requires the tools param @@ -76,27 +73,15 @@ export const streamOpenAICompletions: StreamFunction<"openai-completions", OpenA const stream = new AssistantMessageEventStream(); (async () => { - const output: AssistantMessage = { - role: "assistant", - content: [], - api: model.api, - provider: model.provider, - model: model.id, - usage: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }, - stopReason: "stop", - timestamp: Date.now(), - }; + const output = buildInitialOutput(model); try { const apiKey = options?.apiKey || getEnvApiKey(model.provider) || ""; - const client = await createClient(model, context, apiKey, options?.headers); + const isZai = model.provider === "zai" || model.baseUrl.includes("api.z.ai"); + const client = await createOpenAIClient(model, context, apiKey, { + optionsHeaders: options?.headers, + extraClientOptions: isZai ? { timeout: 100_000, maxRetries: 4 } : undefined, + }); let params = buildParams(model, context, options); const nextParams = await options?.onPayload?.(params, model); if (nextParams !== undefined) { @@ -292,25 +277,12 @@ export const streamOpenAICompletions: StreamFunction<"openai-completions", OpenA } finishCurrentBlock(currentBlock); - if (options?.signal?.aborted) { - throw new Error("Request was aborted"); - } - - if (output.stopReason === "aborted" || output.stopReason === "error") { - throw new Error("An unknown error occurred"); - } - - stream.push({ type: "done", reason: output.stopReason, message: output }); - stream.end(); + assertStreamSuccess(output, options?.signal); + finalizeStream(stream, output); } catch (error) { - for (const block of output.content) delete (block as any).index; - output.stopReason = options?.signal?.aborted ? "aborted" : "error"; - output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error); // Some providers via OpenRouter give additional information in this field. const rawMetadata = (error as any)?.error?.metadata?.raw; - if (rawMetadata) output.errorMessage += `\n${rawMetadata}`; - stream.push({ type: "error", reason: output.stopReason, error: output }); - stream.end(); + handleStreamError(stream, output, error, options?.signal, rawMetadata); } })(); @@ -338,48 +310,6 @@ export const streamSimpleOpenAICompletions: StreamFunction<"openai-completions", } satisfies OpenAICompletionsOptions); }; -async function createClient( - model: Model<"openai-completions">, - context: Context, - apiKey?: string, - optionsHeaders?: Record, -) { - if (!apiKey) { - if (!process.env.OPENAI_API_KEY) { - throw new Error( - "OpenAI API key is required. Set OPENAI_API_KEY environment variable or pass it as an argument.", - ); - } - apiKey = process.env.OPENAI_API_KEY; - } - - const headers = { ...model.headers }; - if (model.provider === "github-copilot") { - const hasImages = hasCopilotVisionInput(context.messages); - const copilotHeaders = buildCopilotDynamicHeaders({ - messages: context.messages, - hasImages, - }); - Object.assign(headers, copilotHeaders); - } - - // Merge options headers last so they can override defaults - if (optionsHeaders) { - Object.assign(headers, optionsHeaders); - } - - const isZai = model.provider === "zai" || model.baseUrl.includes("api.z.ai"); - const OpenAIClass = await getOpenAICompletionsClass(); - - return new OpenAIClass({ - apiKey, - baseURL: model.baseUrl, - dangerouslyAllowBrowser: true, - defaultHeaders: headers, - ...(isZai && { timeout: 100_000, maxRetries: 4 }), - }); -} - function buildParams(model: Model<"openai-completions">, context: Context, options?: OpenAICompletionsOptions) { const compat = getCompat(model); const messages = convertMessages(model, context, compat); diff --git a/packages/pi-ai/src/providers/openai-responses.ts b/packages/pi-ai/src/providers/openai-responses.ts index bf8fa347a..d7abd3c61 100644 --- a/packages/pi-ai/src/providers/openai-responses.ts +++ b/packages/pi-ai/src/providers/openai-responses.ts @@ -1,12 +1,9 @@ // Lazy-loaded: OpenAI SDK is imported on first use, not at startup. // This avoids penalizing users who don't use OpenAI models. -import type OpenAI from "openai"; import type { ResponseCreateParamsStreaming } from "openai/resources/responses/responses.js"; import { getEnvApiKey } from "../env-api-keys.js"; import { supportsXhigh } from "../models.js"; import type { - Api, - AssistantMessage, CacheRetention, Context, Model, @@ -16,29 +13,17 @@ import type { Usage, } from "../types.js"; import { AssistantMessageEventStream } from "../utils/event-stream.js"; -import { buildCopilotDynamicHeaders, hasCopilotVisionInput } from "./github-copilot-headers.js"; import { convertResponsesMessages, convertResponsesTools, processResponsesStream } from "./openai-responses-shared.js"; +import { + assertStreamSuccess, + buildInitialOutput, + clampReasoningForModel, + createOpenAIClient, + finalizeStream, + handleStreamError, +} from "./openai-shared.js"; import { buildBaseOptions, clampReasoning } from "./simple-options.js"; -let _OpenAIResponsesClass: typeof OpenAI | undefined; -async function getOpenAIResponsesClass(): Promise { - if (!_OpenAIResponsesClass) { - const mod = await import("openai"); - _OpenAIResponsesClass = mod.default; - } - return _OpenAIResponsesClass; -} - -/** - * Clamp reasoning effort for models that don't support all levels. - * gpt-5.x models don't support "minimal" — map to "low". - */ -function clampReasoningForModel(modelName: string, effort: string): string { - const name = modelName.includes("/") ? modelName.split("/").pop()! : modelName; - if (name.startsWith("gpt-5") && effort === "minimal") return "low"; - return effort; -} - const OPENAI_TOOL_CALL_PROVIDERS = new Set(["openai", "openai-codex", "opencode"]); /** @@ -88,28 +73,14 @@ export const streamOpenAIResponses: StreamFunction<"openai-responses", OpenAIRes // Start async processing (async () => { - const output: AssistantMessage = { - role: "assistant", - content: [], - api: model.api as Api, - provider: model.provider, - model: model.id, - usage: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }, - stopReason: "stop", - timestamp: Date.now(), - }; + const output = buildInitialOutput(model); try { // Create OpenAI client const apiKey = options?.apiKey || getEnvApiKey(model.provider) || ""; - const client = await createClient(model, context, apiKey, options?.headers); + const client = await createOpenAIClient(model, context, apiKey, { + optionsHeaders: options?.headers, + }); let params = buildParams(model, context, options); const nextParams = await options?.onPayload?.(params, model); if (nextParams !== undefined) { @@ -126,22 +97,10 @@ export const streamOpenAIResponses: StreamFunction<"openai-responses", OpenAIRes applyServiceTierPricing, }); - if (options?.signal?.aborted) { - throw new Error("Request was aborted"); - } - - if (output.stopReason === "aborted" || output.stopReason === "error") { - throw new Error("An unknown error occurred"); - } - - stream.push({ type: "done", reason: output.stopReason, message: output }); - stream.end(); + assertStreamSuccess(output, options?.signal); + finalizeStream(stream, output); } catch (error) { - for (const block of output.content) delete (block as { index?: number }).index; - output.stopReason = options?.signal?.aborted ? "aborted" : "error"; - output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error); - stream.push({ type: "error", reason: output.stopReason, error: output }); - stream.end(); + handleStreamError(stream, output, error, options?.signal); } })(); @@ -167,45 +126,6 @@ export const streamSimpleOpenAIResponses: StreamFunction<"openai-responses", Sim } satisfies OpenAIResponsesOptions); }; -async function createClient( - model: Model<"openai-responses">, - context: Context, - apiKey?: string, - optionsHeaders?: Record, -) { - if (!apiKey) { - if (!process.env.OPENAI_API_KEY) { - throw new Error( - "OpenAI API key is required. Set OPENAI_API_KEY environment variable or pass it as an argument.", - ); - } - apiKey = process.env.OPENAI_API_KEY; - } - - const headers = { ...model.headers }; - if (model.provider === "github-copilot") { - const hasImages = hasCopilotVisionInput(context.messages); - const copilotHeaders = buildCopilotDynamicHeaders({ - messages: context.messages, - hasImages, - }); - Object.assign(headers, copilotHeaders); - } - - // Merge options headers last so they can override defaults - if (optionsHeaders) { - Object.assign(headers, optionsHeaders); - } - - const OpenAIClass = await getOpenAIResponsesClass(); - return new OpenAIClass({ - apiKey, - baseURL: model.baseUrl, - dangerouslyAllowBrowser: true, - defaultHeaders: headers, - }); -} - function buildParams(model: Model<"openai-responses">, context: Context, options?: OpenAIResponsesOptions) { const messages = convertResponsesMessages(model, context, OPENAI_TOOL_CALL_PROVIDERS); diff --git a/packages/pi-ai/src/providers/openai-shared.ts b/packages/pi-ai/src/providers/openai-shared.ts new file mode 100644 index 000000000..d3bdfc5cd --- /dev/null +++ b/packages/pi-ai/src/providers/openai-shared.ts @@ -0,0 +1,193 @@ +/** + * Shared utilities for OpenAI Completions and Responses providers. + * + * This module consolidates code that is identical (or near-identical) across + * openai-completions.ts and openai-responses.ts to reduce duplication while + * preserving the subtle behavioural differences of each provider. + */ + +import type OpenAI from "openai"; +import type { + Api, + AssistantMessage, + Context, + Model, + StopReason, +} from "../types.js"; +import type { AssistantMessageEventStream } from "../utils/event-stream.js"; +import { buildCopilotDynamicHeaders, hasCopilotVisionInput } from "./github-copilot-headers.js"; + +// ============================================================================= +// Lazy SDK loading +// ============================================================================= + +let _openAIClass: typeof OpenAI | undefined; + +/** + * Lazy-load the OpenAI SDK default export. + * Shared between Completions and Responses providers so the module is only + * imported once regardless of which provider is used first. + */ +export async function getOpenAIClass(): Promise { + if (!_openAIClass) { + const mod = await import("openai"); + _openAIClass = mod.default; + } + return _openAIClass; +} + +// ============================================================================= +// Client creation +// ============================================================================= + +export interface CreateClientOptions { + /** Extra headers from the options bag (merged last, can override defaults). */ + optionsHeaders?: Record; + /** Provider-specific client constructor options (e.g. timeout, maxRetries for Z.ai). */ + extraClientOptions?: Record; +} + +/** + * Create an OpenAI SDK client instance. + * + * Handles: + * - API key resolution (explicit > env) + * - GitHub Copilot dynamic headers + * - Options header merging + * - Lazy SDK loading + */ +export async function createOpenAIClient( + model: Model, + context: Context, + apiKey: string | undefined, + options?: CreateClientOptions, +): Promise { + if (!apiKey) { + if (!process.env.OPENAI_API_KEY) { + throw new Error( + "OpenAI API key is required. Set OPENAI_API_KEY environment variable or pass it as an argument.", + ); + } + apiKey = process.env.OPENAI_API_KEY; + } + + const headers = { ...model.headers }; + if (model.provider === "github-copilot") { + const hasImages = hasCopilotVisionInput(context.messages); + const copilotHeaders = buildCopilotDynamicHeaders({ + messages: context.messages, + hasImages, + }); + Object.assign(headers, copilotHeaders); + } + + // Merge options headers last so they can override defaults + if (options?.optionsHeaders) { + Object.assign(headers, options.optionsHeaders); + } + + const OpenAIClass = await getOpenAIClass(); + return new OpenAIClass({ + apiKey, + baseURL: model.baseUrl, + dangerouslyAllowBrowser: true, + defaultHeaders: headers, + ...options?.extraClientOptions, + }); +} + +// ============================================================================= +// Initial output construction +// ============================================================================= + +/** + * Build the initial AssistantMessage output object used by all OpenAI stream + * handlers. Every field is initialised to its zero/default value. + */ +export function buildInitialOutput(model: Model): AssistantMessage { + return { + role: "assistant", + content: [], + api: model.api as Api, + provider: model.provider, + model: model.id, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop", + timestamp: Date.now(), + }; +} + +// ============================================================================= +// Stream lifecycle helpers +// ============================================================================= + +/** + * Shared post-stream checks. Call after the provider-specific stream loop + * finishes successfully (before pushing the "done" event). + * + * Throws if the request was aborted or the output indicates an error. + */ +export function assertStreamSuccess(output: AssistantMessage, signal?: AbortSignal): void { + if (signal?.aborted) { + throw new Error("Request was aborted"); + } + if (output.stopReason === "aborted" || output.stopReason === "error") { + throw new Error("An unknown error occurred"); + } +} + +/** + * Emit the "done" event and close the stream. + */ +export function finalizeStream( + stream: AssistantMessageEventStream, + output: AssistantMessage, +): void { + stream.push({ type: "done", reason: output.stopReason as Extract, message: output }); + stream.end(); +} + +/** + * Handle an error during streaming. + * + * Cleans up any leftover `index` properties on content blocks, sets the + * appropriate stop reason and error message, then emits the "error" event. + */ +export function handleStreamError( + stream: AssistantMessageEventStream, + output: AssistantMessage, + error: unknown, + signal?: AbortSignal, + /** Extra error metadata to append (e.g. OpenRouter raw metadata). */ + extraMessage?: string, +): void { + for (const block of output.content) delete (block as { index?: number }).index; + output.stopReason = signal?.aborted ? "aborted" : "error"; + output.errorMessage = error instanceof Error ? error.message : JSON.stringify(error); + if (extraMessage) output.errorMessage += `\n${extraMessage}`; + stream.push({ type: "error", reason: output.stopReason, error: output }); + stream.end(); +} + +// ============================================================================= +// Reasoning helpers +// ============================================================================= + +/** + * Clamp reasoning effort for models that don't support all levels. + * gpt-5.x models don't support "minimal" -- map to "low". + * + * Used by both openai-responses.ts and azure-openai-responses.ts. + */ +export function clampReasoningForModel(modelName: string, effort: string): string { + const name = modelName.includes("/") ? modelName.split("/").pop()! : modelName; + if (name.startsWith("gpt-5") && effort === "minimal") return "low"; + return effort; +}