diff --git a/src/agents/image_generator.rs b/src/agents/image_generator.rs index 07bcec5..7705d90 100644 --- a/src/agents/image_generator.rs +++ b/src/agents/image_generator.rs @@ -1,7 +1,7 @@ use crate::utils::utils::run_agent; use tokio::process::Child; -pub async fn image_generator(stream_id: &str, input: &str) -> Result { +pub async fn agent(stream_id: &str, input: &str) -> Result { tracing::debug!( "Running image generator, \ninput: {}", input diff --git a/src/agents/mod.rs b/src/agents/mod.rs index 207298e..3cf8f91 100644 --- a/src/agents/mod.rs +++ b/src/agents/mod.rs @@ -1,4 +1,4 @@ -pub mod news; -pub mod scrape; -pub mod search; -pub mod image_generator; \ No newline at end of file +pub(crate) mod news; +pub(crate) mod scrape; +pub(crate) mod search; +pub(crate) mod image_generator; \ No newline at end of file diff --git a/src/agents/news.rs b/src/agents/news.rs index bc8370b..5986d57 100644 --- a/src/agents/news.rs +++ b/src/agents/news.rs @@ -1,6 +1,6 @@ use crate::utils::utils::run_agent; use tokio::process::Child; -pub async fn news_agent(stream_id: &str, input: &str) -> Result { +pub async fn agent(stream_id: &str, input: &str) -> Result { run_agent(stream_id, input, "./packages/genaiscript/genaisrc/news-search.genai.mts", 10).await } diff --git a/src/agents/scrape.rs b/src/agents/scrape.rs index 73e1f27..f22792e 100644 --- a/src/agents/scrape.rs +++ b/src/agents/scrape.rs @@ -1,6 +1,6 @@ use crate::utils::utils::run_agent; use tokio::process::Child; -pub async fn scrape_agent(stream_id: &str, input: &str) -> Result { +pub async fn agent(stream_id: &str, input: &str) -> Result { run_agent(stream_id, input, "./packages/genaiscript/genaisrc/web-scrape.genai.mts", 10).await } diff --git a/src/agents/search.rs b/src/agents/search.rs index 8a0bb25..ab45d13 100644 --- a/src/agents/search.rs +++ b/src/agents/search.rs @@ -3,7 +3,7 @@ use tracing; use crate::utils::utils::run_agent; -pub async fn search_agent(stream_id: &str, input: &str) -> Result { +pub async fn agent(stream_id: &str, input: &str) -> Result { run_agent(stream_id, input, "./packages/genaiscript/genaisrc/web-search.genai.mts", 10).await } @@ -11,13 +11,13 @@ pub async fn search_agent(stream_id: &str, input: &str) -> Result #[cfg(test)] mod tests { use std::fmt::Debug; - use crate::agents::search::search_agent; + use crate::agents::search::agent; #[tokio::test] async fn test_search_execution() { let input = "Who won the 2024 presidential election?"; - let mut command = search_agent("test-stream", input).await.unwrap(); + let mut command = agent("test-stream", input).await.unwrap(); // command.stdout.take().unwrap().read_to_string(&mut String::new()).await.unwrap(); // Optionally, you can capture and inspect stdout if needed: diff --git a/src/handlers/webhooks.rs b/src/handlers/webhooks.rs index c7e0776..5c4aba0 100644 --- a/src/handlers/webhooks.rs +++ b/src/handlers/webhooks.rs @@ -1,7 +1,3 @@ -use crate::agents; -use crate::agents::news::news_agent; -use crate::agents::scrape::scrape_agent; -use crate::agents::search::search_agent; use axum::response::Response; use axum::{ body::Body, extract::Path, extract::Query, http::StatusCode, response::IntoResponse, Json, @@ -18,7 +14,6 @@ use std::time::Duration; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; use tokio::sync::Mutex; -use crate::agents::image_generator::image_generator; // init sled lazy_static! { @@ -31,7 +26,6 @@ pub async fn handle_webhooks(Path(agent_id): Path) -> impl IntoResponse let db = DB.lock().await; match db.get(&agent_id) { Ok(Some(data)) => { - let mut info: StreamInfo = match serde_json::from_slice(&data) { Ok(info) => info, Err(e) => { @@ -40,7 +34,6 @@ pub async fn handle_webhooks(Path(agent_id): Path) -> impl IntoResponse } }; - // Increment the call_count in the database info.call_count += 1; let updated_info_bytes = match serde_json::to_vec(&info) { @@ -54,7 +47,10 @@ pub async fn handle_webhooks(Path(agent_id): Path) -> impl IntoResponse match db.insert(&agent_id, updated_info_bytes) { Ok(_) => { if let Err(e) = db.flush_async().await { - tracing::error!("Failed to persist updated call_count to the database: {}", e); + tracing::error!( + "Failed to persist updated call_count to the database: {}", + e + ); return StatusCode::INTERNAL_SERVER_ERROR.into_response(); } } @@ -82,7 +78,7 @@ pub async fn handle_webhooks(Path(agent_id): Path) -> impl IntoResponse } }; - if(info.call_count > 1) { + if (info.call_count > 1) { return StatusCode::OK.into_response(); } @@ -94,12 +90,14 @@ pub async fn handle_webhooks(Path(agent_id): Path) -> impl IntoResponse resource, agent_id ); - + let cmd = match resource.as_str() { - "web-search" => search_agent(agent_id.as_str(), &*input).await, - "news-search" => news_agent(agent_id.as_str(), &*input).await, - "image-generator" => image_generator(agent_id.as_str(), &*input).await, - "web-scrape" => scrape_agent(agent_id.as_str(), &*input).await, + "web-search" => crate::agents::search::agent(agent_id.as_str(), &*input).await, + "news-search" => crate::agents::news::agent(agent_id.as_str(), &*input).await, + "image-generator" => { + crate::agents::image_generator::agent(agent_id.as_str(), &*input).await + } + "web-scrape" => crate::agents::scrape::agent(agent_id.as_str(), &*input).await, _ => { tracing::error!("Unsupported resource type: {}", resource); return StatusCode::BAD_REQUEST.into_response(); @@ -131,7 +129,7 @@ pub async fn handle_webhooks(Path(agent_id): Path) -> impl IntoResponse .header("Connection", "keep-alive") .header("X-Accel-Buffering", "yes") .body(Body::from_stream(sse_stream)) - .unwrap() + .unwrap(); } Ok(None) => { tracing::error!("Stream ID not found: {}", agent_id); @@ -183,7 +181,6 @@ struct StreamInfo { call_count: i32, } - #[derive(Deserialize, Serialize, Debug)] pub struct WebhookPostRequest { id: String, @@ -207,7 +204,7 @@ pub async fn handle_webhooks_post(Json(payload): Json) -> im resource: payload.resource.clone(), payload: payload.payload, parent: payload.parent.clone(), - call_count: 0 + call_count: 0, }; let info_bytes = match serde_json::to_vec(&info) { @@ -232,19 +229,22 @@ pub async fn handle_webhooks_post(Json(payload): Json) -> im match db.get(&stream_id) { Ok(Some(_)) => { let stream_url = format!("/webhooks/{}", stream_id); - tracing::info!("Successfully created and verified stream URL: {}", stream_url); + tracing::info!( + "Successfully created and verified stream URL: {}", + stream_url + ); Json(WebhookPostResponse { stream_url }).into_response() - }, + } Ok(None) => { tracing::error!("Failed to verify stream creation: {}", stream_id); StatusCode::INTERNAL_SERVER_ERROR.into_response() - }, + } Err(e) => { tracing::error!("Error verifying stream creation: {}", e); StatusCode::INTERNAL_SERVER_ERROR.into_response() } } - }, + } Err(e) => { tracing::error!("Failed to flush DB: {}", e); StatusCode::INTERNAL_SERVER_ERROR.into_response()