Remove preprocessing workflow and related operators

This commit removes the preprocessing workflow, its operators, the intent service, and associated functionality. It also cleans up redundant logging and unnecessary comments in the ChatService to improve readability and maintainability.
geoffsee
2025-05-27 13:34:01 -04:00
committed by Geoff Seemueller
parent f8bf65603c
commit afd539a245
8 changed files with 4 additions and 497 deletions

View File

@@ -14,7 +14,6 @@ import {XaiChatSdk} from "../sdk/models/xai";
 import {CerebrasSdk} from "../sdk/models/cerebras";
 import {CloudflareAISdk} from "../sdk/models/cloudflareAi";
-// Types
 export interface StreamParams {
   env: Env;
   openai: OpenAI;
@@ -92,10 +91,8 @@ const ChatService = types
   const handleAgentProcess = async (
     {controller, encoder, webhook, dynamicContext}: StreamHandlerParams
   ) => {
-    console.log("handleAgentProcess::start");
     if (!webhook) return;
-    console.log("handleAgentProcess::[Loading Live Search]");
-    dynamicContext.append("\n## Live Search\n~~~markdown\n");
+    dynamicContext.append("\n## Agent Results\n~~~markdown\n");
     for await (const chunk of self.streamAgentData({webhook})) {
       controller.enqueue(encoder.encode(chunk));
@@ -103,9 +100,7 @@ const ChatService = types
     }
     dynamicContext.append("\n~~~\n");
-    console.log(`handleAgentProcess::[Finished loading Live Search!][length: ${dynamicContext.content.length}]`);
     ChatSdk.sendDoubleNewline(controller, encoder);
-    console.log("handleAgentProcess::exit")
   };
   const createStreamParams = async (
@@ -331,12 +326,10 @@ const ChatService = types
     const encoder = new TextEncoder();
     try {
-      // Send initial retry directive
-      // controller.enqueue(encoder.encode('retry: 0\n\n'));
       const dynamicContext = Message.create(streamConfig.preprocessedContext);
-      // Process webhook if configured
+      // Process agents if configured
       await self.bootstrapAgents({
         savedStreamConfig,
         controller,
@@ -408,7 +401,6 @@ const ChatService = types
     const streamConfig = JSON.parse(savedStreamConfig);
     console.log(`chatService::handleSseStream::${streamId}::[stream configured]`);
-    // Create the SSE readable stream
     const stream = self.createSseReadableStream({
       streamId,
       streamConfig,
@@ -419,17 +411,14 @@ const ChatService = types
     // Use `tee()` to create two streams: one for processing and one for the response
     const [processingStream, responseStream] = stream.tee();
-    // Add the new stream to activeStreams
     self.setActiveStream(streamId, {
-      ...streamConfig, // Ensure streamConfig matches the expected structure
+      ...streamConfig,
     });
-    // Process the stream for internal logic
     processingStream.pipeTo(
       new WritableStream({
         close() {
-          console.log(`chatService::handleSseStream::${streamId}::[stream closed]`);
-          self.removeActiveStream(streamId); // Use action to update state
+          self.removeActiveStream(streamId);
         },
       })
     );
@@ -443,7 +432,6 @@ const ChatService = types
         },
       });
     }),
   };
 });
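
Taken together, the ChatService hunks above leave a much leaner agent handler. Below is a rough sketch of what handleAgentProcess reduces to after this commit, assembled from the surviving right-hand lines of the diff; the indentation and the surrounding action wrapper are assumptions, not part of the diff itself.

```ts
// Sketch only: assembled from the post-change lines of the hunks above.
// StreamHandlerParams, ChatSdk, and self.streamAgentData are as referenced in the diff.
const handleAgentProcess = async (
  { controller, encoder, webhook, dynamicContext }: StreamHandlerParams
) => {
  if (!webhook) return;
  dynamicContext.append("\n## Agent Results\n~~~markdown\n");
  for await (const chunk of self.streamAgentData({ webhook })) {
    controller.enqueue(encoder.encode(chunk));
  }
  dynamicContext.append("\n~~~\n");
  ChatSdk.sendDoubleNewline(controller, encoder);
};
```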

View File

@@ -1,64 +0,0 @@
import type { MessageType } from "../models/Message";
import OpenAI from "openai";
import { z } from "zod";
import { zodResponseFormat } from "openai/helpers/zod";

const IntentSchema = z.object({
  action: z.enum(["web-search", "news-search", "web-scrape", ""]),
  confidence: z.number(),
});

export class SimpleSearchIntentService {
  constructor(
    private client: OpenAI,
    private messages: MessageType[],
  ) {}

  async query(prompt: string, confidenceThreshold = 0.9) {
    console.log({ confidenceThreshold });
    const systemMessage = {
      role: "system",
      content: `Model intent as JSON:
{
"action": "",
"confidence": ""
}
- Context from another conversation.
- confidence is a decimal between 0 and 1 representing similarity of the context to the identified action
- Intent reflects user's or required action.
- Use "" for unknown/ambiguous intent.
Analyze context and output JSON.`.trim(),
    };
    const conversation = this.messages.map((m) => ({
      role: m.role,
      content: m.content,
    }));
    conversation.push({ role: "user", content: prompt });
    const completion = await this.client.beta.chat.completions.parse({
      model: "gpt-4o",
      messages: JSON.parse(JSON.stringify([systemMessage, ...conversation])),
      temperature: 0,
      response_format: zodResponseFormat(IntentSchema, "intent"),
    });
    const { action, confidence } = completion.choices[0].message.parsed;
    console.log({ action, confidence });
    return confidence >= confidenceThreshold
      ? { action, confidence }
      : { action: "unknown", confidence };
  }
}

export function createIntentService(chat: {
  messages: MessageType[];
  openai: OpenAI;
}) {
  return new SimpleSearchIntentService(chat.openai, chat.messages);
}
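
For context on what is being deleted: the intent service classified the latest user turn into one of the webhook actions using an OpenAI structured output. A minimal usage sketch follows; the caller, import paths, and sample messages are hypothetical, not code from the repository.

```ts
// Hypothetical caller, for illustration only; paths and sample data are assumptions.
import OpenAI from "openai";
import type { MessageType } from "../models/Message";
import { createIntentService } from "./IntentService";

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const messages = [
  { role: "user", content: "Any updates on the Buffalo Sabres?" },
] as MessageType[];

// query() returns { action, confidence }; action falls back to "unknown"
// when confidence is below the default 0.9 threshold.
const intent = await createIntentService({ messages, openai }).query(
  "Find the latest score",
);
if (intent.action === "news-search") {
  // hand off to the (now removed) news webhook operator
}
```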

View File

@@ -1 +0,0 @@
export * from "./preprocessing/executePreprocessingWorkflow";

View File

@@ -1,49 +0,0 @@
import {
  ManifoldRegion,
  WorkflowFunctionManifold,
} from "manifold-workflow-engine";
import { createIntentService } from "../IntentService";
import { createSearchWebhookOperator } from "./webOperator";
import { createNewsWebhookOperator } from "./newsOperator";
import { createScrapeWebhookOperator } from "./scrapeOperator";

export const createPreprocessingWorkflow = ({
  eventHost,
  initialState,
  streamId,
  chat: { messages, openai },
}) => {
  const preprocessingManifold = new WorkflowFunctionManifold(
    createIntentService({ messages, openai }),
  );
  preprocessingManifold.state = { ...initialState };

  const searchWebhookOperator = createSearchWebhookOperator({
    eventHost,
    streamId,
    openai,
    messages,
  });
  const newsWebhookOperator = createNewsWebhookOperator({
    eventHost,
    streamId,
    openai,
    messages,
  });
  const scrapeWebhookOperator = createScrapeWebhookOperator({
    eventHost,
    streamId,
    openai,
    messages,
  });

  const preprocessingRegion = new ManifoldRegion("preprocessingRegion", [
    searchWebhookOperator,
    newsWebhookOperator,
    scrapeWebhookOperator,
  ]);
  preprocessingManifold.addRegion(preprocessingRegion);

  return preprocessingManifold;
};

View File

@@ -1,54 +0,0 @@
import { createPreprocessingWorkflow } from "./createPreprocessingWorkflow";

export async function executePreprocessingWorkflow({
  latestUserMessage,
  latestAiMessage,
  eventHost,
  streamId,
  chat: { messages, openai },
}) {
  console.log(`Executing executePreprocessingWorkflow`);
  const initialState = { latestUserMessage, latestAiMessage };

  // Add execution tracking flag to prevent duplicate runs
  const executionKey = `preprocessing-${crypto.randomUUID()}`;
  if (globalThis[executionKey]) {
    console.log("Preventing duplicate preprocessing workflow execution");
    return globalThis[executionKey];
  }

  const workflows = {
    preprocessing: createPreprocessingWorkflow({
      eventHost,
      initialState,
      streamId,
      chat: { messages, openai },
    }),
    results: new Map(),
  };

  try {
    // Store the promise to prevent parallel executions
    globalThis[executionKey] = (async () => {
      await workflows.preprocessing.navigate(latestUserMessage);
      await workflows.preprocessing.executeWorkflow(latestUserMessage);
      console.log(
        `executePreprocessingWorkflow::workflow::preprocessing::results`,
        { state: JSON.stringify(workflows.preprocessing.state, null, 2) },
      );
      workflows.results.set("preprocessed", workflows.preprocessing.state);
      // Cleanup after execution
      setTimeout(() => {
        delete globalThis[executionKey];
      }, 1000);
      return workflows;
    })();
    return await globalThis[executionKey];
  } catch (error) {
    delete globalThis[executionKey];
    throw new Error("Workflow execution failed");
  }
}

View File

@@ -1,101 +0,0 @@
import { WorkflowOperator } from "manifold-workflow-engine";
import { zodResponseFormat } from "openai/helpers/zod";
import { z } from "zod";

const QuerySchema = z.object({
  query: z.string(),
});

export function createNewsWebhookOperator({
  eventHost,
  streamId,
  openai,
  messages,
}) {
  return new WorkflowOperator("news-search", async (state: any) => {
    const { latestUserMessage } = state;
    console.log(`Processing user message: ${latestUserMessage}`);
    const resource = "news-search";
    const input = await getQueryFromContext({
      openai,
      messages,
      latestUserMessage,
    });
    const eventSource = new URL(eventHost);
    const url = `${eventSource}api/webhooks`;
    console.log({ url });
    const stream = {
      id: crypto.randomUUID(),
      parent: streamId,
      resource,
      payload: input,
    };
    const createStreamResponse = await fetch(`${eventSource}api/webhooks`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        id: stream.id,
        parent: streamId,
        resource: "news-search",
        payload: {
          input,
        },
      }),
    });
    const raw = await createStreamResponse.text();
    const { stream_url } = JSON.parse(raw);
    const surl = eventHost + stream_url;
    const webhook = { url: surl, id: stream.id, resource };
    return {
      ...state,
      webhook,
      latestUserMessage: "",
      latestAiMessage: "",
    };
  });

  async function getQueryFromContext({ messages, openai, latestUserMessage }) {
    const systemMessage = {
      role: "system",
      content: `Analyze the latest message in a conversation and generate a JSON object with a single implied question for a news search. The JSON should be structured as follows:
{
"query": "<question to be answered by a news search>"
}
## Example
{
"query": "When was the last Buffalo Sabres hockey game?"
}
Focus on the most recent message to determine the query. Output only the JSON object without any additional text.`,
    };
    const conversation = messages.map((m) => ({
      role: m.role,
      content: m.content,
    }));
    conversation.push({ role: "user", content: `${latestUserMessage}` });
    const m = [systemMessage, ...conversation];
    const completion = await openai.beta.chat.completions.parse({
      model: "gpt-4o-mini",
      messages: m,
      temperature: 0,
      response_format: zodResponseFormat(QuerySchema, "query"),
    });
    const { query } = completion.choices[0].message.parsed;
    console.log({ newsWebhookQuery: query });
    return query;
  }
}

View File

@@ -1,112 +0,0 @@
import { WorkflowOperator } from "manifold-workflow-engine";
import { zodResponseFormat } from "openai/helpers/zod";
import { z } from "zod";

const UrlActionSchema = z.object({
  url: z.string(),
  query: z.string(),
  action: z.enum(["read", "scrape", "crawl", ""]),
});

export function createScrapeWebhookOperator({
  eventHost,
  streamId,
  openai,
  messages,
}) {
  return new WorkflowOperator("web-scrape", async (state: any) => {
    const { latestUserMessage } = state;
    const webscrapeWebhookEndpoint = "/api/webhooks";
    const resource = "web-scrape";
    const context = await getQueryFromContext({
      openai,
      messages,
      latestUserMessage,
    });
    const input = {
      url: context?.url,
      action: context?.action,
      query: context.query,
    };
    const eventSource = new URL(eventHost);
    const url = `${eventSource}api/webhooks`;
    const stream = {
      id: crypto.randomUUID(),
      parent: streamId,
      resource,
      payload: input,
    };
    const createStreamResponse = await fetch(`${eventSource}api/webhooks`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        id: stream.id,
        parent: streamId,
        resource: "web-scrape",
        payload: {
          input,
        },
      }),
    });
    const raw = await createStreamResponse.text();
    const { stream_url } = JSON.parse(raw);
    const surl = eventHost + stream_url;
    const webhook = { url: surl, id: stream.id, resource };
    return {
      ...state,
      webhook,
      latestUserMessage: "",
      latestAiMessage: "",
    };
  });
}

async function getQueryFromContext({ messages, openai, latestUserMessage }) {
  const systemMessage = {
    role: "system" as const,
    content:
      `You are modeling a structured output containing a single question, a URL, and an action, all relative to a single input.
Return the result as a JSON object in the following structure:
{
"url": "Full URL in the conversation that references the URL being interacted with. No trailing slash!",
"query": "Implied question about the resources at the URL.",
"action": "read | scrape | crawl"
}
- The input being modeled is conversational data from a different conversation than this one.
- Intent should represent a next likely action the system might take to satisfy or enhance the user's request.
Instructions:
1. Analyze the provided context and declare the url, action, and question implied by the latest message.
Output the JSON object. Do not include any additional explanations or text.`.trim(),
  };
  const conversation = messages.map((m) => ({
    role: m.role,
    content: m.content,
  }));
  conversation.push({ role: "user", content: `${latestUserMessage}` });
  const m = [systemMessage, ...conversation];
  const completion = await openai.beta.chat.completions.parse({
    model: "gpt-4o-mini",
    messages: m,
    temperature: 0,
    response_format: zodResponseFormat(UrlActionSchema, "UrlActionSchema"),
  });
  const { query, action, url } = completion.choices[0].message.parsed;
  return { query, action, url };
}

View File

@@ -1,100 +0,0 @@
import { WorkflowOperator } from "manifold-workflow-engine";
import { zodResponseFormat } from "openai/helpers/zod";
import { z } from "zod";

const QuerySchema = z.object({
  query: z.string(), // No min/max constraints in the schema
});

export function createSearchWebhookOperator({
  eventHost,
  streamId,
  openai,
  messages,
}) {
  return new WorkflowOperator("web-search", async (state: any) => {
    const { latestUserMessage } = state;
    const websearchWebhookEndpoint = "/api/webhooks";
    const resource = "web-search";
    const input = await getQueryFromContext({
      openai,
      messages,
      latestUserMessage,
    });
    // process webhooks
    const eventSource = new URL(eventHost);
    const url = `${eventSource}api/webhooks`;
    const stream = {
      id: crypto.randomUUID(),
      parent: streamId,
      resource,
      payload: input,
    };
    const createStreamResponse = await fetch(`${eventSource}api/webhooks`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        id: stream.id,
        parent: streamId,
        resource: "web-search",
        payload: {
          input,
        },
      }),
    });
    const raw = await createStreamResponse.text();
    const { stream_url } = JSON.parse(raw);
    const surl = eventHost + stream_url;
    const webhook = { url: surl, id: stream.id, resource };
    return {
      ...state,
      webhook,
      latestUserMessage: "", // unset to break out of loop
      latestAiMessage: "", // unset to break out of loop
    };
  });
}

async function getQueryFromContext({ messages, openai, latestUserMessage }) {
  const systemMessage = {
    role: "system",
    content: `Analyze the latest message in the conversation and generate a JSON object with a single implied question for a web search. The JSON should be structured as follows:
{
"query": "the question that needs a web search"
}
## Example
{
"query": "What was the score of the last Buffalo Sabres hockey game?"
}
Focus on the most recent message to determine the query. Output only the JSON object without any additional text.`,
  };
  const conversation = messages.map((m) => ({
    role: m.role,
    content: m.content,
  }));
  conversation.push({ role: "user", content: `${latestUserMessage}` });
  const m = [systemMessage, ...conversation];
  const completion = await openai.beta.chat.completions.parse({
    model: "gpt-4o-mini",
    messages: m,
    temperature: 0,
    response_format: zodResponseFormat(QuerySchema, "query"),
  });
  const { query } = completion.choices[0].message.parsed;
  return query;
}
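
All three removed operators follow the same shape: derive a query from the conversation, POST it to the /api/webhooks endpoint, read stream_url from the response, and hand a webhook descriptor back through the workflow state. A condensed sketch of that shared step is below; registerWebhook and WebhookDescriptor are illustrative names, not identifiers from the repository.

```ts
// Sketch of the shared pattern across the removed web/news/scrape operators.
// "registerWebhook" and "WebhookDescriptor" are hypothetical names.
interface WebhookDescriptor {
  url: string;
  id: string;
  resource: string;
}

async function registerWebhook(
  eventHost: string,
  streamId: string,
  resource: string,
  input: unknown,
): Promise<WebhookDescriptor> {
  const id = crypto.randomUUID();
  const response = await fetch(`${new URL(eventHost)}api/webhooks`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ id, parent: streamId, resource, payload: { input } }),
  });
  // The webhook endpoint responds with the relative stream_url to subscribe to.
  const { stream_url } = JSON.parse(await response.text());
  return { url: eventHost + stream_url, id, resource };
}
```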