From f8bf65603cf68cd4a66afafce9d501e13a8abbb2 Mon Sep 17 00:00:00 2001 From: geoffsee <> Date: Tue, 27 May 2025 13:29:17 -0400 Subject: [PATCH] Refactor webhook processing to agent-focused architecture Renames functions and variables to use "agent" terminology instead of "webhook" for consistency with updated architecture. Removes unused preprocessing workflows and streamlines the ChatSdk code. This improves code clarity and aligns naming with system functionality. --- workers/site/sdk/chat-sdk.ts | 104 +++------------------------ workers/site/services/ChatService.ts | 36 +++++----- 2 files changed, 28 insertions(+), 112 deletions(-) diff --git a/workers/site/sdk/chat-sdk.ts b/workers/site/sdk/chat-sdk.ts index 4872067..6b8ce61 100644 --- a/workers/site/sdk/chat-sdk.ts +++ b/workers/site/sdk/chat-sdk.ts @@ -1,74 +1,14 @@ -import { OpenAI } from "openai"; +import {OpenAI} from "openai"; import Message from "../models/Message"; -import { executePreprocessingWorkflow } from "../workflows"; -import { MarkdownSdk } from "./markdown-sdk"; -import { AssistantSdk } from "./assistant-sdk"; -import { IMessage } from "../../../src/stores/ClientChatStore"; -import { getModelFamily } from "../../../src/components/chat/SupportedModels"; +import {AssistantSdk} from "./assistant-sdk"; +import {IMessage} from "../../../src/stores/ClientChatStore"; +import {getModelFamily} from "../../../src/components/chat/SupportedModels"; export class ChatSdk { static async preprocess({ - tools, messages, - contextContainer, - eventHost, - streamId, - openai, - env, }) { - const { latestAiMessage, latestUserMessage } = - ChatSdk.extractMessageContext(messages); - - if (tools.includes("web-search")) { - try { - const { results } = await executePreprocessingWorkflow({ - latestUserMessage, - latestAiMessage, - eventHost, - streamId, - chat: { - messages, - openai, - }, - }); - - const { webhook } = results.get("preprocessed"); - - if (webhook) { - const objectId = env.SITE_COORDINATOR.idFromName("stream-index"); - - const durableObject = env.SITE_COORDINATOR.get(objectId); - - await durableObject.saveStreamData( - streamId, - JSON.stringify({ - webhooks: [webhook], - }), - ); - - await durableObject.saveStreamData( - webhook.id, - JSON.stringify({ - parent: streamId, - url: webhook.url, - }), - ); - } - - console.log("handleOpenAiStream::workflowResults", { - webhookUrl: webhook?.url, - }); - } catch (workflowError) { - console.error( - "handleOpenAiStream::workflowError::Failed to execute workflow", - workflowError, - ); - } - return Message.create({ - role: "assistant", - content: MarkdownSdk.formatContextContainer(contextContainer), - }); - } + // a custom implementation for preprocessing would go here return Message.create({ role: "assistant", content: "", @@ -92,20 +32,10 @@ export class ChatSdk { return new Response("No messages provided", { status: 400 }); } - const contextContainer = new Map(); - const preprocessedContext = await ChatSdk.preprocess({ - tools, messages, - eventHost: ctx.env.EVENTSOURCE_HOST, - contextContainer: contextContainer, - streamId, - openai: ctx.openai, - env: ctx.env, }); - console.log({ preprocessedContext: JSON.stringify(preprocessedContext) }); - const objectId = ctx.env.SITE_COORDINATOR.idFromName("stream-index"); const durableObject = ctx.env.SITE_COORDINATOR.get(objectId); @@ -139,20 +69,6 @@ export class ChatSdk { ); } - private static extractMessageContext(messages: any[]) { - const latestUserMessageObj = [...messages] - .reverse() - .find((msg) => msg.role === "user"); - const latestAiMessageObj = [...messages] - .reverse() - .find((msg) => msg.role === "assistant"); - - return { - latestUserMessage: latestUserMessageObj?.content || "", - latestAiMessage: latestAiMessageObj?.content || "", - }; - } - static async calculateMaxTokens( messages: any[], ctx: Record & { @@ -230,18 +146,18 @@ export class ChatSdk { return messagesToSend; } - static async handleWebhookStream( + static async handleAgentStream( eventSource: EventSource, dataCallback: any, ): Promise { - console.log("sdk::handleWebhookStream::start"); + // console.log("sdk::handleWebhookStream::start"); let done = false; return new Promise((resolve, reject) => { if (!done) { - console.log("sdk::handleWebhookStream::promise::created"); + // console.log("sdk::handleWebhookStream::promise::created"); eventSource.onopen = () => { - console.log("sdk::handleWebhookStream::eventSource::open"); - console.log("Connected to webhook"); + // console.log("sdk::handleWebhookStream::eventSource::open"); + console.log("Connected to agent"); }; const parseEvent = (data) => { diff --git a/workers/site/services/ChatService.ts b/workers/site/services/ChatService.ts index 18bcf3c..742a689 100644 --- a/workers/site/services/ChatService.ts +++ b/workers/site/services/ChatService.ts @@ -89,23 +89,23 @@ const ChatService = types }; - const handleWebhookProcessing = async ( + const handleAgentProcess = async ( {controller, encoder, webhook, dynamicContext}: StreamHandlerParams ) => { - console.log("handleWebhookProcessing::start"); + console.log("handleAgentProcess::start"); if (!webhook) return; - console.log("handleWebhookProcessing::[Loading Live Search]"); + console.log("handleAgentProcess::[Loading Live Search]"); dynamicContext.append("\n## Live Search\n~~~markdown\n"); - for await (const chunk of self.streamWebhookData({webhook})) { + for await (const chunk of self.streamAgentData({webhook})) { controller.enqueue(encoder.encode(chunk)); dynamicContext.append(chunk); } dynamicContext.append("\n~~~\n"); - console.log(`handleWebhookProcessing::[Finished loading Live Search!][length: ${dynamicContext.content.length}]`); + console.log(`handleAgentProcess::[Finished loading Live Search!][length: ${dynamicContext.content.length}]`); ChatSdk.sendDoubleNewline(controller, encoder); - console.log("handleWebhookProcessing::exit") + console.log("handleAgentProcess::exit") }; const createStreamParams = async ( @@ -189,8 +189,8 @@ const ChatService = types self.webhookStreamActive = value; }, - streamWebhookData: async function* ({webhook}) { - console.log("streamWebhookData::start"); + streamAgentData: async function* ({webhook}) { + console.log("streamAgentData::start"); if (self.webhookStreamActive) { return } @@ -206,10 +206,10 @@ const ChatService = types let currentPromise = dataPromise(); const eventSource = new EventSource(webhook.url.trim()); - console.log("streamWebhookData::setWebhookStreamActive::true"); + console.log("streamAgentData::setWebhookStreamActive::true"); self.setWebhookStreamActive(true) try { - ChatSdk.handleWebhookStream(eventSource, (data) => { + ChatSdk.handleAgentStream(eventSource, (data) => { const formattedData = `data: ${JSON.stringify(data)}\n\n`; queue.push(formattedData); if (resolveQueueItem) resolveQueueItem(); @@ -218,7 +218,7 @@ const ChatService = types finished = true; if (resolveQueueItem) resolveQueueItem(); }).catch((err) => { - console.log(`chatService::streamWebhookData::STREAM_ERROR::${err}`); + console.log(`chatService::streamAgentData::STREAM_ERROR::${err}`); errorOccurred = err; if (resolveQueueItem) resolveQueueItem(); }); @@ -234,9 +234,9 @@ const ChatService = types } self.setWebhookStreamActive(false); eventSource.close(); - console.log(`chatService::streamWebhookData::complete`); + // console.log(`chatService::streamAgentData::complete`); } catch (error) { - console.log(`chatService::streamWebhookData::error`); + console.log(`chatService::streamAgentData::error`); eventSource.close(); self.setWebhookStreamActive(false); console.error("Error while streaming webhook data:", error); @@ -290,10 +290,10 @@ const ChatService = types } }, /** - * handleWebhookIfNeeded - * Checks if a webhook exists, and if so, processes it. + * bootstrapAgents + * Checks if an agent exists, and if so, bootstraps it. */ - async handleWebhookIfNeeded(params: { + async bootstrapAgents(params: { savedStreamConfig: string; controller: ReadableStreamDefaultController; encoder: TextEncoder; @@ -306,7 +306,7 @@ const ChatService = types if (webhook) { console.log(`chatService::handleSseStream::ReadableStream::webhook:start`); - await handleWebhookProcessing({ + await handleAgentProcess({ controller, encoder, webhook, @@ -337,7 +337,7 @@ const ChatService = types // Process webhook if configured - await self.handleWebhookIfNeeded({ + await self.bootstrapAgents({ savedStreamConfig, controller, encoder,