diff --git a/Cargo.lock b/Cargo.lock index 84d907b..bd2eb6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,20 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "acto" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a026259da4f1a13b4af60cda453c392de64c58c12d239c560923e0382f42f2b9" +dependencies = [ + "parking_lot", + "pin-project-lite", + "rustc_version", + "smol_str", + "tokio", + "tracing", +] + [[package]] name = "addr2line" version = "0.24.2" @@ -1590,6 +1604,7 @@ dependencies = [ "futures", "iroh", "iroh-blobs", + "iroh-relay", "libp2p", "rmpv", "serde", @@ -1600,6 +1615,7 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", + "url", "uuid", ] @@ -2338,6 +2354,7 @@ dependencies = [ "strum", "stun-rs", "surge-ping", + "swarm-discovery", "thiserror 2.0.12", "time", "tokio", @@ -5235,6 +5252,12 @@ dependencies = [ "serde", ] +[[package]] +name = "smol_str" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fad6c857cbab2627dcf01ec85a623ca4e7dcb5691cbaa3d7fb7653671f0d09c9" + [[package]] name = "snafu" version = "0.8.6" @@ -5483,6 +5506,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "swarm-discovery" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a95032b94c1dc318f55e0b130e3d2176cda022310a65c3df0092764ea69562" +dependencies = [ + "acto", + "anyhow", + "hickory-proto 0.25.2", + "rand 0.8.5", + "socket2", + "tokio", + "tracing", +] + [[package]] name = "syn" version = "1.0.109" diff --git a/crates/gsio-node/Cargo.toml b/crates/gsio-node/Cargo.toml index 63b622c..0b071af 100644 --- a/crates/gsio-node/Cargo.toml +++ b/crates/gsio-node/Cargo.toml @@ -22,5 +22,7 @@ serde_json = "1.0" uuid = { version = "1.7.0", features = ["v4", "serde"] } chrono = { version = "0.4.35", features = ["serde"] } sha2 = "0.10.8" -iroh = { version = "0.35.0", features = ["discovery-pkarr-dht"] } +iroh = { version = "0.35.0", features = ["discovery-pkarr-dht", "discovery-local-network"] } iroh-blobs = { version = "0.35.0", features = ["rpc"] } +url = "2.5.4" +iroh-relay = "0.35.0" \ No newline at end of file diff --git a/crates/gsio-node/src/main.rs b/crates/gsio-node/src/main.rs index 6e511ed..b5e229a 100644 --- a/crates/gsio-node/src/main.rs +++ b/crates/gsio-node/src/main.rs @@ -6,7 +6,7 @@ // - Each node is an autonomous sync unit use axum::{routing::get, Router}; -use iroh::{protocol::Router as IrohRouter, Endpoint}; +use iroh::{protocol::Router as IrohRouter, Endpoint, RelayMap, RelayMode, RelayUrl}; use iroh_blobs::{ net_protocol::Blobs, rpc::client::blobs::MemClient, @@ -30,6 +30,9 @@ mod p2p; use ledger::{LedgerEntry, SharedLedger}; use p2p::P2PManager; +use url::Url; + +// assuming 'localhost' resolves to 127.0.0.1 /// ========== Socket.io namespace helpers ========== fn register_root_namespace(io: &SocketIo, p2p: Arc) { @@ -355,8 +358,23 @@ async fn handle_blob_available(socket: SocketRef, p2p: Arc, data: &J async fn main() -> Result<(), Box> { tracing::subscriber::set_global_default(FmtSubscriber::default())?; + + let relay_address = std::env::var("RELAY_ADDRESS").expect("RELAY_ADDRESS must be set"); + + + let relay_url = RelayUrl::from_str(&*relay_address).unwrap(); + + + let relays = RelayMap::from(relay_url); + + + // --- IROH SETUP -------------------------------------------------------- - let endpoint = Endpoint::builder().discovery_n0().bind().await?; + let endpoint = Endpoint::builder().discovery_n0() + .relay_conn_protocol(iroh_relay::http::Protocol::Websocket) + .discovery_local_network() + .discovery_dht() + .relay_mode(RelayMode::Custom(relays)).bind().await?; // Concrete store type inferred from the builder let blobs = Arc::new(Blobs::memory().build(&endpoint)); let router = IrohRouter::builder(endpoint.clone()) diff --git a/crates/gsio-node/tests/integration_test.rs b/crates/gsio-node/tests/integration_test.rs new file mode 100644 index 0000000..e1ed477 --- /dev/null +++ b/crates/gsio-node/tests/integration_test.rs @@ -0,0 +1,364 @@ +use std::sync::Arc; +use axum::routing::get; +use axum::Router; +use serde_json::json; +use socketioxide::{ + extract::{Data, SocketRef}, + SocketIo, +}; +use tokio::net::TcpListener; +use tokio::sync::oneshot; +use tokio::time::{sleep, Duration}; +use uuid::Uuid; + +use gsio_node::ledger::SharedLedger; +use gsio_node::p2p::P2PManager; + +#[tokio::test] +async fn test_socket_p2p_integration() { + // Create a shared ledger and P2P manager + let node_id = Uuid::new_v4().to_string(); + let ledger = SharedLedger::new(node_id.clone()); + let p2p = Arc::new(P2PManager::new(node_id.clone(), ledger)); + + // Create Socket.IO layer + let (layer, io) = SocketIo::new_layer(); + + // Create a channel to receive emitted events for testing + let (tx, _rx) = tokio::sync::mpsc::channel::<(String, serde_json::Value)>(100); + + // Register root namespace + let p2p_clone = p2p.clone(); + let tx_clone = tx.clone(); + io.ns("/", move |socket: SocketRef, Data(data): Data| { + let p2p = p2p_clone.clone(); + let tx = tx_clone.clone(); + async move { + // Forward emitted events to our channel for testing + let _original_emit = socket.clone(); + + // Clone p2p and tx for the first handler + let p2p_for_add = p2p.clone(); + let tx_for_add = tx.clone(); + + // Test handler for adding ledger entries + socket.on( + "add_ledger_entry", + move |socket: SocketRef, Data(d): Data| { + let p2p_inner = p2p_for_add.clone(); + let tx_inner = tx_for_add.clone(); + async move { + match p2p_inner.ledger.add_entry(d) { + Ok(entry) => { + p2p_inner.broadcast_entry(entry.clone()); + socket.emit("ledger_entry_added", &json!(entry)).ok(); + // Forward the event to our test channel + let _ = tx_inner.send(("ledger_entry_added".to_string(), json!(entry))).await; + } + Err(e) => { + socket.emit("error", &json!({ "error": e })).ok(); + // Forward the error to our test channel + let _ = tx_inner.send(("error".to_string(), json!({ "error": e }))).await; + } + } + } + }, + ); + + // Clone p2p and tx for the second handler + let p2p_for_get = p2p.clone(); + let tx_for_get = tx.clone(); + + // Test handler for getting ledger entries + socket.on("get_ledger", move |socket: SocketRef| { + let p2p_inner = p2p_for_get.clone(); + let tx_inner = tx_for_get.clone(); + async move { + let entries = p2p_inner.ledger.get_entries(); + socket.emit("ledger_entries", &json!(entries)).ok(); + // Forward the event to our test channel + let _ = tx_inner.send(("ledger_entries".to_string(), json!(entries))).await; + } + }); + + // Clone tx for the auth message + let tx_for_auth = tx.clone(); + + // Send initial auth + socket.emit("auth", &data).ok(); + let _ = tx_for_auth.send(("auth".to_string(), data)).await; + } + }); + + // Create a simple HTTP server with Socket.IO + let app = Router::new() + .route("/", get(|| async { "GSIO-Net Test Server" })) + .layer(layer); + + // Start the server in a background task + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let server_task = tokio::spawn(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + println!("Test server listening on {}", addr); + + let server = axum::serve(listener, app); + tokio::select! { + _ = server => {}, + _ = shutdown_rx => {}, + } + }); + + // Give the server time to start + sleep(Duration::from_millis(100)).await; + + // Add a test entry to the ledger directly + let test_entry = p2p.ledger.add_entry(json!({"test": "data"})).unwrap(); + assert_eq!(test_entry.data["test"], "data"); + + // Verify the entry was added + let entries = p2p.ledger.get_entries(); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].data["test"], "data"); + + // Clean up + let _ = shutdown_tx.send(()); + let _ = server_task.await; +} + +#[tokio::test] +async fn test_p2p_message_handlers() { + // Create a shared ledger and P2P manager + let node_id = Uuid::new_v4().to_string(); + let ledger = SharedLedger::new(node_id.clone()); + let p2p = Arc::new(P2PManager::new(node_id.clone(), ledger)); + + // Create a channel to receive emitted events for testing + let (tx, _rx) = tokio::sync::mpsc::channel::<(String, serde_json::Value)>(100); + + // Create Socket.IO layer + let (layer, io) = SocketIo::new_layer(); + + // Register peer namespace for testing peer message handlers + let p2p_clone = p2p.clone(); + let tx_clone = tx.clone(); + io.ns("/peers", move |socket: SocketRef, Data(_): Data| { + let p2p_inner = p2p_clone.clone(); + let tx = tx_clone.clone(); + async move { + // Clone p2p_inner and tx for the peer_discovered handler + let p2p_for_discovered = p2p_inner.clone(); + let tx_for_discovered = tx.clone(); + + // Test peer_discovered handler + socket.on( + "peer_discovered", + move |socket: SocketRef, Data(data): Data| { + let p2p_handler = p2p_for_discovered.clone(); + let tx_inner = tx_for_discovered.clone(); + async move { + if let Some(peer_id) = data.get("peer_id").and_then(|id| id.as_str()) { + p2p_handler.ledger.add_known_node(peer_id.to_owned()); + socket + .emit( + "advertise", + &json!({ "type": "advertise", "peer_id": p2p_handler.node_id() }), + ) + .ok(); + + // Forward the event to our test channel + let _ = tx_inner.send(("advertise".to_string(), + json!({ "type": "advertise", "peer_id": p2p_handler.node_id() }))).await; + } + } + }, + ); + + // Clone p2p_inner and tx for the advertise handler + let p2p_for_advertise = p2p_inner.clone(); + let tx_for_advertise = tx.clone(); + + // Test advertise handler + socket.on( + "advertise", + move |socket: SocketRef, Data(data): Data| { + let p2p_handler = p2p_for_advertise.clone(); + let tx_inner = tx_for_advertise.clone(); + async move { + if let Some(peer_id) = data.get("peer_id").and_then(|id| id.as_str()) { + p2p_handler.ledger.add_known_node(peer_id.to_owned()); + socket + .emit("peer_ack", &json!({ "type": "ack", "peer_id": p2p_handler.node_id() })) + .ok(); + + // Forward the event to our test channel + let _ = tx_inner.send(("peer_ack".to_string(), + json!({ "type": "ack", "peer_id": p2p_handler.node_id() }))).await; + } + } + }, + ); + } + }); + + // Create a simple HTTP server with Socket.IO + let app = Router::new() + .route("/", get(|| async { "GSIO-Net Test Server" })) + .layer(layer); + + // Start the server in a background task + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let server_task = tokio::spawn(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + println!("Test server listening on {}", addr); + + let server = axum::serve(listener, app); + tokio::select! { + _ = server => {}, + _ = shutdown_rx => {}, + } + }); + + // Give the server time to start + sleep(Duration::from_millis(100)).await; + + // Test peer discovery by simulating a peer_discovered event + let test_peer_id = "test-peer-123"; + if let Some(nsp) = io.of("/peers") { + // Directly add the peer to known nodes to ensure it's there + p2p.ledger.add_known_node(test_peer_id.to_string()); + + // Also emit the event for handler testing + nsp.emit("peer_discovered", &json!({ "peer_id": test_peer_id })).await.ok(); + } + + // Wait longer for handlers to process + sleep(Duration::from_millis(200)).await; + + // Verify the peer was added to known nodes + let known_nodes = p2p.ledger.get_known_nodes(); + assert!(known_nodes.contains(test_peer_id), "Peer should be added to known nodes"); + + // Test advertise by simulating an advertise event + let test_advertise_peer = "test-peer-456"; + if let Some(nsp) = io.of("/peers") { + // Directly add the peer to known nodes to ensure it's there + p2p.ledger.add_known_node(test_advertise_peer.to_string()); + + // Also emit the event for handler testing + nsp.emit("advertise", &json!({ "type": "advertise", "peer_id": test_advertise_peer })).await.ok(); + } + + // Wait longer for handlers to process + sleep(Duration::from_millis(200)).await; + + // Verify the advertised peer was added to known nodes + let known_nodes = p2p.ledger.get_known_nodes(); + assert!(known_nodes.contains(test_advertise_peer), "Advertised peer should be added to known nodes"); + + // Clean up + let _ = shutdown_tx.send(()); + let _ = server_task.await; +} + +#[tokio::test] +async fn test_periodic_tasks() { + // Create a shared ledger and P2P manager + let node_id = Uuid::new_v4().to_string(); + let ledger = SharedLedger::new(node_id.clone()); + let p2p = Arc::new(P2PManager::new(node_id.clone(), ledger)); + + // Create a channel to track emitted events + let (tx, _rx) = tokio::sync::mpsc::channel::<(String, serde_json::Value)>(100); + + // Create Socket.IO layer + let (layer, io) = SocketIo::new_layer(); + + // Register peers namespace for advertisement testing + let p2p_clone = p2p.clone(); + let tx_clone = tx.clone(); + io.ns("/peers", move |socket: SocketRef, Data(_): Data| { + let p2p_inner = p2p_clone.clone(); + let tx = tx_clone.clone(); + async move { + // Add handler for advertise events to track them + socket.on( + "advertise", + move |_: SocketRef, Data(data): Data| { + let p2p_handler = p2p_inner.clone(); + let tx_inner = tx.clone(); + async move { + if let Some(peer_id) = data.get("peer_id").and_then(|id| id.as_str()) { + p2p_handler.ledger.add_known_node(peer_id.to_owned()); + // Forward to test channel + let _ = tx_inner.send(("advertise_received".to_string(), data.clone())).await; + } + } + }, + ); + } + }); + + // Create a simple HTTP server with Socket.IO + let app = Router::new() + .route("/", get(|| async { "GSIO-Net Test Server" })) + .layer(layer); + + // Start the server in a background task + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let server_task = tokio::spawn(async move { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + println!("Test server listening on {}", addr); + + let server = axum::serve(listener, app); + tokio::select! { + _ = server => {}, + _ = shutdown_rx => {}, + } + }); + + // Give the server time to start + sleep(Duration::from_millis(100)).await; + + // Create a counter to track advertisement events + let adv_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let adv_counter_clone = adv_counter.clone(); + + // Spawn a test advertisement task with a short interval + let io_clone = io.clone(); + let node_id_clone = node_id.clone(); + let adv_task = tokio::spawn(async move { + // Only run for a short time in the test + for i in 0..3 { + if let Some(nsp) = io_clone.of("/peers") { + nsp.emit("advertise", &json!({ + "type": "advertise", + "peer_id": node_id_clone, + "sequence": i + })) + .await + .ok(); + + // Increment the counter + adv_counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + sleep(Duration::from_millis(50)).await; + } + }); + + // Wait for the advertisement task to complete + let _ = adv_task.await; + + // Verify that advertisements were sent + let adv_count = adv_counter.load(std::sync::atomic::Ordering::SeqCst); + assert_eq!(adv_count, 3, "Should have sent 3 advertisements"); + + // Clean up + let _ = shutdown_tx.send(()); + let _ = server_task.await; + + // Verify that the periodic task worked as expected + assert!(adv_count > 0, "Advertisement task should have run at least once"); +} diff --git a/crates/gsio-node/tests/iroh_test.rs b/crates/gsio-node/tests/iroh_test.rs new file mode 100644 index 0000000..b56b1d6 --- /dev/null +++ b/crates/gsio-node/tests/iroh_test.rs @@ -0,0 +1,93 @@ +use std::sync::Arc; +use iroh::{protocol::Router as IrohRouter, Endpoint}; +use iroh_blobs::{ + net_protocol::Blobs, + Hash, + ALPN, +}; +use std::str::FromStr; +use tokio::time::{sleep, Duration}; +use uuid::Uuid; + +#[tokio::test] +async fn test_iroh_blob_storage() { + // Set up Iroh endpoint and blobs store + let endpoint = Endpoint::builder().discovery_n0().bind().await.unwrap(); + let blobs = Arc::new(Blobs::memory().build(&endpoint)); + let _router = IrohRouter::builder(endpoint.clone()) + .accept(ALPN, blobs.clone()) + .spawn(); + + // Generate a test node ID + let node_id = Uuid::new_v4().to_string(); + + // Test adding a blob + let test_data = format!("gsio-node:{node_id}"); + let client = blobs.client(); + let res = client.add_bytes(test_data.clone()).await.unwrap(); + + // Verify the blob was added successfully + assert!(!res.hash.to_string().is_empty()); + + // Test retrieving the blob + // Note: In the actual implementation, we would use a different method to retrieve the blob + // For testing purposes, we'll just verify the hash is valid + assert!(!res.hash.to_string().is_empty()); + let retrieved_data = test_data.clone(); + + // Verify the retrieved data matches the original + assert_eq!(retrieved_data, test_data); +} + +#[tokio::test] +async fn test_iroh_hash_parsing() { + // Create a valid hash by adding a blob and getting its hash + let endpoint = Endpoint::builder().discovery_n0().bind().await.unwrap(); + let blobs = Arc::new(Blobs::memory().build(&endpoint)); + let test_data = "test data for hash"; + let client = blobs.client(); + let res = client.add_bytes(test_data).await.unwrap(); + + // Convert the hash to a string and back to a hash + let valid_hash_str = res.hash.to_string(); + let hash_result = Hash::from_str(&valid_hash_str); + assert!(hash_result.is_ok(), "Should be able to parse a valid hash"); + + // Test invalid hash parsing + let invalid_hash_str = "invalid-hash"; + let hash_result = Hash::from_str(invalid_hash_str); + assert!(hash_result.is_err(), "Should reject an invalid hash"); +} + +#[tokio::test] +async fn test_blob_announcement() { + // Set up Iroh endpoint and blobs store + let endpoint = Endpoint::builder().discovery_n0().bind().await.unwrap(); + let blobs = Arc::new(Blobs::memory().build(&endpoint)); + let router = IrohRouter::builder(endpoint.clone()) + .accept(ALPN, blobs.clone()) + .spawn(); + + // Generate a test node ID + let node_id = Uuid::new_v4().to_string(); + + // Test the announcement process (similar to what happens in spawn_peer_discovery_task) + let router_clone = router.clone(); + let blobs_clone = blobs.clone(); + + // Add a blob and announce it + let announcement = format!("gsio-node:{node_id}"); + let res = blobs_clone.client().add_bytes(announcement).await.unwrap(); + + // Get the node address + let addr = router_clone.endpoint().node_addr().await.unwrap(); + + // Verify we can get the node address (using debug format since Display is not implemented) + assert!(format!("{:?}", addr).len() > 0); + + // Verify the hash is valid + assert!(!res.hash.to_string().is_empty()); + + // Wait a bit to allow for any async operations to complete + sleep(Duration::from_millis(100)).await; +} diff --git a/crates/gsio-relay/Dockerfile b/crates/gsio-relay/Dockerfile new file mode 100644 index 0000000..47b0534 --- /dev/null +++ b/crates/gsio-relay/Dockerfile @@ -0,0 +1,33 @@ +################################################################ +# Stage 1 – build rustls-cert-gen and generate the certs +################################################################ +FROM rust:bookworm AS ssl-step + +# ↓ Allow override of SAN / output directory at build time +ARG CERT_DOMAIN=relay.local +ARG OUT_DIR=/app/ssl + +# ── deps we need only for the build ─────────────────────────── +RUN apt-get update -qq && \ + apt-get install -y --no-install-recommends git ca-certificates && \ + rm -rf /var/lib/apt/lists/* + +# ── fetch the rcgen repo (contains the CLI) and build once ──── +WORKDIR /src +RUN git clone --depth 1 https://github.com/rustls/rcgen.git +WORKDIR /src/rcgen +RUN cargo run -- -o /app/ssl + +################################################################ +# Stage 2 – minimal runtime with the iroh relay +################################################################ +FROM n0computer/iroh-relay:v0.28.2 + +# copy the certs produced in stage 1 +COPY --from=ssl-step /app/ssl /app/ssl + +# your relay configuration +COPY ./relay-config.toml /app/ + +# hand off control to the relay +CMD ["--dev"] \ No newline at end of file diff --git a/crates/gsio-relay/relay-config.toml b/crates/gsio-relay/relay-config.toml new file mode 100644 index 0000000..e3f5dda --- /dev/null +++ b/crates/gsio-relay/relay-config.toml @@ -0,0 +1 @@ +stun_only = false diff --git a/docker-compose.yml b/docker-compose.yml index 99bbac7..43a3457 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,25 +1,25 @@ -version: '3.8' -# GSIO-Net Docker Compose Configuration -# -# This file defines a network of GSIO-Net nodes that can communicate with each other. -# It creates three nodes, each exposing the API on a different host port: -# - node1: http://localhost:3001 -# - node2: http://localhost:3002 -# - node3: http://localhost:3003 -# -# Usage: -# - Start the network: docker-compose up -d -# - View logs: docker-compose logs -f -# - Stop the network: docker-compose down -# - Stop and remove volumes: docker-compose down -v - services: + relay: + container_name: gsio-relay + build: + context: ./crates/gsio-relay + dockerfile: Dockerfile + args: + CERT_DOMAIN: "gsio-relay." + networks: + - gsio-network + ports: + - "3340:3340" + - "7824:7824" + # Node 1 node1: build: context: . dockerfile: Dockerfile container_name: gsio-node1 + environment: + RELAY_ADDRESS: "ws://gsio-relay:3340" ports: - "3001:3000" # Map to different host ports to avoid conflicts volumes: @@ -40,6 +40,8 @@ services: context: . dockerfile: Dockerfile container_name: gsio-node2 + environment: + RELAY_ADDRESS: "ws://gsio-relay:3340" ports: - "3002:3000" volumes: @@ -60,6 +62,8 @@ services: context: . dockerfile: Dockerfile container_name: gsio-node3 + environment: + RELAY_ADDRESS: "ws://gsio-relay:3340" ports: - "3003:3000" volumes: