From afd539a245de0aa695fd0bacaf5751fa6ea6a901 Mon Sep 17 00:00:00 2001 From: geoffsee <> Date: Tue, 27 May 2025 13:34:01 -0400 Subject: [PATCH] Remove preprocessing workflow and related operators This commit removes the preprocessing workflow, its operators, intent service, and associated functionality. Additionally, redundant logging and unnecessary comments have been cleaned up in the ChatService for better readability and maintainability. --- workers/site/services/ChatService.ts | 20 +--- workers/site/workflows/IntentService.ts | 64 ---------- workers/site/workflows/index.ts | 1 - .../createPreprocessingWorkflow.ts | 49 -------- .../executePreprocessingWorkflow.ts | 54 --------- .../workflows/preprocessing/newsOperator.ts | 101 ---------------- .../workflows/preprocessing/scrapeOperator.ts | 112 ------------------ .../workflows/preprocessing/webOperator.ts | 100 ---------------- 8 files changed, 4 insertions(+), 497 deletions(-) delete mode 100644 workers/site/workflows/IntentService.ts delete mode 100644 workers/site/workflows/index.ts delete mode 100644 workers/site/workflows/preprocessing/createPreprocessingWorkflow.ts delete mode 100644 workers/site/workflows/preprocessing/executePreprocessingWorkflow.ts delete mode 100644 workers/site/workflows/preprocessing/newsOperator.ts delete mode 100644 workers/site/workflows/preprocessing/scrapeOperator.ts delete mode 100644 workers/site/workflows/preprocessing/webOperator.ts diff --git a/workers/site/services/ChatService.ts b/workers/site/services/ChatService.ts index 742a689..eb7bd8c 100644 --- a/workers/site/services/ChatService.ts +++ b/workers/site/services/ChatService.ts @@ -14,7 +14,6 @@ import {XaiChatSdk} from "../sdk/models/xai"; import {CerebrasSdk} from "../sdk/models/cerebras"; import {CloudflareAISdk} from "../sdk/models/cloudflareAi"; -// Types export interface StreamParams { env: Env; openai: OpenAI; @@ -92,10 +91,8 @@ const ChatService = types const handleAgentProcess = async ( {controller, encoder, webhook, dynamicContext}: StreamHandlerParams ) => { - console.log("handleAgentProcess::start"); if (!webhook) return; - console.log("handleAgentProcess::[Loading Live Search]"); - dynamicContext.append("\n## Live Search\n~~~markdown\n"); + dynamicContext.append("\n## Agent Results\n~~~markdown\n"); for await (const chunk of self.streamAgentData({webhook})) { controller.enqueue(encoder.encode(chunk)); @@ -103,9 +100,7 @@ const ChatService = types } dynamicContext.append("\n~~~\n"); - console.log(`handleAgentProcess::[Finished loading Live Search!][length: ${dynamicContext.content.length}]`); ChatSdk.sendDoubleNewline(controller, encoder); - console.log("handleAgentProcess::exit") }; const createStreamParams = async ( @@ -331,12 +326,10 @@ const ChatService = types const encoder = new TextEncoder(); try { - // Send initial retry directive - // controller.enqueue(encoder.encode('retry: 0\n\n')); const dynamicContext = Message.create(streamConfig.preprocessedContext); - // Process webhook if configured + // Process agents if configured await self.bootstrapAgents({ savedStreamConfig, controller, @@ -408,7 +401,6 @@ const ChatService = types const streamConfig = JSON.parse(savedStreamConfig); console.log(`chatService::handleSseStream::${streamId}::[stream configured]`); - // Create the SSE readable stream const stream = self.createSseReadableStream({ streamId, streamConfig, @@ -419,17 +411,14 @@ const ChatService = types // Use `tee()` to create two streams: one for processing and one for the response const [processingStream, responseStream] = stream.tee(); - // Add the new stream to activeStreams self.setActiveStream(streamId, { - ...streamConfig, // Ensure streamConfig matches the expected structure + ...streamConfig, }); - // Process the stream for internal logic processingStream.pipeTo( new WritableStream({ close() { - console.log(`chatService::handleSseStream::${streamId}::[stream closed]`); - self.removeActiveStream(streamId); // Use action to update state + self.removeActiveStream(streamId); }, }) ); @@ -443,7 +432,6 @@ const ChatService = types }, }); }), - }; }); diff --git a/workers/site/workflows/IntentService.ts b/workers/site/workflows/IntentService.ts deleted file mode 100644 index 93438b8..0000000 --- a/workers/site/workflows/IntentService.ts +++ /dev/null @@ -1,64 +0,0 @@ -import type { MessageType } from "../models/Message"; -import OpenAI from "openai"; -import { z } from "zod"; -import { zodResponseFormat } from "openai/helpers/zod"; - -const IntentSchema = z.object({ - action: z.enum(["web-search", "news-search", "web-scrape", ""]), - confidence: z.number(), -}); - -export class SimpleSearchIntentService { - constructor( - private client: OpenAI, - private messages: MessageType[], - ) {} - - async query(prompt: string, confidenceThreshold = 0.9) { - console.log({ confidenceThreshold }); - - const systemMessage = { - role: "system", - content: `Model intent as JSON: -{ - "action": "", - "confidence": "" -} - -- Context from another conversation. -- confidence is a decimal between 0 and 1 representing similarity of the context to the identified action -- Intent reflects user's or required action. -- Use "" for unknown/ambiguous intent. - -Analyze context and output JSON.`.trim(), - }; - - const conversation = this.messages.map((m) => ({ - role: m.role, - content: m.content, - })); - conversation.push({ role: "user", content: prompt }); - - const completion = await this.client.beta.chat.completions.parse({ - model: "gpt-4o", - messages: JSON.parse(JSON.stringify([systemMessage, ...conversation])), - temperature: 0, - response_format: zodResponseFormat(IntentSchema, "intent"), - }); - - const { action, confidence } = completion.choices[0].message.parsed; - - console.log({ action, confidence }); - - return confidence >= confidenceThreshold - ? { action, confidence } - : { action: "unknown", confidence }; - } -} - -export function createIntentService(chat: { - messages: MessageType[]; - openai: OpenAI; -}) { - return new SimpleSearchIntentService(chat.openai, chat.messages); -} diff --git a/workers/site/workflows/index.ts b/workers/site/workflows/index.ts deleted file mode 100644 index 71d4785..0000000 --- a/workers/site/workflows/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from "./preprocessing/executePreprocessingWorkflow"; diff --git a/workers/site/workflows/preprocessing/createPreprocessingWorkflow.ts b/workers/site/workflows/preprocessing/createPreprocessingWorkflow.ts deleted file mode 100644 index 985bbbc..0000000 --- a/workers/site/workflows/preprocessing/createPreprocessingWorkflow.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { - ManifoldRegion, - WorkflowFunctionManifold, -} from "manifold-workflow-engine"; -import { createIntentService } from "../IntentService"; -import { createSearchWebhookOperator } from "./webOperator"; -import { createNewsWebhookOperator } from "./newsOperator"; -import { createScrapeWebhookOperator } from "./scrapeOperator"; - -export const createPreprocessingWorkflow = ({ - eventHost, - initialState, - streamId, - chat: { messages, openai }, -}) => { - const preprocessingManifold = new WorkflowFunctionManifold( - createIntentService({ messages, openai }), - ); - preprocessingManifold.state = { ...initialState }; - - const searchWebhookOperator = createSearchWebhookOperator({ - eventHost, - streamId, - openai, - messages, - }); - const newsWebhookOperator = createNewsWebhookOperator({ - eventHost, - streamId, - openai, - messages, - }); - const scrapeWebhookOperator = createScrapeWebhookOperator({ - eventHost, - streamId, - openai, - messages, - }); - - const preprocessingRegion = new ManifoldRegion("preprocessingRegion", [ - searchWebhookOperator, - newsWebhookOperator, - scrapeWebhookOperator, - ]); - - preprocessingManifold.addRegion(preprocessingRegion); - - return preprocessingManifold; -}; diff --git a/workers/site/workflows/preprocessing/executePreprocessingWorkflow.ts b/workers/site/workflows/preprocessing/executePreprocessingWorkflow.ts deleted file mode 100644 index 83693c8..0000000 --- a/workers/site/workflows/preprocessing/executePreprocessingWorkflow.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { createPreprocessingWorkflow } from "./createPreprocessingWorkflow"; - -export async function executePreprocessingWorkflow({ - latestUserMessage, - latestAiMessage, - eventHost, - streamId, - chat: { messages, openai }, -}) { - console.log(`Executing executePreprocessingWorkflow`); - const initialState = { latestUserMessage, latestAiMessage }; - - // Add execution tracking flag to prevent duplicate runs - const executionKey = `preprocessing-${crypto.randomUUID()}`; - if (globalThis[executionKey]) { - console.log("Preventing duplicate preprocessing workflow execution"); - return globalThis[executionKey]; - } - - const workflows = { - preprocessing: createPreprocessingWorkflow({ - eventHost, - initialState, - streamId, - chat: { messages, openai }, - }), - results: new Map(), - }; - - try { - // Store the promise to prevent parallel executions - globalThis[executionKey] = (async () => { - await workflows.preprocessing.navigate(latestUserMessage); - await workflows.preprocessing.executeWorkflow(latestUserMessage); - console.log( - `executePreprocessingWorkflow::workflow::preprocessing::results`, - { state: JSON.stringify(workflows.preprocessing.state, null, 2) }, - ); - workflows.results.set("preprocessed", workflows.preprocessing.state); - - // Cleanup after execution - setTimeout(() => { - delete globalThis[executionKey]; - }, 1000); - - return workflows; - })(); - - return await globalThis[executionKey]; - } catch (error) { - delete globalThis[executionKey]; - throw new Error("Workflow execution failed"); - } -} diff --git a/workers/site/workflows/preprocessing/newsOperator.ts b/workers/site/workflows/preprocessing/newsOperator.ts deleted file mode 100644 index 80bbecd..0000000 --- a/workers/site/workflows/preprocessing/newsOperator.ts +++ /dev/null @@ -1,101 +0,0 @@ -import { WorkflowOperator } from "manifold-workflow-engine"; -import { zodResponseFormat } from "openai/helpers/zod"; -import { z } from "zod"; - -const QuerySchema = z.object({ - query: z.string(), -}); - -export function createNewsWebhookOperator({ - eventHost, - streamId, - openai, - messages, -}) { - return new WorkflowOperator("news-search", async (state: any) => { - const { latestUserMessage } = state; - console.log(`Processing user message: ${latestUserMessage}`); - - const resource = "news-search"; - const input = await getQueryFromContext({ - openai, - messages, - latestUserMessage, - }); - - const eventSource = new URL(eventHost); - const url = `${eventSource}api/webhooks`; - console.log({ url }); - - const stream = { - id: crypto.randomUUID(), - parent: streamId, - resource, - payload: input, - }; - const createStreamResponse = await fetch(`${eventSource}api/webhooks`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - id: stream.id, - parent: streamId, - resource: "news-search", - payload: { - input, - }, - }), - }); - const raw = await createStreamResponse.text(); - const { stream_url } = JSON.parse(raw); - const surl = eventHost + stream_url; - const webhook = { url: surl, id: stream.id, resource }; - - return { - ...state, - webhook, - latestUserMessage: "", - latestAiMessage: "", - }; - }); - - async function getQueryFromContext({ messages, openai, latestUserMessage }) { - const systemMessage = { - role: "system", - content: `Analyze the latest message in a conversation and generate a JSON object with a single implied question for a news search. The JSON should be structured as follows: - -{ - "query": "" -} - -## Example -{ - "query": "When was the last Buffalo Sabres hockey game?" -} - -Focus on the most recent message to determine the query. Output only the JSON object without any additional text.`, - }; - - const conversation = messages.map((m) => ({ - role: m.role, - content: m.content, - })); - conversation.push({ role: "user", content: `${latestUserMessage}` }); - - const m = [systemMessage, ...conversation]; - - const completion = await openai.beta.chat.completions.parse({ - model: "gpt-4o-mini", - messages: m, - temperature: 0, - response_format: zodResponseFormat(QuerySchema, "query"), - }); - - const { query } = completion.choices[0].message.parsed; - - console.log({ newsWebhookQuery: query }); - - return query; - } -} diff --git a/workers/site/workflows/preprocessing/scrapeOperator.ts b/workers/site/workflows/preprocessing/scrapeOperator.ts deleted file mode 100644 index 19a5314..0000000 --- a/workers/site/workflows/preprocessing/scrapeOperator.ts +++ /dev/null @@ -1,112 +0,0 @@ -import { WorkflowOperator } from "manifold-workflow-engine"; -import { zodResponseFormat } from "openai/helpers/zod"; -import { z } from "zod"; - -const UrlActionSchema = z.object({ - url: z.string(), - query: z.string(), - action: z.enum(["read", "scrape", "crawl", ""]), -}); - -export function createScrapeWebhookOperator({ - eventHost, - streamId, - openai, - messages, -}) { - return new WorkflowOperator("web-scrape", async (state: any) => { - const { latestUserMessage } = state; - - const webscrapeWebhookEndpoint = "/api/webhooks"; - - const resource = "web-scrape"; - const context = await getQueryFromContext({ - openai, - messages, - latestUserMessage, - }); - - const input = { - url: context?.url, - action: context?.action, - query: context.query, - }; - - const eventSource = new URL(eventHost); - const url = `${eventSource}api/webhooks`; - - const stream = { - id: crypto.randomUUID(), - parent: streamId, - resource, - payload: input, - }; - const createStreamResponse = await fetch(`${eventSource}api/webhooks`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - id: stream.id, - parent: streamId, - resource: "web-scrape", - payload: { - input, - }, - }), - }); - const raw = await createStreamResponse.text(); - const { stream_url } = JSON.parse(raw); - const surl = eventHost + stream_url; - const webhook = { url: surl, id: stream.id, resource }; - - return { - ...state, - webhook, - latestUserMessage: "", - latestAiMessage: "", - }; - }); -} - -async function getQueryFromContext({ messages, openai, latestUserMessage }) { - const systemMessage = { - role: "system" as const, - content: - `You are modeling a structured output containing a single question, a URL, and an action, all relative to a single input. - -Return the result as a JSON object in the following structure: -{ - "url": "Full URL in the conversation that references the URL being interacted with. No trailing slash!", - "query": "Implied question about the resources at the URL.", - "action": "read | scrape | crawl" -} - -- The input being modeled is conversational data from a different conversation than this one. -- Intent should represent a next likely action the system might take to satisfy or enhance the user's request. - -Instructions: -1. Analyze the provided context and declare the url, action, and question implied by the latest message. - -Output the JSON object. Do not include any additional explanations or text.`.trim(), - }; - - const conversation = messages.map((m) => ({ - role: m.role, - content: m.content, - })); - conversation.push({ role: "user", content: `${latestUserMessage}` }); - - const m = [systemMessage, ...conversation]; - - const completion = await openai.beta.chat.completions.parse({ - model: "gpt-4o-mini", - messages: m, - temperature: 0, - response_format: zodResponseFormat(UrlActionSchema, "UrlActionSchema"), - }); - - const { query, action, url } = completion.choices[0].message.parsed; - - return { query, action, url }; -} diff --git a/workers/site/workflows/preprocessing/webOperator.ts b/workers/site/workflows/preprocessing/webOperator.ts deleted file mode 100644 index a2b4b6c..0000000 --- a/workers/site/workflows/preprocessing/webOperator.ts +++ /dev/null @@ -1,100 +0,0 @@ -import { WorkflowOperator } from "manifold-workflow-engine"; -import { zodResponseFormat } from "openai/helpers/zod"; -import { z } from "zod"; - -const QuerySchema = z.object({ - query: z.string(), // No min/max constraints in the schema -}); - -export function createSearchWebhookOperator({ - eventHost, - streamId, - openai, - messages, -}) { - return new WorkflowOperator("web-search", async (state: any) => { - const { latestUserMessage } = state; - - const websearchWebhookEndpoint = "/api/webhooks"; - - const resource = "web-search"; - const input = await getQueryFromContext({ - openai, - messages, - latestUserMessage, - }); - - // process webhooks - const eventSource = new URL(eventHost); - const url = `${eventSource}api/webhooks`; - - const stream = { - id: crypto.randomUUID(), - parent: streamId, - resource, - payload: input, - }; - const createStreamResponse = await fetch(`${eventSource}api/webhooks`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - id: stream.id, - parent: streamId, - resource: "web-search", - payload: { - input, - }, - }), - }); - const raw = await createStreamResponse.text(); - const { stream_url } = JSON.parse(raw); - const surl = eventHost + stream_url; - const webhook = { url: surl, id: stream.id, resource }; - - return { - ...state, - webhook, - latestUserMessage: "", // unset to break out of loop - latestAiMessage: "", // unset to break out of loop - }; - }); -} - -async function getQueryFromContext({ messages, openai, latestUserMessage }) { - const systemMessage = { - role: "system", - content: `Analyze the latest message in the conversation and generate a JSON object with a single implied question for a web search. The JSON should be structured as follows: - -{ - "query": "the question that needs a web search" -} - -## Example -{ - "query": "What was the score of the last Buffalo Sabres hockey game?" -} - -Focus on the most recent message to determine the query. Output only the JSON object without any additional text.`, - }; - - const conversation = messages.map((m) => ({ - role: m.role, - content: m.content, - })); - conversation.push({ role: "user", content: `${latestUserMessage}` }); - - const m = [systemMessage, ...conversation]; - - const completion = await openai.beta.chat.completions.parse({ - model: "gpt-4o-mini", - messages: m, - temperature: 0, - response_format: zodResponseFormat(QuerySchema, "query"), - }); - - const { query } = completion.choices[0].message.parsed; - - return query; -}