From 42f171ac05de05e5e65c899653fa879876240909 Mon Sep 17 00:00:00 2001
From: geoffsee <>
Date: Fri, 23 May 2025 16:35:54 -0400
Subject: [PATCH] reintroduce updated streaming pattern

---
 workers/site/services/ChatService.ts | 316 +++++++++++++++++++++------
 1 file changed, 252 insertions(+), 64 deletions(-)

diff --git a/workers/site/services/ChatService.ts b/workers/site/services/ChatService.ts
index 6fe49d8..18bcf3c 100644
--- a/workers/site/services/ChatService.ts
+++ b/workers/site/services/ChatService.ts
@@ -1,9 +1,9 @@
-import {getSnapshot, types} from 'mobx-state-tree';
+import {flow, getSnapshot, types} from 'mobx-state-tree';
 import OpenAI from 'openai';
 import ChatSdk from '../sdk/chat-sdk';
 import Message from "../models/Message";
 import O1Message from "../models/O1Message";
-import {getModelFamily} from "../../../src/components/chat/SupportedModels";
+import {getModelFamily, ModelFamily} from "../../../src/components/chat/SupportedModels";
 import {OpenAiChatSdk} from "../sdk/models/openai";
 import {GroqChatSdk} from "../sdk/models/groq";
 import {ClaudeChatSdk} from "../sdk/models/claude";
@@ -11,6 +11,8 @@ import {FireworksAiChatSdk} from "../sdk/models/fireworks";
 import handleStreamData from "../sdk/handleStreamData";
 import {GoogleChatSdk} from "../sdk/models/google";
 import {XaiChatSdk} from "../sdk/models/xai";
+import {CerebrasSdk} from "../sdk/models/cerebras";
+import {CloudflareAISdk} from "../sdk/models/cloudflareAi";
 
 // Types
 export interface StreamParams {
@@ -33,10 +35,29 @@ interface StreamHandlerParams {
   dynamicContext?: any;
 }
 
+const activeStreamType = types.model({
+  name: types.optional(types.string, ""),
+  maxTokens: types.optional(types.number, 0),
+  systemPrompt: types.optional(types.string, ""),
+  model: types.optional(types.string, ""),
+  messages: types.optional(types.array(types.frozen()), []),
+  attachments: types.optional(types.array(types.frozen()), []),
+  tools: types.optional(types.array(types.frozen()), []),
+  disableWebhookGeneration: types.optional(types.boolean, false)
+});
+
+const activeStreamsMap = types.map(
+  activeStreamType,
+);
+
 const ChatService = types
   .model('ChatService', {
     openAIApiKey: types.optional(types.string, ""),
     openAIBaseURL: types.optional(types.string, ""),
+    activeStreams: types.optional(
+      activeStreamsMap,
+      {}
+    ),
     maxTokens: types.number,
     systemPrompt: types.string
   })
@@ -62,7 +83,6 @@ const ChatService = types
             text: item.text
           })),
         });
-        console.log({here: "createMessageInstance"});
         return m;
       }
       throw new Error('Unsupported message format');
@@ -74,8 +94,8 @@ const ChatService = types
     ) => {
       console.log("handleWebhookProcessing::start");
       if (!webhook) return;
-      console.log("handleWebhookProcessing::[Loading Hot Data]");
-      dynamicContext.append("\n## Hot Data\n~~~markdown\n");
+      console.log("handleWebhookProcessing::[Loading Live Search]");
+      dynamicContext.append("\n## Live Search\n~~~markdown\n");
 
       for await (const chunk of self.streamWebhookData({webhook})) {
         controller.enqueue(encoder.encode(chunk));
@@ -83,7 +103,7 @@ const ChatService = types
       }
 
       dynamicContext.append("\n~~~\n");
-      console.log(`handleWebhookProcessing::[Finished loading Hot Data!][length: ${dynamicContext.content.length}]`);
+      console.log(`handleWebhookProcessing::[Finished loading Live Search!][length: ${dynamicContext.content.length}]`);
       ChatSdk.sendDoubleNewline(controller, encoder);
       console.log("handleWebhookProcessing::exit")
     };
@@ -105,7 +125,7 @@ const ChatService = types
           disableWebhookGeneration: true,
           maxTokens: await durableObject.dynamicMaxTokens(
             streamConfig.messages,
-            128000
+            2000
           ),
         }
       };
@@ -123,9 +143,31 @@ const ChatService = types
         GoogleChatSdk.handleGoogleStream(params, dataHandler),
       xai: (params: StreamParams, dataHandler: Function) =>
         XaiChatSdk.handleXaiStream(params, dataHandler),
+      cerebras: (params: StreamParams, dataHandler: Function) =>
+        CerebrasSdk.handleCerebrasStream(params, dataHandler),
+      cloudflareAI: (params: StreamParams, dataHandler: Function) =>
+        CloudflareAISdk.handleCloudflareAIStream(params, dataHandler)
     };
 
     return {
+      setActiveStream(streamId: string, stream: any) {
+        const validStream = {
+          name: stream?.name || "Unnamed Stream",
+          maxTokens: stream?.maxTokens || 0,
+          systemPrompt: stream?.systemPrompt || "",
+          model: stream?.model || "",
+          messages: stream?.messages || [],
+          attachments: stream?.attachments || [],
+          tools: stream?.tools || [],
+          disableWebhookGeneration: stream?.disableWebhookGeneration || false,
+        };
+
+        self.activeStreams.set(streamId, validStream);
+      },
+
+      removeActiveStream(streamId: string) {
+        self.activeStreams.delete(streamId);
+      },
       setEnv(env: Env) {
         self.env = env;
         self.openai = new OpenAI({
@@ -201,92 +243,238 @@ const ChatService = types
           throw error;
         }
       },
+      /**
+       * runModelHandler
+       * Selects the correct model handler and invokes it, translating known
+       * provider failures into ClientError instances.
+       */
+      async runModelHandler(params: {
+        streamConfig: any;
+        streamParams: any;
+        controller: ReadableStreamDefaultController;
+        encoder: TextEncoder;
+        streamId: string;
+      }) {
+        const {streamConfig, streamParams, controller, encoder, streamId} = params;
 
-      async handleSseStream(streamId: string) {
-        console.log(`chatService::handleSseStream::enter::${streamId}`);
+        const modelFamily = getModelFamily(streamConfig.model);
+        console.log(
+          `chatService::handleSseStream::ReadableStream::modelFamily::${modelFamily}`
+        );
 
-        const objectId = self.env.SITE_COORDINATOR.idFromName('stream-index');
-        const durableObject = self.env.SITE_COORDINATOR.get(objectId);
-        const savedStreamConfig = await durableObject.getStreamData(streamId);
-        // console.log({savedStreamConfig});
+        const handler = modelHandlers[modelFamily as ModelFamily];
+        if (handler) {
+          try {
+            console.log(`chatService::handleSseStream::ReadableStream::${streamId}::handler::start`);
+            await handler(streamParams, handleStreamData(controller, encoder));
+            console.log(`chatService::handleSseStream::ReadableStream::${streamId}::handler::finish`);
+          } catch (error) {
+            const message = (error as Error).message.toLowerCase();
 
-        if (!savedStreamConfig) {
-          return new Response('Stream not found', {status: 404});
+            if (message.includes("413 ") || message.includes("maximum") || message.includes("too long") || message.includes("too large")) {
+              throw new ClientError(`Error! Content length exceeds limits. Try shortening your message, removing any attached files, or editing an earlier message instead.`, 413, {model: streamConfig.model, maxTokens: streamParams.maxTokens});
+            }
+            if (message.includes("429 ")) {
+              throw new ClientError(`Error! Rate limit exceeded. Wait a few minutes before trying again.`, 429, {model: streamConfig.model, maxTokens: streamParams.maxTokens});
+            }
+            if (message.includes("404")) {
+              throw new ClientError(`Something went wrong, try again.`, 404, {});
+            }
+            throw error;
+            /*
+             * Example upstream error the checks above are matching:
+             * '413 Request too large for model `mixtral-8x7b-32768` in organization `org_01htjxws48fm0rbbg5gnkgmbrh` service tier `on_demand` on tokens per minute (TPM): Limit 5000, Requested 49590, please reduce your message size and try again. Visit https://console.groq.com/docs/rate-limits for more information.'
+             */
+          }
+        }
+      },
+      /**
+       * handleWebhookIfNeeded
+       * Checks if a webhook exists, and if so, processes it.
+       */
+      async handleWebhookIfNeeded(params: {
+        savedStreamConfig: string;
+        controller: ReadableStreamDefaultController;
+        encoder: TextEncoder;
+        dynamicContext: any;
+      }) {
+        const {savedStreamConfig, controller, encoder, dynamicContext} = params;
 
-        const streamConfig = JSON.parse(savedStreamConfig);
-        console.log(`chatService::handleSseStream::${streamId}::[stream configured]`);
+        const config = JSON.parse(savedStreamConfig);
+        const webhook = config?.webhooks?.[0];
 
-        console.log(`chatService::handleSseStream::${streamId}::[initializing stream]`);
+        if (webhook) {
+          console.log(`chatService::handleSseStream::ReadableStream::webhook::start`);
+          await handleWebhookProcessing({
+            controller,
+            encoder,
+            webhook,
+            dynamicContext,
+          });
+          console.log(`chatService::handleSseStream::ReadableStream::webhook::end`);
+        }
+      },
 
-        const stream = new ReadableStream({
+
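+      /**
+       * createSseReadableStream
+       * Builds the SSE ReadableStream: renders webhook context, runs the
+       * model handler, and serializes any failure as a `data:` error frame
+       * before closing the controller.
+       */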
+      createSseReadableStream(params: {
+        streamId: string;
+        streamConfig: any;
+        savedStreamConfig: string;
+        durableObject: any;
+      }) {
+        const { streamId, streamConfig, savedStreamConfig, durableObject } = params;
+
+        return new ReadableStream({
           async start(controller) {
             console.log(`chatService::handleSseStream::ReadableStream::${streamId}::open`);
             const encoder = new TextEncoder();
-            // controller.enqueue(encoder.encode('retry: 3\n\n'));
-            console.log(streamConfig.preprocessedContext);
-
-            const dynamicContext = Message.create(streamConfig.preprocessedContext);
-            console.log(`chatService::handleSseStream::ReadableStream::dynamicContext`);
 
             try {
+              // Optionally send an initial retry directive:
+              // controller.enqueue(encoder.encode('retry: 0\n\n'));
+              const dynamicContext = Message.create(streamConfig.preprocessedContext);
 
-              const config = JSON.parse(savedStreamConfig);
-              const webhook = config?.webhooks?.at(0);
-
-              if (webhook) {
-                console.log(`chatService::handleSseStream::ReadableStream::${streamId}::webhook:start`);
-                await handleWebhookProcessing({
-                  controller,
-                  encoder,
-                  webhook,
-                  dynamicContext
-                });
-                console.log(`chatService::handleSseStream::ReadableStream::${streamId}::webhook::end`);
-              }
+              // Process webhook if configured
+              await self.handleWebhookIfNeeded({
+                savedStreamConfig,
+                controller,
+                encoder,
+                dynamicContext,
+              });
 
+              // Process the stream data using the appropriate handler
               const streamParams = await createStreamParams(
                 streamConfig,
                 dynamicContext,
                 durableObject
               );
-              const modelFamily = getModelFamily(streamConfig.model);
-              console.log(`chatService::handleSseStream::ReadableStream::modelFamily::${modelFamily}`);
-              const handler = modelHandlers[modelFamily];
-
-              if (handler) {
-                console.log(`chatService::handleSseStream::ReadableStream::${streamId}::handler::start`);
-                await handler(
-                  streamParams,
-                  handleStreamData(controller, encoder)
-                );
-                console.log(`chatService::handleSseStream::ReadableStream::${streamId}::handler::finish`);
+              try {
+                await self.runModelHandler({
+                  streamConfig,
+                  streamParams,
+                  controller,
+                  encoder,
+                  streamId,
+                });
+              } catch (e) {
+                console.log("error caught at runModelHandler");
+                throw e;
               }
             } catch (error) {
-              controller.enqueue(
-                encoder.encode(`data: ${JSON.stringify({error: error.message})}\n\n`)
-              );
-              console.log(`chatService::handleSseStream::ReadableStream::${streamId}::Error::${error}`);
-            } finally {
-              console.log(`chatService::handleSseStream::ReadableStream::${streamId}::closed::${streamId}`);
-              controller.close();
+              console.error(`chatService::handleSseStream::${streamId}::Error`, error);
+
+              if (error instanceof ClientError) {
+                controller.enqueue(
+                  encoder.encode(`data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n`)
+                );
+              } else {
+                controller.enqueue(
+                  encoder.encode(`data: ${JSON.stringify({ type: 'error', error: "Server error" })}\n\n`)
+                );
+              }
+            } finally {
+              // Close exactly once; closing an already-closed controller throws.
+              try {
+                controller.close();
+              } catch (_) {}
             }
           },
         });
-        return new Response(
-          stream,
-          {
-            headers: {
-              'Content-Type': 'text/event-stream',
-              'Cache-Control': 'no-cache',
-              'Connection': 'keep-alive',
+      },
+
+
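+      /**
+       * handleSseStream
+       * SSE entry point: rejects duplicate stream ids (409), loads the saved
+       * stream config from the durable object (404 if missing), then tees the
+       * stream so one branch tracks lifecycle state while the other is
+       * returned to the client.
+       */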
+ */ +export class ClientError extends Error { + public statusCode: number; + public details: Record; + + constructor(message: string, statusCode: number, details: Record = {}) { + super(message); + this.name = 'ClientError'; + this.statusCode = statusCode; + this.details = details; + Object.setPrototypeOf(this, ClientError.prototype); + } + + /** + * Formats the error for SSE-compatible data transmission. + */ + public formatForSSE(): string { + return JSON.stringify({ + type: 'error', + message: this.message, + details: this.details, + statusCode: this.statusCode, + }); + } +} + +export default ChatService;