diff --git a/src/components/chat/ChatInputTextArea.tsx b/src/components/chat/ChatInputTextArea.tsx index 182bc5c..0ebd7f9 100644 --- a/src/components/chat/ChatInputTextArea.tsx +++ b/src/components/chat/ChatInputTextArea.tsx @@ -1,15 +1,6 @@ -import React, { useEffect, useRef, useState } from "react"; -import { observer } from "mobx-react-lite"; -import { - Alert, - AlertIcon, - Box, - chakra, - HStack, - InputGroup, -} from "@chakra-ui/react"; -import fileUploadStore from "../../stores/FileUploadStore"; -import { UploadedItem } from "./Attachments"; +import React, {useEffect, useRef, useState} from "react"; +import {observer} from "mobx-react-lite"; +import {Box, chakra, InputGroup,} from "@chakra-ui/react"; import AutoResize from "react-textarea-autosize"; const AutoResizeTextArea = chakra(AutoResize); @@ -24,24 +15,6 @@ interface InputTextAreaProps { const InputTextArea: React.FC = observer( ({ inputRef, value, onChange, onKeyDown, isLoading }) => { - const fileInputRef = useRef(null); - - const handleAttachmentClick = () => { - if (fileInputRef.current) { - fileInputRef.current.click(); - } - }; - - const handleFileChange = (event: React.ChangeEvent) => { - const file = event.target.files?.[0]; - if (file) { - fileUploadStore.uploadFile(file, "/api/documents"); - } - }; - - const handleRemoveUploadedItem = (url: string) => { - fileUploadStore.removeUploadedFile(url); - }; const [heightConstraint, setHeightConstraint] = useState< number | undefined @@ -61,26 +34,6 @@ const InputTextArea: React.FC = observer( display="flex" flexDirection="column" > - {/* Attachments Section */} - {fileUploadStore.uploadResults.length > 0 && ( - - {fileUploadStore.uploadResults.map((result) => ( - handleRemoveUploadedItem(result.url)} - /> - ))} - - )} {/* Input Area */} @@ -97,7 +50,7 @@ const InputTextArea: React.FC = observer( pl="17px" bg="rgba(255, 255, 255, 0.15)" color="text.primary" - borderRadius="20px" // Set a consistent border radius + borderRadius="20px" border="none" placeholder="Free my mind..." _placeholder={{ color: "gray.400" }} @@ -115,32 +68,7 @@ const InputTextArea: React.FC = observer( transition: "height 0.2s ease-in-out", }} /> - {/**/} - {/**/} - {/**/} - - {fileUploadStore.uploadError && ( - - - {fileUploadStore.uploadError} - - )} ); }, diff --git a/src/pages/+client.ts b/src/pages/+client.ts index 0d14187..4dc621d 100644 --- a/src/pages/+client.ts +++ b/src/pages/+client.ts @@ -1,3 +1,4 @@ +// runs before anything else import UserOptionsStore from "../stores/UserOptionsStore"; UserOptionsStore.initialize(); diff --git a/src/pages/+data.ts b/src/pages/+data.ts index dd7b677..886ae4d 100644 --- a/src/pages/+data.ts +++ b/src/pages/+data.ts @@ -1,10 +1,10 @@ -// https://vike.dev/data import Routes from "../../src/renderer/routes"; export { data }; export type Data = Awaited>; import type { PageContextServer } from "vike/types"; +// sets the window title depending on the route const data = async (pageContext: PageContextServer) => { const getTitle = (path) => { return Routes[normalizePath(path)]?.heroLabel || ""; diff --git a/src/pages/_error/+Page.tsx b/src/pages/_error/+Page.tsx index 7760261..39a8153 100644 --- a/src/pages/_error/+Page.tsx +++ b/src/pages/_error/+Page.tsx @@ -1,3 +1,4 @@ +// client error catcher import { usePageContext } from "../../renderer/usePageContext"; import { Center, Text } from "@chakra-ui/react"; diff --git a/src/pages/index/+Page.tsx b/src/pages/index/+Page.tsx index 4789f34..05e0e48 100644 --- a/src/pages/index/+Page.tsx +++ b/src/pages/index/+Page.tsx @@ -4,7 +4,7 @@ import Chat from "../../components/chat/Chat"; import clientChatStore from "../../stores/ClientChatStore"; import { getModelFamily } from "../../components/chat/SupportedModels"; -// renders for path: "/" +// renders "/" export default function IndexPage() { useEffect(() => { try { diff --git a/src/renderer/+onRenderClient.tsx b/src/renderer/+onRenderClient.tsx index 48a3487..5254b3e 100644 --- a/src/renderer/+onRenderClient.tsx +++ b/src/renderer/+onRenderClient.tsx @@ -1,10 +1,10 @@ -// https://vike.dev/onRenderClient export { onRenderClient }; import React from "react"; import { hydrateRoot } from "react-dom/client"; import { Layout } from "../layout/Layout"; +// See https://vike.dev/onRenderClient for usage details async function onRenderClient(pageContext) { const { Page, pageProps } = pageContext; hydrateRoot( @@ -13,4 +13,4 @@ async function onRenderClient(pageContext) { , ); -} +} \ No newline at end of file diff --git a/src/renderer/+onRenderHtml.tsx b/src/renderer/+onRenderHtml.tsx index f275a93..8669e3c 100644 --- a/src/renderer/+onRenderHtml.tsx +++ b/src/renderer/+onRenderHtml.tsx @@ -1,5 +1,4 @@ import React from "react"; -// https://vike.dev/onRenderHtml export { onRenderHtml }; import { renderToStream } from "react-streaming/server"; @@ -7,6 +6,7 @@ import { escapeInject } from "vike/server"; import { Layout } from "../layout/Layout"; import type { OnRenderHtmlAsync } from "vike/types"; +// See https://vike.dev/onRenderHtml for usage details const onRenderHtml: OnRenderHtmlAsync = async ( pageContext, ): ReturnType => { @@ -49,8 +49,6 @@ window.ga_api = "/api/metrics"; return { documentHtml: res, - pageContext: { - // enableEagerStreaming: true - }, + pageContext: {}, }; }; diff --git a/src/renderer/routes.ts b/src/renderer/routes.ts index 07f72cb..f052858 100644 --- a/src/renderer/routes.ts +++ b/src/renderer/routes.ts @@ -1,3 +1,4 @@ +// Top level control interface for navigation export default { "/": { sidebarLabel: "Home", heroLabel: "g.s" }, // "/about": { sidebarLabel: "About", heroLabel: "About Me" }, diff --git a/src/renderer/types.ts b/src/renderer/types.ts index a6b1bcd..218e2b9 100644 --- a/src/renderer/types.ts +++ b/src/renderer/types.ts @@ -1,4 +1,3 @@ -// renderer/types.ts export type { PageProps }; type Page = (pageProps: PageProps) => React.ReactElement; @@ -10,8 +9,6 @@ declare global { Page: Page; pageProps?: PageProps; fetch?: typeof fetch; - - // Add your environment bindings here env: import("../../workers/site/env"); } } diff --git a/src/stores/ClientChatStore.ts b/src/stores/ClientChatStore.ts index 16ea043..d1fd4a4 100644 --- a/src/stores/ClientChatStore.ts +++ b/src/stores/ClientChatStore.ts @@ -1,6 +1,5 @@ import { applySnapshot, flow, Instance, types } from "mobx-state-tree"; import Message from "../models/Message"; -import Attachment from "../models/Attachment"; import IntermediateStep from "../models/IntermediateStep"; import UserOptionsStore from "./UserOptionsStore"; @@ -11,7 +10,6 @@ const ClientChatStore = types isLoading: types.optional(types.boolean, false), model: types.optional(types.string, "llama-3.3-70b-versatile"), imageModel: types.optional(types.string, "black-forest-labs/flux-1.1-pro"), - attachments: types.optional(types.array(Attachment), []), tools: types.optional(types.array(types.string), []), intermediateSteps: types.array(IntermediateStep), }) @@ -41,7 +39,6 @@ const ClientChatStore = types const payload = { messages: self.messages.slice(), model: self.model, - attachments: self.attachments.slice(), tools: self.tools.slice(), }; @@ -163,7 +160,6 @@ const ClientChatStore = types const payload = { messages: self.messages.slice(), model: self.model, - attachments: self.attachments.slice(), tools: self.tools.slice(), }; @@ -244,15 +240,6 @@ const ClientChatStore = types reset() { applySnapshot(self, {}); }, - addAttachment(attachment: Instance) { - self.attachments.push(attachment); - - if (self.attachments.length > 0) { - if (!self.tools.includes("user-attachments")) { - self.tools.push("user-attachments"); - } - } - }, addIntermediateStep(stepData) { return; }, @@ -271,21 +258,6 @@ const ClientChatStore = types self.messages.splice(index + 1); } }, - removeAttachment(url: string) { - const f = - self.attachments.filter((attachment) => attachment.url !== url) ?? []; - self.attachments.clear(); - - self.attachments.push(...f); - - if (self.attachments.length === 0) { - const remainingTools = self.tools.filter( - (tool) => tool !== "user-attachments", - ); - self.tools.clear(); - self.tools.push(...remainingTools); - } - }, setTools(tools: string[]) { self.tools.clear(); self.tools.push(...tools); diff --git a/src/stores/FileUploadStore.ts b/src/stores/FileUploadStore.ts deleted file mode 100644 index c470bfa..0000000 --- a/src/stores/FileUploadStore.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { types, flow } from "mobx-state-tree"; -import clientChatStore from "./ClientChatStore"; -import Attachment from "../models/Attachment"; - -const FileUploadStore = types - .model("FileUploadStore", { - isUploading: types.optional(types.boolean, false), - uploadError: types.maybeNull(types.string), - uploadedFiles: types.array(types.string), - uploadResults: types.array(types.frozen()), - }) - .actions((self) => ({ - uploadFile: flow(function* (file: File, endpoint: string) { - if (!endpoint) { - self.uploadError = "Endpoint URL is required."; - return; - } - - self.isUploading = true; - self.uploadError = null; - - const formData = new FormData(); - formData.append("file", file); - - try { - const response = yield fetch(endpoint, { - method: "POST", - body: formData, - }); - - if (!response.ok) { - throw new Error(`Upload failed with status: ${response.status}`); - } - - const result = yield response.json(); - self.uploadResults.push(result); - - if (result.url) { - self.uploadedFiles.push(result.url); - clientChatStore.addAttachment( - Attachment.create({ - content: `${file.name}\n~~~${result?.extractedText}\n`, - url: result.url, - }), - ); - } else { - throw new Error("No URL returned from the server."); - } - } catch (error: any) { - self.uploadError = error.message; - } finally { - self.isUploading = false; - } - }), - removeUploadedFile(url: string) { - clientChatStore.removeAttachment(url); - const index = self.uploadedFiles.findIndex( - (uploadedUrl) => uploadedUrl === url, - ); - if (index !== -1) { - self.uploadedFiles.splice(index, 1); - self.uploadResults.splice(index, 1); - } - }, - })); - -export default FileUploadStore.create(); diff --git a/workers/site/services/ChatService.ts b/workers/site/services/ChatService.ts index eb7bd8c..6183e0f 100644 --- a/workers/site/services/ChatService.ts +++ b/workers/site/services/ChatService.ts @@ -15,34 +15,21 @@ import {CerebrasSdk} from "../sdk/models/cerebras"; import {CloudflareAISdk} from "../sdk/models/cloudflareAi"; export interface StreamParams { - env: Env; - openai: OpenAI; - messages: any[]; - model: string; - systemPrompt: string; - preprocessedContext: any; - attachments: any[]; - tools: any[]; - disableWebhookGeneration: boolean; - maxTokens: number; -} - -interface StreamHandlerParams { - controller: ReadableStreamDefaultController; - encoder: TextEncoder; - webhook?: { url: string, payload: unknown }; - dynamicContext?: any; + env: Env; + openai: OpenAI; + messages: any[]; + model: string; + systemPrompt: string; + preprocessedContext: any; + maxTokens: number; } const activeStreamType = types.model({ - name: types.optional(types.string, ""), - maxTokens: types.optional(types.number, 0), - systemPrompt: types.optional(types.string, ""), - model: types.optional(types.string, ""), - messages: types.optional(types.array(types.frozen()), []), - attachments: types.optional(types.array(types.frozen()), []), - tools: types.optional(types.array(types.frozen()), []), - disableWebhookGeneration: types.optional(types.boolean, false) + name: types.optional(types.string, ""), + maxTokens: types.optional(types.number, 0), + systemPrompt: types.optional(types.string, ""), + model: types.optional(types.string, ""), + messages: types.optional(types.array(types.frozen()), []), }); const activeStreamsMap = types.map( @@ -51,388 +38,266 @@ const activeStreamsMap = types.map( const ChatService = types .model('ChatService', { - openAIApiKey: types.optional(types.string, ""), - openAIBaseURL: types.optional(types.string, ""), - activeStreams: types.optional( - activeStreamsMap, - {} // Correct initialization - ), - maxTokens: types.number, - systemPrompt: types.string + openAIApiKey: types.optional(types.string, ""), + openAIBaseURL: types.optional(types.string, ""), + activeStreams: types.optional( + activeStreamsMap, + {} + ), + maxTokens: types.number, + systemPrompt: types.string }) .volatile(self => ({ - openai: {} as OpenAI, - env: {} as Env, - webhookStreamActive: false + openai: {} as OpenAI, + env: {} as Env, })) .actions(self => { - // Helper functions - const createMessageInstance = (message: any) => { - if (typeof message.content === 'string') { - return Message.create({ - role: message.role, - content: message.content, - }); - } - if (Array.isArray(message.content)) { - const m = O1Message.create({ - role: message.role, - content: message.content.map(item => ({ - type: item.type, - text: item.text - })), - }); - return m; - } - throw new Error('Unsupported message format'); - }; + // Helper functions + const createMessageInstance = (message: any) => { + if (typeof message.content === 'string') { + return Message.create({ + role: message.role, + content: message.content, + }); + } + if (Array.isArray(message.content)) { + const m = O1Message.create({ + role: message.role, + content: message.content.map(item => ({ + type: item.type, + text: item.text + })), + }); + return m; + } + throw new Error('Unsupported message format'); + }; + const createStreamParams = async ( + streamConfig: any, + dynamicContext: any, + durableObject: any + ): Promise => { + return { + env: self.env, + openai: self.openai, + messages: streamConfig.messages.map(createMessageInstance), + model: streamConfig.model, + systemPrompt: streamConfig.systemPrompt, + preprocessedContext: getSnapshot(dynamicContext), + maxTokens: await durableObject.dynamicMaxTokens( + streamConfig.messages, + 2000 + ), + } + }; - const handleAgentProcess = async ( - {controller, encoder, webhook, dynamicContext}: StreamHandlerParams - ) => { - if (!webhook) return; - dynamicContext.append("\n## Agent Results\n~~~markdown\n"); + const modelHandlers = { + openai: (params: StreamParams, dataHandler: Function) => + OpenAiChatSdk.handleOpenAiStream(params, dataHandler), + groq: (params: StreamParams, dataHandler: Function) => + GroqChatSdk.handleGroqStream(params, dataHandler), + claude: (params: StreamParams, dataHandler: Function) => + ClaudeChatSdk.handleClaudeStream(params, dataHandler), + fireworks: (params: StreamParams, dataHandler: Function) => + FireworksAiChatSdk.handleFireworksStream(params, dataHandler), + google: (params: StreamParams, dataHandler: Function) => + GoogleChatSdk.handleGoogleStream(params, dataHandler), + xai: (params: StreamParams, dataHandler: Function) => + XaiChatSdk.handleXaiStream(params, dataHandler), + cerebras: (params: StreamParams, dataHandler: Function) => + CerebrasSdk.handleCerebrasStream(params, dataHandler), + cloudflareAI: (params: StreamParams, dataHandler: Function) => + CloudflareAISdk.handleCloudflareAIStream(params, dataHandler) + }; - for await (const chunk of self.streamAgentData({webhook})) { - controller.enqueue(encoder.encode(chunk)); - dynamicContext.append(chunk); - } - - dynamicContext.append("\n~~~\n"); - ChatSdk.sendDoubleNewline(controller, encoder); - }; - - const createStreamParams = async ( - streamConfig: any, - dynamicContext: any, - durableObject: any - ): Promise => { return { - env: self.env, - openai: self.openai, - messages: streamConfig.messages.map(createMessageInstance), - model: streamConfig.model, - systemPrompt: streamConfig.systemPrompt, - preprocessedContext: getSnapshot(dynamicContext), - attachments: streamConfig.attachments ?? [], - tools: streamConfig.tools ?? [], - disableWebhookGeneration: true, - maxTokens: await durableObject.dynamicMaxTokens( - streamConfig.messages, - 2000 - ), - } - }; + setActiveStream(streamId: string, stream: any) { + const validStream = { + name: stream?.name || "Unnamed Stream", + maxTokens: stream?.maxTokens || 0, + systemPrompt: stream?.systemPrompt || "", + model: stream?.model || "", + messages: stream?.messages || [], + }; - const modelHandlers = { - openai: (params: StreamParams, dataHandler: Function) => - OpenAiChatSdk.handleOpenAiStream(params, dataHandler), - groq: (params: StreamParams, dataHandler: Function) => - GroqChatSdk.handleGroqStream(params, dataHandler), - claude: (params: StreamParams, dataHandler: Function) => - ClaudeChatSdk.handleClaudeStream(params, dataHandler), - fireworks: (params: StreamParams, dataHandler: Function) => - FireworksAiChatSdk.handleFireworksStream(params, dataHandler), - google: (params: StreamParams, dataHandler: Function) => - GoogleChatSdk.handleGoogleStream(params, dataHandler), - xai: (params: StreamParams, dataHandler: Function) => - XaiChatSdk.handleXaiStream(params, dataHandler), - cerebras: (params: StreamParams, dataHandler: Function) => - CerebrasSdk.handleCerebrasStream(params, dataHandler), - cloudflareAI: (params: StreamParams, dataHandler: Function) => - CloudflareAISdk.handleCloudflareAIStream(params, dataHandler) - }; + self.activeStreams.set(streamId, validStream); + }, - return { - setActiveStream(streamId: string, stream: any) { - const validStream = { - name: stream?.name || "Unnamed Stream", - maxTokens: stream?.maxTokens || 0, - systemPrompt: stream?.systemPrompt || "", - model: stream?.model || "", - messages: stream?.messages || [], - attachments: stream?.attachments || [], - tools: stream?.tools || [], - disableWebhookGeneration: stream?.disableWebhookGeneration || false, - }; + removeActiveStream(streamId: string) { + self.activeStreams.delete(streamId); + }, + setEnv(env: Env) { + self.env = env; + self.openai = new OpenAI({ + apiKey: self.openAIApiKey, + baseURL: self.openAIBaseURL, + }); + }, - self.activeStreams.set(streamId, validStream); - }, - - removeActiveStream(streamId: string) { - self.activeStreams.delete(streamId); - }, - setEnv(env: Env) { - self.env = env; - self.openai = new OpenAI({ - apiKey: self.openAIApiKey, - baseURL: self.openAIBaseURL, - }); - }, - - handleChatRequest: async (request: Request) => { - return ChatSdk.handleChatRequest(request, { - openai: self.openai, - env: self.env, - systemPrompt: self.systemPrompt, - maxTokens: self.maxTokens - }); - }, - - setWebhookStreamActive(value) { - self.webhookStreamActive = value; - }, - - streamAgentData: async function* ({webhook}) { - console.log("streamAgentData::start"); - if (self.webhookStreamActive) { - return - } - - const queue: string[] = []; - let resolveQueueItem: Function; - let finished = false; - let errorOccurred: Error | null = null; - - const dataPromise = () => new Promise((resolve) => { - resolveQueueItem = resolve; - }); - - let currentPromise = dataPromise(); - const eventSource = new EventSource(webhook.url.trim()); - console.log("streamAgentData::setWebhookStreamActive::true"); - self.setWebhookStreamActive(true) - try { - ChatSdk.handleAgentStream(eventSource, (data) => { - const formattedData = `data: ${JSON.stringify(data)}\n\n`; - queue.push(formattedData); - if (resolveQueueItem) resolveQueueItem(); - currentPromise = dataPromise(); - }).then(() => { - finished = true; - if (resolveQueueItem) resolveQueueItem(); - }).catch((err) => { - console.log(`chatService::streamAgentData::STREAM_ERROR::${err}`); - errorOccurred = err; - if (resolveQueueItem) resolveQueueItem(); - }); - - while (!finished || queue.length > 0) { - if (queue.length > 0) { - yield queue.shift()!; - } else if (errorOccurred) { - throw errorOccurred; - } else { - await currentPromise; - } - } - self.setWebhookStreamActive(false); - eventSource.close(); - // console.log(`chatService::streamAgentData::complete`); - } catch (error) { - console.log(`chatService::streamAgentData::error`); - eventSource.close(); - self.setWebhookStreamActive(false); - console.error("Error while streaming webhook data:", error); - throw error; - } - }, - /** - * runModelHandler - * Selects the correct model handler and invokes it. - */ - async runModelHandler(params: { - streamConfig: any; - streamParams: any; - controller: ReadableStreamDefaultController; - encoder: TextEncoder; - streamId: string; - }) { - const {streamConfig, streamParams, controller, encoder, streamId} = params; - - const modelFamily = getModelFamily(streamConfig.model); - console.log( - `chatService::handleSseStream::ReadableStream::modelFamily::${modelFamily}` - ); - - const handler = modelHandlers[modelFamily as ModelFamily]; - if (handler) { - try { - console.log(`chatService::handleSseStream::ReadableStream::${streamId}::handler::start`); - await handler(streamParams, handleStreamData(controller, encoder)); - console.log(`chatService::handleSseStream::ReadableStream::${streamId}::handler::finish`); - } catch (error) { - const message = error.message.toLowerCase(); - - if(message.includes("413 ") || (message.includes("maximum") || message.includes("too long") || message.includes("too large") )) { - throw new ClientError(`Error! Content length exceeds limits. Try shortening your message, removing any attached files, or editing an earlier message instead.`, 413, {model: streamConfig.model, maxTokens: streamParams.maxTokens}) - // throw new Error(`Max tokens exceeded for model ${streamConfig.model}`) - } - if(message.includes("429 ")) { - throw new ClientError(`Error! Rate limit exceeded. Wait a few minutes before trying again.`, 429, {model: streamConfig.model, maxTokens: streamParams.maxTokens}) - // throw new Error(`Max tokens exceeded for model ${streamConfig.model}`) - } - if (message.includes("404")) { - throw new ClientError(`Something went wrong, try again.`, 413, {}) - // throw new Error(`Max tokens exceeded for model ${streamConfig.model}`) - } - throw error; - /* - '413 Request too large for model `mixtral-8x7b-32768` in organization `org_01htjxws48fm0rbbg5gnkgmbrh` service tier `on_demand` on tokens per minute (TPM): Limit 5000, Requested 49590, please reduce your message size and try again. Visit https://console.groq.com/docs/rate-limits for more information.' - */ - } - } - }, - /** - * bootstrapAgents - * Checks if an agent exists, and if so, bootstraps it. - */ - async bootstrapAgents(params: { - savedStreamConfig: string; - controller: ReadableStreamDefaultController; - encoder: TextEncoder; - dynamicContext: any; // or more specific type - }) { - const {savedStreamConfig, controller, encoder, dynamicContext} = params; - - const config = JSON.parse(savedStreamConfig); - const webhook = config?.webhooks?.[0]; - - if (webhook) { - console.log(`chatService::handleSseStream::ReadableStream::webhook:start`); - await handleAgentProcess({ - controller, - encoder, - webhook, - dynamicContext, - }); - console.log(`chatService::handleSseStream::ReadableStream::webhook::end`); - } - }, + handleChatRequest: async (request: Request) => { + return ChatSdk.handleChatRequest(request, { + openai: self.openai, + env: self.env, + systemPrompt: self.systemPrompt, + maxTokens: self.maxTokens + }); + }, - createSseReadableStream(params: { - streamId: string; - streamConfig: any; - savedStreamConfig: string; - durableObject: any; - }) { - const { streamId, streamConfig, savedStreamConfig, durableObject } = params; + async runModelHandler(params: { + streamConfig: any; + streamParams: any; + controller: ReadableStreamDefaultController; + encoder: TextEncoder; + streamId: string; + }) { + const {streamConfig, streamParams, controller, encoder, streamId} = params; - return new ReadableStream({ - async start(controller) { - console.log(`chatService::handleSseStream::ReadableStream::${streamId}::open`); - const encoder = new TextEncoder(); + const modelFamily = getModelFamily(streamConfig.model); - try { - const dynamicContext = Message.create(streamConfig.preprocessedContext); + const handler = modelHandlers[modelFamily as ModelFamily]; + if (handler) { + try { + + await handler(streamParams, handleStreamData(controller, encoder)); + + } catch (error) { + const message = error.message.toLowerCase(); + + if (message.includes("413 ") || (message.includes("maximum") || message.includes("too long") || message.includes("too large"))) { + throw new ClientError(`Error! Content length exceeds limits. Try shortening your message or editing an earlier message.`, 413, { + model: streamConfig.model, + maxTokens: streamParams.maxTokens + }) + } + if (message.includes("429 ")) { + throw new ClientError(`Error! Rate limit exceeded. Wait a few minutes before trying again.`, 429, { + model: streamConfig.model, + maxTokens: streamParams.maxTokens + }) + } + if (message.includes("404")) { + throw new ClientError(`Something went wrong, try again.`, 413, {}) + } + throw error; + } + } + }, + + createSseReadableStream(params: { + streamId: string; + streamConfig: any; + savedStreamConfig: string; + durableObject: any; + }) { + const {streamId, streamConfig, savedStreamConfig, durableObject} = params; + + return new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder(); + + try { + const dynamicContext = Message.create(streamConfig.preprocessedContext); + + // Process the stream data using the appropriate handler + const streamParams = await createStreamParams( + streamConfig, + dynamicContext, + durableObject + ); + + try { + await self.runModelHandler({ + streamConfig, + streamParams, + controller, + encoder, + streamId, + }); + } catch (e) { + console.log("error caught at runModelHandler") + throw e; + } + + } catch (error) { + console.error(`chatService::handleSseStream::${streamId}::Error`, error); + + if (error instanceof ClientError) { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({type: 'error', error: error.message})}\n\n`) + ); + } else { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ + type: 'error', + error: "Server error" + })}\n\n`) + ); + } + controller.close(); + } finally { + try { + controller.close(); + } catch (_) { + } + } + }, + }); + }, - // Process agents if configured - await self.bootstrapAgents({ - savedStreamConfig, - controller, - encoder, - dynamicContext: dynamicContext, + handleSseStream: flow(function* (streamId: string): Generator, Response, unknown> { + console.log(`chatService::handleSseStream::enter::${streamId}`); + + // Check if a stream is already active for this ID + if (self.activeStreams.has(streamId)) { + return new Response('Stream already active', {status: 409}); + } + + // Retrieve the stream configuration from the durable object + const objectId = self.env.SITE_COORDINATOR.idFromName('stream-index'); + const durableObject = self.env.SITE_COORDINATOR.get(objectId); + const savedStreamConfig = yield durableObject.getStreamData(streamId); + + if (!savedStreamConfig) { + return new Response('Stream not found', {status: 404}); + } + + const streamConfig = JSON.parse(savedStreamConfig); + + const stream = self.createSseReadableStream({ + streamId, + streamConfig, + savedStreamConfig, + durableObject, }); - // Process the stream data using the appropriate handler - const streamParams = await createStreamParams( - streamConfig, - dynamicContext, - durableObject + // Use `tee()` to create two streams: one for processing and one for the response + const [processingStream, responseStream] = stream.tee(); + + self.setActiveStream(streamId, { + ...streamConfig, + }); + + processingStream.pipeTo( + new WritableStream({ + close() { + self.removeActiveStream(streamId); + }, + }) ); - try { - await self.runModelHandler({ - streamConfig, - streamParams, - controller, - encoder, - streamId, - }); - } catch (e) { - console.log("error caught at runModelHandler") - throw e; - } - - } catch (error) { - console.error(`chatService::handleSseStream::${streamId}::Error`, error); - - if(error instanceof ClientError) { - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n`) - ); - } else { - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ type: 'error', error: "Server error" })}\n\n`) - ); - } - controller.close(); - } finally { - try { - controller.close(); - } catch (_) {} - } - }, - }); - }, - - - handleSseStream: flow(function* (streamId: string): Generator, Response, unknown> { - console.log(`chatService::handleSseStream::enter::${streamId}`); - - // Check if a stream is already active for this ID - if (self.activeStreams.has(streamId)) { - console.log(`chatService::handleSseStream::${streamId}::[stream already active]`); - return new Response('Stream already active', { status: 409 }); - } - - // Retrieve the stream configuration from the durable object - const objectId = self.env.SITE_COORDINATOR.idFromName('stream-index'); - const durableObject = self.env.SITE_COORDINATOR.get(objectId); - const savedStreamConfig = yield durableObject.getStreamData(streamId); - - if (!savedStreamConfig) { - return new Response('Stream not found', { status: 404 }); - } - - const streamConfig = JSON.parse(savedStreamConfig); - console.log(`chatService::handleSseStream::${streamId}::[stream configured]`); - - const stream = self.createSseReadableStream({ - streamId, - streamConfig, - savedStreamConfig, - durableObject, - }); - - // Use `tee()` to create two streams: one for processing and one for the response - const [processingStream, responseStream] = stream.tee(); - - self.setActiveStream(streamId, { - ...streamConfig, - }); - - processingStream.pipeTo( - new WritableStream({ - close() { - self.removeActiveStream(streamId); - }, - }) - ); - - // Return the second stream as the response - return new Response(responseStream, { - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - }, - }); - }), - }; + // Return the second stream as the response + return new Response(responseStream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + }); + }), + }; }); @@ -441,28 +306,28 @@ const ChatService = types * A custom construct for sending client-friendly errors via the controller in a structured and controlled manner. */ export class ClientError extends Error { - public statusCode: number; - public details: Record; + public statusCode: number; + public details: Record; - constructor(message: string, statusCode: number, details: Record = {}) { - super(message); - this.name = 'ClientError'; - this.statusCode = statusCode; - this.details = details; - Object.setPrototypeOf(this, ClientError.prototype); - } + constructor(message: string, statusCode: number, details: Record = {}) { + super(message); + this.name = 'ClientError'; + this.statusCode = statusCode; + this.details = details; + Object.setPrototypeOf(this, ClientError.prototype); + } - /** - * Formats the error for SSE-compatible data transmission. - */ - public formatForSSE(): string { - return JSON.stringify({ - type: 'error', - message: this.message, - details: this.details, - statusCode: this.statusCode, - }); - } + /** + * Formats the error for SSE-compatible data transmission. + */ + public formatForSSE(): string { + return JSON.stringify({ + type: 'error', + message: this.message, + details: this.details, + statusCode: this.statusCode, + }); + } } export default ChatService;