reintroduce updated streaming pattern

geoffsee
2025-05-23 16:35:54 -04:00
parent 0d20d1367a
commit 42f171ac05


@@ -1,9 +1,9 @@
-import {getSnapshot, types} from 'mobx-state-tree';
+import {flow, getSnapshot, types} from 'mobx-state-tree';
 import OpenAI from 'openai';
 import ChatSdk from '../sdk/chat-sdk';
 import Message from "../models/Message";
 import O1Message from "../models/O1Message";
-import {getModelFamily} from "../../../src/components/chat/SupportedModels";
+import {getModelFamily, ModelFamily} from "../../../src/components/chat/SupportedModels";
 import {OpenAiChatSdk} from "../sdk/models/openai";
 import {GroqChatSdk} from "../sdk/models/groq";
 import {ClaudeChatSdk} from "../sdk/models/claude";
@@ -11,6 +11,8 @@ import {FireworksAiChatSdk} from "../sdk/models/fireworks";
 import handleStreamData from "../sdk/handleStreamData";
 import {GoogleChatSdk} from "../sdk/models/google";
 import {XaiChatSdk} from "../sdk/models/xai";
+import {CerebrasSdk} from "../sdk/models/cerebras";
+import {CloudflareAISdk} from "../sdk/models/cloudflareAi";

 // Types
 export interface StreamParams {
@@ -33,10 +35,29 @@ interface StreamHandlerParams {
   dynamicContext?: any;
 }

+const activeStreamType = types.model({
+  name: types.optional(types.string, ""),
+  maxTokens: types.optional(types.number, 0),
+  systemPrompt: types.optional(types.string, ""),
+  model: types.optional(types.string, ""),
+  messages: types.optional(types.array(types.frozen()), []),
+  attachments: types.optional(types.array(types.frozen()), []),
+  tools: types.optional(types.array(types.frozen()), []),
+  disableWebhookGeneration: types.optional(types.boolean, false)
+});
+
+const activeStreamsMap = types.map(
+  activeStreamType,
+);
+
 const ChatService = types
   .model('ChatService', {
     openAIApiKey: types.optional(types.string, ""),
     openAIBaseURL: types.optional(types.string, ""),
+    activeStreams: types.optional(
+      activeStreamsMap,
+      {} // Correct initialization
+    ),
     maxTokens: types.number,
     systemPrompt: types.string
   })
@@ -62,7 +83,6 @@ const ChatService = types
             text: item.text
           })),
         });
-        console.log({here: "createMessageInstance"});
         return m;
       }
       throw new Error('Unsupported message format');
@@ -74,8 +94,8 @@ const ChatService = types
     ) => {
       console.log("handleWebhookProcessing::start");
       if (!webhook) return;
-      console.log("handleWebhookProcessing::[Loading Hot Data]");
-      dynamicContext.append("\n## Hot Data\n~~~markdown\n");
+      console.log("handleWebhookProcessing::[Loading Live Search]");
+      dynamicContext.append("\n## Live Search\n~~~markdown\n");

       for await (const chunk of self.streamWebhookData({webhook})) {
         controller.enqueue(encoder.encode(chunk));
@@ -83,7 +103,7 @@ const ChatService = types
       }

       dynamicContext.append("\n~~~\n");
-      console.log(`handleWebhookProcessing::[Finished loading Hot Data!][length: ${dynamicContext.content.length}]`);
+      console.log(`handleWebhookProcessing::[Finished loading Live Search!][length: ${dynamicContext.content.length}]`);
       ChatSdk.sendDoubleNewline(controller, encoder);
       console.log("handleWebhookProcessing::exit")
     };
@@ -105,7 +125,7 @@ const ChatService = types
         disableWebhookGeneration: true,
         maxTokens: await durableObject.dynamicMaxTokens(
           streamConfig.messages,
-          128000
+          2000
         ),
       }
     };
@@ -123,9 +143,31 @@ const ChatService = types
       GoogleChatSdk.handleGoogleStream(params, dataHandler),
     xai: (params: StreamParams, dataHandler: Function) =>
       XaiChatSdk.handleXaiStream(params, dataHandler),
+    cerebras: (params: StreamParams, dataHandler: Function) =>
+      CerebrasSdk.handleCerebrasStream(params, dataHandler),
+    cloudflareAI: (params: StreamParams, dataHandler: Function) =>
+      CloudflareAISdk.handleCloudflareAIStream(params, dataHandler)
   };

   return {
+    setActiveStream(streamId: string, stream: any) {
+      const validStream = {
+        name: stream?.name || "Unnamed Stream",
+        maxTokens: stream?.maxTokens || 0,
+        systemPrompt: stream?.systemPrompt || "",
+        model: stream?.model || "",
+        messages: stream?.messages || [],
+        attachments: stream?.attachments || [],
+        tools: stream?.tools || [],
+        disableWebhookGeneration: stream?.disableWebhookGeneration || false,
+      };
+
+      self.activeStreams.set(streamId, validStream);
+    },
+
+    removeActiveStream(streamId: string) {
+      self.activeStreams.delete(streamId);
+    },
+
     setEnv(env: Env) {
       self.env = env;
       self.openai = new OpenAI({
@@ -201,92 +243,238 @@ const ChatService = types
         throw error;
       }
     },
+
+    /**
+     * runModelHandler
+     * Selects the correct model handler and invokes it.
+     */
+    async runModelHandler(params: {
+      streamConfig: any;
+      streamParams: any;
+      controller: ReadableStreamDefaultController;
+      encoder: TextEncoder;
+      streamId: string;
+    }) {
+      const {streamConfig, streamParams, controller, encoder, streamId} = params;
+
+      const modelFamily = getModelFamily(streamConfig.model);
+      console.log(
+        `chatService::handleSseStream::ReadableStream::modelFamily::${modelFamily}`
+      );
+
+      const handler = modelHandlers[modelFamily as ModelFamily];
+      if (handler) {
+        try {
+          console.log(`chatService::handleSseStream::ReadableStream::${streamId}::handler::start`);
+          await handler(streamParams, handleStreamData(controller, encoder));
+          console.log(`chatService::handleSseStream::ReadableStream::${streamId}::handler::finish`);
+        } catch (error) {
+          const message = error.message.toLowerCase();
+          if (message.includes("413 ") || (message.includes("maximum") || message.includes("too long") || message.includes("too large"))) {
+            throw new ClientError(`Error! Content length exceeds limits. Try shortening your message, removing any attached files, or editing an earlier message instead.`, 413, {model: streamConfig.model, maxTokens: streamParams.maxTokens})
+            // throw new Error(`Max tokens exceeded for model ${streamConfig.model}`)
+          }
+          if (message.includes("429 ")) {
+            throw new ClientError(`Error! Rate limit exceeded. Wait a few minutes before trying again.`, 429, {model: streamConfig.model, maxTokens: streamParams.maxTokens})
+            // throw new Error(`Max tokens exceeded for model ${streamConfig.model}`)
+          }
+          if (message.includes("404")) {
+            throw new ClientError(`Something went wrong, try again.`, 413, {})
+            // throw new Error(`Max tokens exceeded for model ${streamConfig.model}`)
+          }
+          throw error;
+          /*
+          '413 Request too large for model `mixtral-8x7b-32768` in organization `org_01htjxws48fm0rbbg5gnkgmbrh` service tier `on_demand` on tokens per minute (TPM): Limit 5000, Requested 49590, please reduce your message size and try again. Visit https://console.groq.com/docs/rate-limits for more information.'
+          */
+        }
+      }
+    },
+
+    /**
+     * handleWebhookIfNeeded
+     * Checks if a webhook exists, and if so, processes it.
+     */
+    async handleWebhookIfNeeded(params: {
+      savedStreamConfig: string;
+      controller: ReadableStreamDefaultController;
+      encoder: TextEncoder;
+      dynamicContext: any; // or more specific type
+    }) {
+      const {savedStreamConfig, controller, encoder, dynamicContext} = params;
+      const config = JSON.parse(savedStreamConfig);
+      const webhook = config?.webhooks?.[0];
+      if (webhook) {
+        console.log(`chatService::handleSseStream::ReadableStream::webhook:start`);
+        await handleWebhookProcessing({
+          controller,
+          encoder,
+          webhook,
+          dynamicContext,
+        });
+        console.log(`chatService::handleSseStream::ReadableStream::webhook::end`);
+      }
+    },
+
-    async handleSseStream(streamId: string) {
-      console.log(`chatService::handleSseStream::enter::${streamId}`);
-
-      const objectId = self.env.SITE_COORDINATOR.idFromName('stream-index');
-      const durableObject = self.env.SITE_COORDINATOR.get(objectId);
-      const savedStreamConfig = await durableObject.getStreamData(streamId);
-      // console.log({savedStreamConfig});
-
-      if (!savedStreamConfig) {
-        return new Response('Stream not found', {status: 404});
-      }
-
-      const streamConfig = JSON.parse(savedStreamConfig);
-      console.log(`chatService::handleSseStream::${streamId}::[stream configured]`);
-      console.log(`chatService::handleSseStream::${streamId}::[initializing stream]`);
-
-      const stream = new ReadableStream({
+    createSseReadableStream(params: {
+      streamId: string;
+      streamConfig: any;
+      savedStreamConfig: string;
+      durableObject: any;
+    }) {
+      const { streamId, streamConfig, savedStreamConfig, durableObject } = params;
+
+      return new ReadableStream({
         async start(controller) {
           console.log(`chatService::handleSseStream::ReadableStream::${streamId}::open`);
           const encoder = new TextEncoder();
+          // controller.enqueue(encoder.encode('retry: 3\n\n'));
+          console.log(streamConfig.preprocessedContext);
+          const dynamicContext = Message.create(streamConfig.preprocessedContext);
+          console.log(`chatService::handleSseStream::ReadableStream::dynamicContext`);
           try {
-            // Send initial retry directive
-            // controller.enqueue(encoder.encode('retry: 0\n\n'));
-            const dynamicContext = Message.create(streamConfig.preprocessedContext);
-            const config = JSON.parse(savedStreamConfig);
-            const webhook = config?.webhooks?.at(0);
-
-            if (webhook) {
-              console.log(`chatService::handleSseStream::ReadableStream::${streamId}::webhook:start`);
-              await handleWebhookProcessing({
-                controller,
-                encoder,
-                webhook,
-                dynamicContext
-              });
-              console.log(`chatService::handleSseStream::ReadableStream::${streamId}::webhook::end`);
-            }
+            // Process webhook if configured
+            await self.handleWebhookIfNeeded({
+              savedStreamConfig,
+              controller,
+              encoder,
+              dynamicContext: dynamicContext,
+            });
+
+            // Process the stream data using the appropriate handler
             const streamParams = await createStreamParams(
               streamConfig,
               dynamicContext,
               durableObject
             );

-            const modelFamily = getModelFamily(streamConfig.model);
-            console.log(`chatService::handleSseStream::ReadableStream::modelFamily::${modelFamily}`);
-            const handler = modelHandlers[modelFamily];
-
-            if (handler) {
-              console.log(`chatService::handleSseStream::ReadableStream::${streamId}::handler::start`);
-              await handler(
-                streamParams,
-                handleStreamData(controller, encoder)
-              );
-              console.log(`chatService::handleSseStream::ReadableStream::${streamId}::handler::finish`);
-            }
+            try {
+              await self.runModelHandler({
+                streamConfig,
+                streamParams,
+                controller,
+                encoder,
+                streamId,
+              });
+            } catch (e) {
+              console.log("error caught at runModelHandler")
+              throw e;
+            }
           } catch (error) {
-            controller.enqueue(
-              encoder.encode(`data: ${JSON.stringify({error: error.message})}\n\n`)
-            );
-            console.log(`chatService::handleSseStream::ReadableStream::${streamId}::Error::${error}`);
-          } finally {
-            console.log(`chatService::handleSseStream::ReadableStream::${streamId}::closed::${streamId}`);
+            console.error(`chatService::handleSseStream::${streamId}::Error`, error);
+
+            if (error instanceof ClientError) {
+              controller.enqueue(
+                encoder.encode(`data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n`)
+              );
+            } else {
+              controller.enqueue(
+                encoder.encode(`data: ${JSON.stringify({ type: 'error', error: "Server error" })}\n\n`)
+              );
+            }
             controller.close();
+          } finally {
+            try {
+              controller.close();
+            } catch (_) {}
           }
         },
       });
+    },

-      return new Response(
-        stream,
-        {
-          headers: {
-            'Content-Type': 'text/event-stream',
-            'Cache-Control': 'no-cache',
-            'Connection': 'keep-alive',
-          },
-        }
-      );
-    }
+    handleSseStream: flow(function* (streamId: string): Generator<Promise<string>, Response, unknown> {
+      console.log(`chatService::handleSseStream::enter::${streamId}`);
+
+      // Check if a stream is already active for this ID
+      if (self.activeStreams.has(streamId)) {
+        console.log(`chatService::handleSseStream::${streamId}::[stream already active]`);
+        return new Response('Stream already active', { status: 409 });
+      }
+
+      // Retrieve the stream configuration from the durable object
+      const objectId = self.env.SITE_COORDINATOR.idFromName('stream-index');
+      const durableObject = self.env.SITE_COORDINATOR.get(objectId);
+      const savedStreamConfig = yield durableObject.getStreamData(streamId);
+
+      if (!savedStreamConfig) {
+        return new Response('Stream not found', { status: 404 });
+      }
+
+      const streamConfig = JSON.parse(savedStreamConfig);
+      console.log(`chatService::handleSseStream::${streamId}::[stream configured]`);
+
+      // Create the SSE readable stream
+      const stream = self.createSseReadableStream({
+        streamId,
+        streamConfig,
+        savedStreamConfig,
+        durableObject,
+      });
+
+      // Use `tee()` to create two streams: one for processing and one for the response
+      const [processingStream, responseStream] = stream.tee();
+
+      // Add the new stream to activeStreams
+      self.setActiveStream(streamId, {
+        ...streamConfig, // Ensure streamConfig matches the expected structure
+      });
+
+      // Process the stream for internal logic
+      processingStream.pipeTo(
+        new WritableStream({
+          close() {
+            console.log(`chatService::handleSseStream::${streamId}::[stream closed]`);
+            self.removeActiveStream(streamId); // Use action to update state
+          },
+        })
+      );
+
+      // Return the second stream as the response
+      return new Response(responseStream, {
+        headers: {
+          'Content-Type': 'text/event-stream',
+          'Cache-Control': 'no-cache',
+          'Connection': 'keep-alive',
+        },
+      });
+    }),
   };
 });

-export default ChatService;
+/**
+ * ClientError
+ * A custom construct for sending client-friendly errors via the controller in a structured and controlled manner.
+ */
+export class ClientError extends Error {
+  public statusCode: number;
+  public details: Record<string, any>;
+
+  constructor(message: string, statusCode: number, details: Record<string, any> = {}) {
+    super(message);
+    this.name = 'ClientError';
+    this.statusCode = statusCode;
+    this.details = details;
+    Object.setPrototypeOf(this, ClientError.prototype);
+  }
+
+  /**
+   * Formats the error for SSE-compatible data transmission.
+   */
+  public formatForSSE(): string {
+    return JSON.stringify({
+      type: 'error',
+      message: this.message,
+      details: this.details,
+      statusCode: this.statusCode,
+    });
+  }
+}
+
+export default ChatService;
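
Below is a minimal consumer-side sketch, not part of this commit, of how a client might read the SSE response returned by handleSseStream and react to the structured `{ type: 'error', error }` events that createSseReadableStream now enqueues. The `/api/streams/:streamId` route and the `consumeChatStream` helper are assumptions for illustration; non-error payload shapes come from handleStreamData, which this diff does not show, so they are passed through as opaque strings.

// Hypothetical client-side consumer (illustration only; not part of this commit).
async function consumeChatStream(streamId: string, onData: (raw: string) => void): Promise<void> {
  const res = await fetch(`/api/streams/${streamId}`); // assumed route serving the SSE Response
  if (res.status === 404) throw new Error('Stream not found');
  if (res.status === 409) throw new Error('Stream already active');
  if (!res.body) throw new Error('No response body');

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE events are separated by a blank line; data lines are prefixed with "data: ".
    const events = buffer.split('\n\n');
    buffer = events.pop() ?? '';

    for (const event of events) {
      for (const line of event.split('\n')) {
        if (!line.startsWith('data: ')) continue;
        const data = line.slice('data: '.length);

        let payload: any = null;
        try {
          payload = JSON.parse(data);
        } catch {
          // Not JSON; treat as plain stream data.
        }

        if (payload && payload.type === 'error') {
          // Matches the `{ type: 'error', error }` shape enqueued in createSseReadableStream.
          throw new Error(payload.error);
        }
        onData(data);
      }
    }
  }
}

A call such as consumeChatStream('some-stream-id', chunk => render(chunk)) would surface the 404/409 responses and the structured error events distinctly from normal token data; because the worker tees the stream, its own processingStream branch keeps running regardless of how the client consumes responseStream.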