diff --git a/workers/site/services/ChatService.ts b/workers/site/services/ChatService.ts index 742a689..eb7bd8c 100644 --- a/workers/site/services/ChatService.ts +++ b/workers/site/services/ChatService.ts @@ -14,7 +14,6 @@ import {XaiChatSdk} from "../sdk/models/xai"; import {CerebrasSdk} from "../sdk/models/cerebras"; import {CloudflareAISdk} from "../sdk/models/cloudflareAi"; -// Types export interface StreamParams { env: Env; openai: OpenAI; @@ -92,10 +91,8 @@ const ChatService = types const handleAgentProcess = async ( {controller, encoder, webhook, dynamicContext}: StreamHandlerParams ) => { - console.log("handleAgentProcess::start"); if (!webhook) return; - console.log("handleAgentProcess::[Loading Live Search]"); - dynamicContext.append("\n## Live Search\n~~~markdown\n"); + dynamicContext.append("\n## Agent Results\n~~~markdown\n"); for await (const chunk of self.streamAgentData({webhook})) { controller.enqueue(encoder.encode(chunk)); @@ -103,9 +100,7 @@ const ChatService = types } dynamicContext.append("\n~~~\n"); - console.log(`handleAgentProcess::[Finished loading Live Search!][length: ${dynamicContext.content.length}]`); ChatSdk.sendDoubleNewline(controller, encoder); - console.log("handleAgentProcess::exit") }; const createStreamParams = async ( @@ -331,12 +326,10 @@ const ChatService = types const encoder = new TextEncoder(); try { - // Send initial retry directive - // controller.enqueue(encoder.encode('retry: 0\n\n')); const dynamicContext = Message.create(streamConfig.preprocessedContext); - // Process webhook if configured + // Process agents if configured await self.bootstrapAgents({ savedStreamConfig, controller, @@ -408,7 +401,6 @@ const ChatService = types const streamConfig = JSON.parse(savedStreamConfig); console.log(`chatService::handleSseStream::${streamId}::[stream configured]`); - // Create the SSE readable stream const stream = self.createSseReadableStream({ streamId, streamConfig, @@ -419,17 +411,14 @@ const ChatService = types // Use `tee()` to create two streams: one for processing and one for the response const [processingStream, responseStream] = stream.tee(); - // Add the new stream to activeStreams self.setActiveStream(streamId, { - ...streamConfig, // Ensure streamConfig matches the expected structure + ...streamConfig, }); - // Process the stream for internal logic processingStream.pipeTo( new WritableStream({ close() { - console.log(`chatService::handleSseStream::${streamId}::[stream closed]`); - self.removeActiveStream(streamId); // Use action to update state + self.removeActiveStream(streamId); }, }) ); @@ -443,7 +432,6 @@ const ChatService = types }, }); }), - }; }); diff --git a/workers/site/workflows/IntentService.ts b/workers/site/workflows/IntentService.ts deleted file mode 100644 index 93438b8..0000000 --- a/workers/site/workflows/IntentService.ts +++ /dev/null @@ -1,64 +0,0 @@ -import type { MessageType } from "../models/Message"; -import OpenAI from "openai"; -import { z } from "zod"; -import { zodResponseFormat } from "openai/helpers/zod"; - -const IntentSchema = z.object({ - action: z.enum(["web-search", "news-search", "web-scrape", ""]), - confidence: z.number(), -}); - -export class SimpleSearchIntentService { - constructor( - private client: OpenAI, - private messages: MessageType[], - ) {} - - async query(prompt: string, confidenceThreshold = 0.9) { - console.log({ confidenceThreshold }); - - const systemMessage = { - role: "system", - content: `Model intent as JSON: -{ - "action": "", - "confidence": "" -} - -- Context from another conversation. -- confidence is a decimal between 0 and 1 representing similarity of the context to the identified action -- Intent reflects user's or required action. -- Use "" for unknown/ambiguous intent. - -Analyze context and output JSON.`.trim(), - }; - - const conversation = this.messages.map((m) => ({ - role: m.role, - content: m.content, - })); - conversation.push({ role: "user", content: prompt }); - - const completion = await this.client.beta.chat.completions.parse({ - model: "gpt-4o", - messages: JSON.parse(JSON.stringify([systemMessage, ...conversation])), - temperature: 0, - response_format: zodResponseFormat(IntentSchema, "intent"), - }); - - const { action, confidence } = completion.choices[0].message.parsed; - - console.log({ action, confidence }); - - return confidence >= confidenceThreshold - ? { action, confidence } - : { action: "unknown", confidence }; - } -} - -export function createIntentService(chat: { - messages: MessageType[]; - openai: OpenAI; -}) { - return new SimpleSearchIntentService(chat.openai, chat.messages); -} diff --git a/workers/site/workflows/index.ts b/workers/site/workflows/index.ts deleted file mode 100644 index 71d4785..0000000 --- a/workers/site/workflows/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from "./preprocessing/executePreprocessingWorkflow"; diff --git a/workers/site/workflows/preprocessing/createPreprocessingWorkflow.ts b/workers/site/workflows/preprocessing/createPreprocessingWorkflow.ts deleted file mode 100644 index 985bbbc..0000000 --- a/workers/site/workflows/preprocessing/createPreprocessingWorkflow.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { - ManifoldRegion, - WorkflowFunctionManifold, -} from "manifold-workflow-engine"; -import { createIntentService } from "../IntentService"; -import { createSearchWebhookOperator } from "./webOperator"; -import { createNewsWebhookOperator } from "./newsOperator"; -import { createScrapeWebhookOperator } from "./scrapeOperator"; - -export const createPreprocessingWorkflow = ({ - eventHost, - initialState, - streamId, - chat: { messages, openai }, -}) => { - const preprocessingManifold = new WorkflowFunctionManifold( - createIntentService({ messages, openai }), - ); - preprocessingManifold.state = { ...initialState }; - - const searchWebhookOperator = createSearchWebhookOperator({ - eventHost, - streamId, - openai, - messages, - }); - const newsWebhookOperator = createNewsWebhookOperator({ - eventHost, - streamId, - openai, - messages, - }); - const scrapeWebhookOperator = createScrapeWebhookOperator({ - eventHost, - streamId, - openai, - messages, - }); - - const preprocessingRegion = new ManifoldRegion("preprocessingRegion", [ - searchWebhookOperator, - newsWebhookOperator, - scrapeWebhookOperator, - ]); - - preprocessingManifold.addRegion(preprocessingRegion); - - return preprocessingManifold; -}; diff --git a/workers/site/workflows/preprocessing/executePreprocessingWorkflow.ts b/workers/site/workflows/preprocessing/executePreprocessingWorkflow.ts deleted file mode 100644 index 83693c8..0000000 --- a/workers/site/workflows/preprocessing/executePreprocessingWorkflow.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { createPreprocessingWorkflow } from "./createPreprocessingWorkflow"; - -export async function executePreprocessingWorkflow({ - latestUserMessage, - latestAiMessage, - eventHost, - streamId, - chat: { messages, openai }, -}) { - console.log(`Executing executePreprocessingWorkflow`); - const initialState = { latestUserMessage, latestAiMessage }; - - // Add execution tracking flag to prevent duplicate runs - const executionKey = `preprocessing-${crypto.randomUUID()}`; - if (globalThis[executionKey]) { - console.log("Preventing duplicate preprocessing workflow execution"); - return globalThis[executionKey]; - } - - const workflows = { - preprocessing: createPreprocessingWorkflow({ - eventHost, - initialState, - streamId, - chat: { messages, openai }, - }), - results: new Map(), - }; - - try { - // Store the promise to prevent parallel executions - globalThis[executionKey] = (async () => { - await workflows.preprocessing.navigate(latestUserMessage); - await workflows.preprocessing.executeWorkflow(latestUserMessage); - console.log( - `executePreprocessingWorkflow::workflow::preprocessing::results`, - { state: JSON.stringify(workflows.preprocessing.state, null, 2) }, - ); - workflows.results.set("preprocessed", workflows.preprocessing.state); - - // Cleanup after execution - setTimeout(() => { - delete globalThis[executionKey]; - }, 1000); - - return workflows; - })(); - - return await globalThis[executionKey]; - } catch (error) { - delete globalThis[executionKey]; - throw new Error("Workflow execution failed"); - } -} diff --git a/workers/site/workflows/preprocessing/newsOperator.ts b/workers/site/workflows/preprocessing/newsOperator.ts deleted file mode 100644 index 80bbecd..0000000 --- a/workers/site/workflows/preprocessing/newsOperator.ts +++ /dev/null @@ -1,101 +0,0 @@ -import { WorkflowOperator } from "manifold-workflow-engine"; -import { zodResponseFormat } from "openai/helpers/zod"; -import { z } from "zod"; - -const QuerySchema = z.object({ - query: z.string(), -}); - -export function createNewsWebhookOperator({ - eventHost, - streamId, - openai, - messages, -}) { - return new WorkflowOperator("news-search", async (state: any) => { - const { latestUserMessage } = state; - console.log(`Processing user message: ${latestUserMessage}`); - - const resource = "news-search"; - const input = await getQueryFromContext({ - openai, - messages, - latestUserMessage, - }); - - const eventSource = new URL(eventHost); - const url = `${eventSource}api/webhooks`; - console.log({ url }); - - const stream = { - id: crypto.randomUUID(), - parent: streamId, - resource, - payload: input, - }; - const createStreamResponse = await fetch(`${eventSource}api/webhooks`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - id: stream.id, - parent: streamId, - resource: "news-search", - payload: { - input, - }, - }), - }); - const raw = await createStreamResponse.text(); - const { stream_url } = JSON.parse(raw); - const surl = eventHost + stream_url; - const webhook = { url: surl, id: stream.id, resource }; - - return { - ...state, - webhook, - latestUserMessage: "", - latestAiMessage: "", - }; - }); - - async function getQueryFromContext({ messages, openai, latestUserMessage }) { - const systemMessage = { - role: "system", - content: `Analyze the latest message in a conversation and generate a JSON object with a single implied question for a news search. The JSON should be structured as follows: - -{ - "query": "" -} - -## Example -{ - "query": "When was the last Buffalo Sabres hockey game?" -} - -Focus on the most recent message to determine the query. Output only the JSON object without any additional text.`, - }; - - const conversation = messages.map((m) => ({ - role: m.role, - content: m.content, - })); - conversation.push({ role: "user", content: `${latestUserMessage}` }); - - const m = [systemMessage, ...conversation]; - - const completion = await openai.beta.chat.completions.parse({ - model: "gpt-4o-mini", - messages: m, - temperature: 0, - response_format: zodResponseFormat(QuerySchema, "query"), - }); - - const { query } = completion.choices[0].message.parsed; - - console.log({ newsWebhookQuery: query }); - - return query; - } -} diff --git a/workers/site/workflows/preprocessing/scrapeOperator.ts b/workers/site/workflows/preprocessing/scrapeOperator.ts deleted file mode 100644 index 19a5314..0000000 --- a/workers/site/workflows/preprocessing/scrapeOperator.ts +++ /dev/null @@ -1,112 +0,0 @@ -import { WorkflowOperator } from "manifold-workflow-engine"; -import { zodResponseFormat } from "openai/helpers/zod"; -import { z } from "zod"; - -const UrlActionSchema = z.object({ - url: z.string(), - query: z.string(), - action: z.enum(["read", "scrape", "crawl", ""]), -}); - -export function createScrapeWebhookOperator({ - eventHost, - streamId, - openai, - messages, -}) { - return new WorkflowOperator("web-scrape", async (state: any) => { - const { latestUserMessage } = state; - - const webscrapeWebhookEndpoint = "/api/webhooks"; - - const resource = "web-scrape"; - const context = await getQueryFromContext({ - openai, - messages, - latestUserMessage, - }); - - const input = { - url: context?.url, - action: context?.action, - query: context.query, - }; - - const eventSource = new URL(eventHost); - const url = `${eventSource}api/webhooks`; - - const stream = { - id: crypto.randomUUID(), - parent: streamId, - resource, - payload: input, - }; - const createStreamResponse = await fetch(`${eventSource}api/webhooks`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - id: stream.id, - parent: streamId, - resource: "web-scrape", - payload: { - input, - }, - }), - }); - const raw = await createStreamResponse.text(); - const { stream_url } = JSON.parse(raw); - const surl = eventHost + stream_url; - const webhook = { url: surl, id: stream.id, resource }; - - return { - ...state, - webhook, - latestUserMessage: "", - latestAiMessage: "", - }; - }); -} - -async function getQueryFromContext({ messages, openai, latestUserMessage }) { - const systemMessage = { - role: "system" as const, - content: - `You are modeling a structured output containing a single question, a URL, and an action, all relative to a single input. - -Return the result as a JSON object in the following structure: -{ - "url": "Full URL in the conversation that references the URL being interacted with. No trailing slash!", - "query": "Implied question about the resources at the URL.", - "action": "read | scrape | crawl" -} - -- The input being modeled is conversational data from a different conversation than this one. -- Intent should represent a next likely action the system might take to satisfy or enhance the user's request. - -Instructions: -1. Analyze the provided context and declare the url, action, and question implied by the latest message. - -Output the JSON object. Do not include any additional explanations or text.`.trim(), - }; - - const conversation = messages.map((m) => ({ - role: m.role, - content: m.content, - })); - conversation.push({ role: "user", content: `${latestUserMessage}` }); - - const m = [systemMessage, ...conversation]; - - const completion = await openai.beta.chat.completions.parse({ - model: "gpt-4o-mini", - messages: m, - temperature: 0, - response_format: zodResponseFormat(UrlActionSchema, "UrlActionSchema"), - }); - - const { query, action, url } = completion.choices[0].message.parsed; - - return { query, action, url }; -} diff --git a/workers/site/workflows/preprocessing/webOperator.ts b/workers/site/workflows/preprocessing/webOperator.ts deleted file mode 100644 index a2b4b6c..0000000 --- a/workers/site/workflows/preprocessing/webOperator.ts +++ /dev/null @@ -1,100 +0,0 @@ -import { WorkflowOperator } from "manifold-workflow-engine"; -import { zodResponseFormat } from "openai/helpers/zod"; -import { z } from "zod"; - -const QuerySchema = z.object({ - query: z.string(), // No min/max constraints in the schema -}); - -export function createSearchWebhookOperator({ - eventHost, - streamId, - openai, - messages, -}) { - return new WorkflowOperator("web-search", async (state: any) => { - const { latestUserMessage } = state; - - const websearchWebhookEndpoint = "/api/webhooks"; - - const resource = "web-search"; - const input = await getQueryFromContext({ - openai, - messages, - latestUserMessage, - }); - - // process webhooks - const eventSource = new URL(eventHost); - const url = `${eventSource}api/webhooks`; - - const stream = { - id: crypto.randomUUID(), - parent: streamId, - resource, - payload: input, - }; - const createStreamResponse = await fetch(`${eventSource}api/webhooks`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - id: stream.id, - parent: streamId, - resource: "web-search", - payload: { - input, - }, - }), - }); - const raw = await createStreamResponse.text(); - const { stream_url } = JSON.parse(raw); - const surl = eventHost + stream_url; - const webhook = { url: surl, id: stream.id, resource }; - - return { - ...state, - webhook, - latestUserMessage: "", // unset to break out of loop - latestAiMessage: "", // unset to break out of loop - }; - }); -} - -async function getQueryFromContext({ messages, openai, latestUserMessage }) { - const systemMessage = { - role: "system", - content: `Analyze the latest message in the conversation and generate a JSON object with a single implied question for a web search. The JSON should be structured as follows: - -{ - "query": "the question that needs a web search" -} - -## Example -{ - "query": "What was the score of the last Buffalo Sabres hockey game?" -} - -Focus on the most recent message to determine the query. Output only the JSON object without any additional text.`, - }; - - const conversation = messages.map((m) => ({ - role: m.role, - content: m.content, - })); - conversation.push({ role: "user", content: `${latestUserMessage}` }); - - const m = [systemMessage, ...conversation]; - - const completion = await openai.beta.chat.completions.parse({ - model: "gpt-4o-mini", - messages: m, - temperature: 0, - response_format: zodResponseFormat(QuerySchema, "query"), - }); - - const { query } = completion.choices[0].message.parsed; - - return query; -}