Refactor gsio-node
: streamline namespace registration, periodic tasks, and connection handlers.
- Created smaller units for testing
This commit is contained in:
@@ -1,29 +1,29 @@
|
|||||||
// GSIO-Node: A distributed ledger node that uses both iroh and socketioxide
|
// GSIO-Node: a distributed-ledger node that interleaves iroh (discovery + blobs)
|
||||||
// for peer-to-peer communication and synchronization.
|
// and socketioxide (direct messaging) for fully-decentralized sync.
|
||||||
//
|
//
|
||||||
// This implementation interleaves iroh and socketioxide to make each node an independent
|
// - Iroh handles peer discovery and blob storage
|
||||||
// unit capable of synchronizing with new peers:
|
// - Socketioxide handles live peer-to-peer messaging
|
||||||
// - Iroh is used for peer discovery and blob storage
|
// - Each node is an autonomous sync unit
|
||||||
// - Socketioxide is used for direct communication between peers
|
|
||||||
// - Each node can discover new peers through iroh's discovery mechanisms
|
|
||||||
// - Nodes can share ledger entries and synchronize their state
|
|
||||||
// - Blob storage is handled by iroh for efficient data transfer
|
|
||||||
|
|
||||||
use axum::routing::get;
|
use axum::{routing::get, Router};
|
||||||
|
use iroh::{protocol::Router as IrohRouter, Endpoint};
|
||||||
|
use iroh_blobs::{
|
||||||
|
net_protocol::Blobs,
|
||||||
|
rpc::client::blobs::MemClient,
|
||||||
|
store::Store,
|
||||||
|
ticket::BlobTicket,
|
||||||
|
Hash, ALPN,
|
||||||
|
};
|
||||||
use serde_json::{json, Value as JsonValue};
|
use serde_json::{json, Value as JsonValue};
|
||||||
use socketioxide::{
|
use socketioxide::{
|
||||||
extract::{AckSender, Data, SocketRef},
|
extract::{AckSender, Data, SocketRef},
|
||||||
SocketIo,
|
SocketIo,
|
||||||
};
|
};
|
||||||
use std::sync::Arc;
|
use std::{str::FromStr, sync::Arc};
|
||||||
|
use tokio::net::TcpListener;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
use tracing_subscriber::FmtSubscriber;
|
use tracing_subscriber::FmtSubscriber;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use iroh::{protocol::Router, Endpoint};
|
|
||||||
use iroh_blobs::{store::Store, net_protocol::Blobs, Hash};
|
|
||||||
use std::str::FromStr;
|
|
||||||
use iroh_blobs::rpc::client::blobs::MemClient;
|
|
||||||
use iroh_blobs::ticket::BlobTicket;
|
|
||||||
|
|
||||||
mod ledger;
|
mod ledger;
|
||||||
mod p2p;
|
mod p2p;
|
||||||
@@ -31,454 +31,354 @@ mod p2p;
|
|||||||
use ledger::{LedgerEntry, SharedLedger};
|
use ledger::{LedgerEntry, SharedLedger};
|
||||||
use p2p::P2PManager;
|
use p2p::P2PManager;
|
||||||
|
|
||||||
// Handle regular client connections
|
/// ========== Socket.io namespace helpers ==========
|
||||||
async fn on_connect(socket: SocketRef, Data(data): Data<JsonValue>, p2p_manager: Arc<P2PManager>) {
|
fn register_root_namespace(io: &SocketIo, p2p: Arc<P2PManager>) {
|
||||||
|
let p2p_clone = p2p.clone();
|
||||||
|
io.ns("/", move |s, d| on_connect(s, d, p2p_clone.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_p2p_namespace(io: &SocketIo, p2p: Arc<P2PManager>) {
|
||||||
|
let p2p_clone = p2p.clone();
|
||||||
|
io.ns("/p2p", move |s, d| on_p2p_connect(s, d, p2p_clone.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_peer_namespace<S>(io: &SocketIo, p2p: Arc<P2PManager>, blobs: Arc<Blobs<S>>)
|
||||||
|
where
|
||||||
|
S: Store + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let p2p_clone = p2p.clone();
|
||||||
|
let blobs_arc = blobs.clone();
|
||||||
|
io.ns("/peers", async move |s, d| {
|
||||||
|
let blobs_client = blobs_arc.client();
|
||||||
|
on_peer_message(s, d, p2p_clone.clone(), &blobs_client.clone()).await.to_owned()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// ========== Periodic tasks ==========
|
||||||
|
fn spawn_advertisement_task(io: SocketIo, node_id: String) {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
if let Some(nsp) = io.of("/peers") {
|
||||||
|
nsp.emit("advertise", &json!({ "type": "advertise", "peer_id": node_id }))
|
||||||
|
.await
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn spawn_peer_discovery_task<S>(
|
||||||
|
endpoint: Endpoint,
|
||||||
|
router: IrohRouter,
|
||||||
|
io: SocketIo,
|
||||||
|
node_id: String,
|
||||||
|
blobs: Arc<Blobs<S>>,
|
||||||
|
) where
|
||||||
|
S: Store + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Stub channel for future custom hooks
|
||||||
|
let (_, mut _rx) = tokio::sync::mpsc::channel::<Vec<u8>>(100);
|
||||||
|
|
||||||
|
// Periodically announce presence
|
||||||
|
let router_clone = router.clone();
|
||||||
|
let blobs_clone = blobs.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
let announcement = format!("gsio-node:{node_id}");
|
||||||
|
let res = blobs_clone.client().add_bytes(announcement).await.unwrap();
|
||||||
|
|
||||||
|
let addr = router_clone.endpoint().node_addr().await.unwrap();
|
||||||
|
let ticket = BlobTicket::new(addr, res.hash, res.format).unwrap();
|
||||||
|
|
||||||
|
info!("serving hash: {}", ticket.hash());
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Additional discovery-message processing could be added here
|
||||||
|
drop((_rx, endpoint)); // Silence unused warnings for now
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// ========== Socket connection handlers ==========
|
||||||
|
async fn on_connect(socket: SocketRef, Data(data): Data<JsonValue>, p2p: Arc<P2PManager>) {
|
||||||
info!(ns = socket.ns(), ?socket.id, "Socket.IO client connected");
|
info!(ns = socket.ns(), ?socket.id, "Socket.IO client connected");
|
||||||
socket.emit("auth", &data).ok();
|
socket.emit("auth", &data).ok();
|
||||||
|
register_basic_handlers(&socket);
|
||||||
|
register_ledger_handlers(&socket, p2p).await;
|
||||||
|
}
|
||||||
|
|
||||||
// Set up basic message handlers
|
fn register_basic_handlers(socket: &SocketRef) {
|
||||||
socket.on("message", |socket: SocketRef, Data(data): Data<JsonValue>| async move {
|
socket.on("message", |socket: SocketRef, Data(d): Data<JsonValue>| async move {
|
||||||
info!(?data, "Received event:");
|
socket.emit("message-back", &d).ok();
|
||||||
socket.emit("message-back", &data).ok();
|
|
||||||
});
|
});
|
||||||
|
socket.on("ping", |socket: SocketRef, Data(d): Data<JsonValue>| async move {
|
||||||
socket.on("ping", |socket: SocketRef, Data(data): Data<JsonValue>| async move {
|
socket.emit("pong", &d).ok();
|
||||||
socket.emit("pong", &data).ok();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
socket.on(
|
socket.on(
|
||||||
"message-with-ack",
|
"message-with-ack",
|
||||||
|Data(data): Data<JsonValue>, ack: AckSender| async move {
|
|Data(d): Data<JsonValue>, ack: AckSender| async move {
|
||||||
info!(?data, "Received event");
|
ack.send(&d).ok();
|
||||||
ack.send(&data).ok();
|
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Set up ledger-related handlers
|
async fn register_ledger_handlers(socket: &SocketRef, p2p: Arc<P2PManager>) {
|
||||||
let p2p_manager_clone = p2p_manager.clone();
|
let add_clone = p2p.clone();
|
||||||
socket.on(
|
socket.on(
|
||||||
"add_ledger_entry",
|
"add_ledger_entry",
|
||||||
move |socket: SocketRef, Data(data): Data<JsonValue>| {
|
move |socket: SocketRef, Data(d): Data<JsonValue>| {
|
||||||
let p2p_manager = p2p_manager_clone.clone();
|
let p2p = add_clone.clone();
|
||||||
async move {
|
async move { handle_add_entry(socket, p2p, d).await }
|
||||||
info!(?data, "Adding ledger entry");
|
|
||||||
|
|
||||||
// Add the entry to the ledger
|
|
||||||
match p2p_manager.ledger.add_entry(data) {
|
|
||||||
Ok(entry) => {
|
|
||||||
// Broadcast the entry to all connected nodes
|
|
||||||
p2p_manager.broadcast_entry(entry.clone());
|
|
||||||
|
|
||||||
// Send the entry back to the client
|
|
||||||
socket.emit("ledger_entry_added", &serde_json::to_value(entry).unwrap()).ok();
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
socket.emit("error", &json!({ "error": e })).ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let p2p_manager_clone = p2p_manager.clone();
|
let get_clone = p2p.clone();
|
||||||
socket.on("get_ledger", move |socket: SocketRef| {
|
socket.on("get_ledger", move |socket: SocketRef| {
|
||||||
let p2p_manager = p2p_manager_clone.clone();
|
let p2p = get_clone.clone();
|
||||||
async move {
|
async move {
|
||||||
info!("Getting ledger entries");
|
let entries = p2p.ledger.get_entries();
|
||||||
|
socket.emit("ledger_entries", &json!(entries)).ok();
|
||||||
// Get all entries in the ledger
|
|
||||||
let entries = p2p_manager.ledger.get_entries();
|
|
||||||
|
|
||||||
// Send the entries to the client
|
|
||||||
socket.emit("ledger_entries", &serde_json::to_value(entries).unwrap()).ok();
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let p2p_manager_clone = p2p_manager.clone();
|
let nodes_clone = p2p.clone();
|
||||||
socket.on("get_known_nodes", move |socket: SocketRef| {
|
socket.on("get_known_nodes", move |socket: SocketRef| {
|
||||||
let p2p_manager = p2p_manager_clone.clone();
|
let p2p = nodes_clone.clone();
|
||||||
async move {
|
async move {
|
||||||
info!("Getting known nodes");
|
let nodes = p2p.ledger.get_known_nodes();
|
||||||
|
|
||||||
// Get all known nodes
|
|
||||||
let nodes = p2p_manager.ledger.get_known_nodes();
|
|
||||||
|
|
||||||
// Send the nodes to the client
|
|
||||||
socket.emit("known_nodes", &json!({ "nodes": nodes })).ok();
|
socket.emit("known_nodes", &json!({ "nodes": nodes })).ok();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle p2p node connections
|
async fn handle_add_entry(socket: SocketRef, p2p: Arc<P2PManager>, data: JsonValue) {
|
||||||
async fn on_p2p_connect(socket: SocketRef, Data(data): Data<JsonValue>, p2p_manager: Arc<P2PManager>) {
|
match p2p.ledger.add_entry(data) {
|
||||||
|
Ok(entry) => {
|
||||||
|
p2p.broadcast_entry(entry.clone());
|
||||||
|
socket.emit("ledger_entry_added", &json!(entry)).ok();
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
socket.emit("error", &json!({ "error": e })).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn on_p2p_connect(socket: SocketRef, Data(data): Data<JsonValue>, p2p: Arc<P2PManager>) {
|
||||||
info!(ns = socket.ns(), ?socket.id, "P2P node connected");
|
info!(ns = socket.ns(), ?socket.id, "P2P node connected");
|
||||||
|
p2p.handle_connection(socket, data);
|
||||||
// Handle the connection in the p2p manager
|
|
||||||
p2p_manager.handle_connection(socket, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
tracing::subscriber::set_global_default(FmtSubscriber::default())?;
|
|
||||||
|
|
||||||
// create an iroh endpoint that includes the standard discovery mechanisms
|
|
||||||
// we've built at number0
|
|
||||||
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
|
|
||||||
|
|
||||||
// create an in-memory blob store
|
|
||||||
// use `iroh_blobs::net_protocol::Blobs::persistent` to load or create a
|
|
||||||
// persistent blob store from a path
|
|
||||||
let blobs = Arc::new(Blobs::memory().build(&endpoint));
|
|
||||||
|
|
||||||
// turn on the "rpc" feature if you need to create blobs client
|
|
||||||
let blobs_client = blobs.client();
|
|
||||||
|
|
||||||
// build the router
|
|
||||||
let iroh_router = Router::builder(endpoint.clone())
|
|
||||||
.accept(iroh_blobs::ALPN, blobs.clone())
|
|
||||||
.spawn();
|
|
||||||
|
|
||||||
// Generate a unique ID for this node
|
|
||||||
let node_id = Uuid::new_v4();
|
|
||||||
info!("Starting node with ID: {}", node_id);
|
|
||||||
|
|
||||||
|
|
||||||
// Create the shared ledger
|
|
||||||
let ledger = SharedLedger::new(node_id.to_string());
|
|
||||||
|
|
||||||
// Create the p2p manager
|
|
||||||
let p2p_manager = Arc::new(P2PManager::new(node_id.to_string(), ledger));
|
|
||||||
|
|
||||||
// Store the blobs for later use
|
|
||||||
let blobs_arc = blobs.clone();
|
|
||||||
|
|
||||||
let (layer, io) = SocketIo::new_layer();
|
|
||||||
|
|
||||||
// Set up namespaces
|
|
||||||
let p2p_manager_clone = p2p_manager.clone();
|
|
||||||
io.ns("/", move |s, d| on_connect(s, d, p2p_manager_clone.clone()));
|
|
||||||
|
|
||||||
let p2p_manager_clone = p2p_manager.clone();
|
|
||||||
io.ns("/p2p", move |s, d| on_p2p_connect(s, d, p2p_manager_clone.clone()));
|
|
||||||
|
|
||||||
let p2p_manager_clone = p2p_manager.clone();
|
|
||||||
// Pass the blobs to the peer message handler
|
|
||||||
io.ns("/peers", async move |s, d| {
|
|
||||||
let blobs_client = blobs_arc.client();
|
|
||||||
|
|
||||||
on_peer_message(s, d, p2p_manager_clone.clone(), &blobs_client.clone()).await.to_owned()
|
|
||||||
});
|
|
||||||
|
|
||||||
// Set up periodic advertisement of this node via socketioxide
|
|
||||||
let io_clone = io.clone();
|
|
||||||
let node_id_string = node_id.to_string();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
loop {
|
|
||||||
io_clone.of("/peers").unwrap().emit("advertise", &json!({
|
|
||||||
"type": "advertise",
|
|
||||||
"peer_id": node_id_string
|
|
||||||
})).await.ok();
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Set up iroh peer discovery
|
|
||||||
let endpoint_clone = endpoint.clone();
|
|
||||||
let io_clone = io.clone();
|
|
||||||
let node_id_string = node_id.to_string();
|
|
||||||
let blobs_clone = blobs.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
// Create a channel for peer discovery
|
|
||||||
let (_, mut rx) = tokio::sync::mpsc::channel(100);
|
|
||||||
|
|
||||||
// Start listening for peer announcements
|
|
||||||
// Subscribe to the topic for peer discovery
|
|
||||||
let _handle = iroh_router.endpoint().accept().await.unwrap();
|
|
||||||
|
|
||||||
// Periodically announce our presence
|
|
||||||
let endpoint_clone2 = endpoint_clone.clone();
|
|
||||||
let node_id_string2 = node_id_string.clone();
|
|
||||||
let blobs_inner = blobs_clone.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
loop {
|
|
||||||
// Announce our presence
|
|
||||||
let announcement = format!("gsio-node:{}", node_id_string2);
|
|
||||||
// TODO: Fix this when we have the correct iroh API
|
|
||||||
|
|
||||||
// add some data and remember the hash
|
|
||||||
let res = blobs_inner.client().add_bytes(announcement).await.unwrap();
|
|
||||||
|
|
||||||
// create a ticket
|
|
||||||
let addr = iroh_router.endpoint().node_addr().await.unwrap();
|
|
||||||
let ticket = BlobTicket::new(addr, res.hash, res.format).unwrap();
|
|
||||||
|
|
||||||
// print some info about the node
|
|
||||||
println!("serving hash: {}", ticket.hash());
|
|
||||||
println!("node id: {}", ticket.node_addr().node_id);
|
|
||||||
println!("node listening addresses:");
|
|
||||||
for addr in ticket.node_addr().direct_addresses() {
|
|
||||||
println!("\t{:?}", addr);
|
|
||||||
}
|
|
||||||
println!(
|
|
||||||
"node relay server url: {:?}",
|
|
||||||
ticket
|
|
||||||
.node_addr()
|
|
||||||
.relay_url()
|
|
||||||
.expect("a default relay url should be provided")
|
|
||||||
.to_string()
|
|
||||||
);
|
|
||||||
// print the ticket, containing all the above information
|
|
||||||
println!("\nin another terminal, run:");
|
|
||||||
println!("\t cargo run --example hello-world-fetch {}", ticket);
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Process peer announcements
|
|
||||||
while let Some(msg) = rx.recv().await {
|
|
||||||
if let Ok(announcement) = String::from_utf8(msg) {
|
|
||||||
if announcement.starts_with("gsio-node:") {
|
|
||||||
let parts: Vec<&str> = announcement.splitn(2, ':').collect();
|
|
||||||
if parts.len() == 2 {
|
|
||||||
let peer_id = parts[1];
|
|
||||||
|
|
||||||
// Don't connect to ourselves
|
|
||||||
if peer_id != node_id_string {
|
|
||||||
info!("Discovered peer via iroh: {}", peer_id);
|
|
||||||
|
|
||||||
// Emit a message to the peers namespace to handle the new peer
|
|
||||||
io_clone.of("/peers").unwrap().emit("peer_discovered", &json!({
|
|
||||||
"type": "peer_discovered",
|
|
||||||
"peer_id": peer_id
|
|
||||||
})).await.ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let app = axum::Router::new()
|
|
||||||
.route("/", get(|| async { "GSIO-Net Distributed Ledger Node" }))
|
|
||||||
.layer(layer);
|
|
||||||
|
|
||||||
info!("Starting server on port 3000");
|
|
||||||
info!("Node is ready for peer synchronization using both socketioxide and iroh");
|
|
||||||
info!("- Using iroh for peer discovery and blob storage");
|
|
||||||
info!("- Using socketioxide for direct communication between peers");
|
|
||||||
info!("- Each node is an independent unit capable of synchronizing with new peers");
|
|
||||||
|
|
||||||
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
|
|
||||||
axum::serve(listener, app).await.unwrap();
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// ========== Peer-to-peer message router ==========
|
||||||
async fn on_peer_message(
|
async fn on_peer_message(
|
||||||
socket: SocketRef,
|
socket: SocketRef,
|
||||||
Data(data): Data<JsonValue>,
|
Data(data): Data<JsonValue>,
|
||||||
p2p_manager: Arc<P2PManager>,
|
p2p: Arc<P2PManager>,
|
||||||
blobs_client: &MemClient,
|
blobs_client: &MemClient,
|
||||||
) {
|
) {
|
||||||
info!(ns = socket.ns(), ?socket.id, "Peer message received");
|
if let Some(msg_type) = data.get("type").and_then(|t| t.as_str()) {
|
||||||
|
match msg_type {
|
||||||
// Handle different types of peer messages
|
"peer_discovered" => handle_peer_discovered(socket, p2p, &data).await,
|
||||||
if let Some(message_type) = data.get("type").and_then(|t| t.as_str()) {
|
"advertise" => handle_advertise(socket, p2p, &data).await,
|
||||||
match message_type {
|
"sync_request" => handle_sync_request(socket, p2p, &data).await,
|
||||||
"peer_discovered" => {
|
"sync_response" => handle_sync_response(socket, p2p, &data).await,
|
||||||
// A peer was discovered via iroh
|
"fetch_blob" => handle_fetch_blob(socket, p2p, &data, blobs_client).await,
|
||||||
if let Some(peer_id) = data.get("peer_id").and_then(|id| id.as_str()) {
|
"entry_announce" => handle_entry_announce(socket, p2p, &data).await,
|
||||||
info!("Peer discovered via iroh: {}", peer_id);
|
"blob_available" => handle_blob_available(socket, p2p, &data).await,
|
||||||
|
_ => info!("Unknown peer message type: {msg_type}"),
|
||||||
// Add the peer to the known nodes
|
|
||||||
p2p_manager.ledger.add_known_node(peer_id.to_string());
|
|
||||||
|
|
||||||
// Send an advertise message to initiate connection
|
|
||||||
socket.emit("advertise", &json!({
|
|
||||||
"type": "advertise",
|
|
||||||
"peer_id": p2p_manager.node_id()
|
|
||||||
})).ok();
|
|
||||||
}
|
}
|
||||||
},
|
|
||||||
"advertise" => {
|
|
||||||
// A peer is advertising its presence
|
|
||||||
if let Some(peer_id) = data.get("peer_id").and_then(|id| id.as_str()) {
|
|
||||||
info!("Peer advertised: {}", peer_id);
|
|
||||||
|
|
||||||
// Add the peer to the known nodes
|
|
||||||
p2p_manager.ledger.add_known_node(peer_id.to_string());
|
|
||||||
|
|
||||||
// Respond with our node ID
|
|
||||||
socket.emit("peer_ack", &json!({
|
|
||||||
"type": "ack",
|
|
||||||
"peer_id": p2p_manager.node_id()
|
|
||||||
})).ok();
|
|
||||||
|
|
||||||
// Request a sync of the ledger
|
|
||||||
socket.emit("peer_sync_request", &json!({
|
|
||||||
"type": "sync_request",
|
|
||||||
"peer_id": p2p_manager.node_id()
|
|
||||||
})).ok();
|
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
"sync_request" => {
|
|
||||||
// A peer is requesting a sync of our ledger
|
/// ---- Individual peer-message helpers ----
|
||||||
|
async fn handle_peer_discovered(socket: SocketRef, p2p: Arc<P2PManager>, data: &JsonValue) {
|
||||||
if let Some(peer_id) = data.get("peer_id").and_then(|id| id.as_str()) {
|
if let Some(peer_id) = data.get("peer_id").and_then(|id| id.as_str()) {
|
||||||
info!("Sync requested by peer: {}", peer_id);
|
p2p.ledger.add_known_node(peer_id.to_owned());
|
||||||
|
socket
|
||||||
|
.emit(
|
||||||
|
"advertise",
|
||||||
|
&json!({ "type": "advertise", "peer_id": p2p.node_id() }),
|
||||||
|
)
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Get all entries in the ledger
|
async fn handle_advertise(socket: SocketRef, p2p: Arc<P2PManager>, data: &JsonValue) {
|
||||||
let entries = p2p_manager.ledger.get_entries();
|
if let Some(peer_id) = data.get("peer_id").and_then(|id| id.as_str()) {
|
||||||
|
p2p.ledger.add_known_node(peer_id.to_owned());
|
||||||
|
socket
|
||||||
|
.emit("peer_ack", &json!({ "type": "ack", "peer_id": p2p.node_id() }))
|
||||||
|
.ok();
|
||||||
|
socket
|
||||||
|
.emit(
|
||||||
|
"peer_sync_request",
|
||||||
|
&json!({ "type": "sync_request", "peer_id": p2p.node_id() }),
|
||||||
|
)
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Send the entries to the peer
|
async fn handle_sync_request(socket: SocketRef, p2p: Arc<P2PManager>, _data: &JsonValue) {
|
||||||
socket.emit("peer_sync_response", &json!({
|
let entries = p2p.ledger.get_entries();
|
||||||
|
socket
|
||||||
|
.emit(
|
||||||
|
"peer_sync_response",
|
||||||
|
&json!({
|
||||||
"type": "sync_response",
|
"type": "sync_response",
|
||||||
"peer_id": p2p_manager.node_id(),
|
"peer_id": p2p.node_id(),
|
||||||
"entries": entries
|
"entries": entries
|
||||||
})).ok();
|
}),
|
||||||
}
|
)
|
||||||
},
|
.ok();
|
||||||
"sync_response" => {
|
}
|
||||||
// A peer is sending us their ledger entries
|
|
||||||
if let Some(peer_id) = data.get("peer_id").and_then(|id| id.as_str()) {
|
|
||||||
info!("Sync response from peer: {}", peer_id);
|
|
||||||
|
|
||||||
// Process the entries
|
async fn handle_sync_response(_socket: SocketRef, p2p: Arc<P2PManager>, data: &JsonValue) {
|
||||||
if let Some(entries) = data.get("entries") {
|
if let Some(entries_val) = data.get("entries") {
|
||||||
if let Ok(entries) = serde_json::from_value::<Vec<LedgerEntry>>(entries.clone()) {
|
if let Ok(entries) = serde_json::from_value::<Vec<LedgerEntry>>(entries_val.clone()) {
|
||||||
for entry in entries {
|
for e in entries {
|
||||||
// Add the entry to the pending entries
|
p2p.ledger.add_pending_entry(e);
|
||||||
p2p_manager.ledger.add_pending_entry(entry);
|
|
||||||
}
|
}
|
||||||
|
let added = p2p.ledger.process_pending_entries();
|
||||||
|
info!("Added {} entries from peer sync", added.len());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Process pending entries
|
async fn handle_fetch_blob(
|
||||||
let added_entries = p2p_manager.ledger.process_pending_entries();
|
socket: SocketRef,
|
||||||
info!("Added {} entries from peer sync", added_entries.len());
|
p2p: Arc<P2PManager>,
|
||||||
}
|
data: &JsonValue,
|
||||||
}
|
_blobs_client: &MemClient,
|
||||||
|
) {
|
||||||
// If blob hash is provided, fetch the blob using iroh
|
|
||||||
if let Some(blob_hash) = data.get("blob_hash").and_then(|h| h.as_str()) {
|
if let Some(blob_hash) = data.get("blob_hash").and_then(|h| h.as_str()) {
|
||||||
info!("Blob hash provided in sync response: {}", blob_hash);
|
let hash_str = blob_hash.to_owned();
|
||||||
|
|
||||||
// Emit a message to fetch the blob
|
|
||||||
socket.emit("fetch_blob", &json!({
|
|
||||||
"type": "fetch_blob",
|
|
||||||
"peer_id": p2p_manager.node_id(),
|
|
||||||
"blob_hash": blob_hash
|
|
||||||
})).ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"fetch_blob" => {
|
|
||||||
// A peer is requesting a blob
|
|
||||||
if let Some(blob_hash) = data.get("blob_hash").and_then(|h| h.as_str()) {
|
|
||||||
info!("Blob fetch requested: {}", blob_hash);
|
|
||||||
|
|
||||||
// Clone blob_hash to avoid lifetime issues
|
|
||||||
let blob_hash = blob_hash.to_string();
|
|
||||||
|
|
||||||
// Use iroh blobs to fetch the blob
|
|
||||||
let socket_clone = socket.clone();
|
let socket_clone = socket.clone();
|
||||||
let node_id = p2p_manager.node_id().to_string();
|
let node_id = p2p.node_id().to_owned();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Parse the hash and fetch the blob
|
match Hash::from_str(&hash_str) {
|
||||||
match Hash::from_str(&blob_hash) {
|
Ok(_hash) => {
|
||||||
Ok(hash) => {
|
socket_clone
|
||||||
// Acknowledge the fetch
|
.emit(
|
||||||
socket_clone.emit("blob_fetch_ack", &json!({
|
"blob_fetch_ack",
|
||||||
|
&json!({
|
||||||
"type": "blob_fetch_ack",
|
"type": "blob_fetch_ack",
|
||||||
"peer_id": node_id,
|
"peer_id": node_id,
|
||||||
"blob_hash": blob_hash,
|
"blob_hash": hash_str,
|
||||||
"status": "success"
|
"status": "success"
|
||||||
})).ok();
|
}),
|
||||||
},
|
)
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Report error
|
socket_clone
|
||||||
socket_clone.emit("blob_fetch_ack", &json!({
|
.emit(
|
||||||
|
"blob_fetch_ack",
|
||||||
|
&json!({
|
||||||
"type": "blob_fetch_ack",
|
"type": "blob_fetch_ack",
|
||||||
"peer_id": node_id,
|
"peer_id": node_id,
|
||||||
"blob_hash": blob_hash,
|
"blob_hash": hash_str,
|
||||||
"status": "error",
|
"status": "error",
|
||||||
"error": format!("Invalid hash: {}", e)
|
"error": format!("Invalid hash: {e}")
|
||||||
})).ok();
|
}),
|
||||||
|
)
|
||||||
|
.ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
"entry_announce" => {
|
|
||||||
// A peer is announcing a new ledger entry
|
|
||||||
if let Some(entry) = data.get("entry") {
|
|
||||||
if let Ok(entry) = serde_json::from_value::<LedgerEntry>(entry.clone()) {
|
|
||||||
info!("Entry announced by peer: {}", entry.id);
|
|
||||||
|
|
||||||
// Add the entry to the pending entries
|
async fn handle_entry_announce(socket: SocketRef, p2p: Arc<P2PManager>, data: &JsonValue) {
|
||||||
p2p_manager.ledger.add_pending_entry(entry.clone());
|
if let Some(entry_val) = data.get("entry") {
|
||||||
|
if let Ok(entry) = serde_json::from_value::<LedgerEntry>(entry_val.clone()) {
|
||||||
// Process pending entries
|
p2p.ledger.add_pending_entry(entry.clone());
|
||||||
let added_entries = p2p_manager.ledger.process_pending_entries();
|
let added = p2p.ledger.process_pending_entries();
|
||||||
|
for e in added {
|
||||||
// Broadcast any new entries that were added
|
p2p.broadcast_entry(e);
|
||||||
for entry in added_entries {
|
|
||||||
p2p_manager.broadcast_entry(entry.clone());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the entry in iroh blobs
|
|
||||||
let socket_clone = socket.clone();
|
let socket_clone = socket.clone();
|
||||||
let node_id = p2p_manager.node_id().to_string();
|
let node_id = p2p.node_id().to_owned();
|
||||||
|
|
||||||
// Clone entry id to avoid lifetime issues
|
|
||||||
let entry_id = entry.id.clone();
|
let entry_id = entry.id.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Create a hash from the entry JSON
|
let hash_str = format!("entry-{entry_id}-hash");
|
||||||
|
if Hash::from_str(&hash_str).is_ok() {
|
||||||
// In a real implementation, we would store the entry in iroh blobs
|
socket_clone
|
||||||
// and get the hash from the storage operation
|
.emit(
|
||||||
|
"blob_available",
|
||||||
// For now, create a hash from the entry ID
|
&json!({
|
||||||
let hash_str = format!("entry-{}-hash", entry_id);
|
|
||||||
|
|
||||||
// Try to parse the hash
|
|
||||||
match Hash::from_str(&hash_str) {
|
|
||||||
Ok(hash) => {
|
|
||||||
// Notify peers about the blob
|
|
||||||
socket_clone.emit("blob_available", &json!({
|
|
||||||
"type": "blob_available",
|
"type": "blob_available",
|
||||||
"peer_id": node_id,
|
"peer_id": node_id,
|
||||||
"entry_id": entry_id,
|
"entry_id": entry_id,
|
||||||
"blob_hash": hash_str
|
"blob_hash": hash_str
|
||||||
})).ok();
|
}),
|
||||||
},
|
)
|
||||||
Err(e) => {
|
.ok();
|
||||||
// Log the error
|
|
||||||
info!("Error creating hash for entry {}: {}", entry_id, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
"blob_available" => {
|
|
||||||
// A peer is notifying us about an available blob
|
|
||||||
if let Some(blob_hash) = data.get("blob_hash").and_then(|h| h.as_str()) {
|
|
||||||
if let Some(entry_id) = data.get("entry_id").and_then(|id| id.as_str()) {
|
|
||||||
info!("Blob available for entry {}: {}", entry_id, blob_hash);
|
|
||||||
|
|
||||||
// Request the blob
|
async fn handle_blob_available(socket: SocketRef, p2p: Arc<P2PManager>, data: &JsonValue) {
|
||||||
socket.emit("fetch_blob", &json!({
|
if let (Some(blob_hash), Some(entry_id)) = (
|
||||||
|
data.get("blob_hash").and_then(|h| h.as_str()),
|
||||||
|
data.get("entry_id").and_then(|id| id.as_str()),
|
||||||
|
) {
|
||||||
|
socket
|
||||||
|
.emit(
|
||||||
|
"fetch_blob",
|
||||||
|
&json!({
|
||||||
"type": "fetch_blob",
|
"type": "fetch_blob",
|
||||||
"peer_id": p2p_manager.node_id(),
|
"peer_id": p2p.node_id(),
|
||||||
"blob_hash": blob_hash,
|
"blob_hash": blob_hash,
|
||||||
"entry_id": entry_id
|
"entry_id": entry_id
|
||||||
})).ok();
|
}),
|
||||||
}
|
)
|
||||||
}
|
.ok();
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
info!("Unknown peer message type: {}", message_type);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// ========== Application bootstrap ==========
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
tracing::subscriber::set_global_default(FmtSubscriber::default())?;
|
||||||
|
|
||||||
|
// --- IROH SETUP --------------------------------------------------------
|
||||||
|
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
|
||||||
|
// Concrete store type inferred from the builder
|
||||||
|
let blobs = Arc::new(Blobs::memory().build(&endpoint));
|
||||||
|
let router = IrohRouter::builder(endpoint.clone())
|
||||||
|
.accept(ALPN, blobs.clone())
|
||||||
|
.spawn();
|
||||||
|
|
||||||
|
// --- NODE & LEDGER -----------------------------------------------------
|
||||||
|
let node_id = Uuid::new_v4();
|
||||||
|
info!("Starting node with ID: {node_id}");
|
||||||
|
let ledger = SharedLedger::new(node_id.to_string());
|
||||||
|
let p2p = Arc::new(P2PManager::new(node_id.to_string(), ledger));
|
||||||
|
|
||||||
|
// --- SOCKET.IO ---------------------------------------------------------
|
||||||
|
let (layer, io) = SocketIo::new_layer();
|
||||||
|
register_root_namespace(&io, p2p.clone());
|
||||||
|
register_p2p_namespace(&io, p2p.clone());
|
||||||
|
register_peer_namespace(&io, p2p.clone(), blobs.clone());
|
||||||
|
|
||||||
|
spawn_advertisement_task(io.clone(), node_id.to_string());
|
||||||
|
spawn_peer_discovery_task(endpoint, router, io.clone(), node_id.to_string(), blobs);
|
||||||
|
|
||||||
|
// --- HTTP SERVER -------------------------------------------------------
|
||||||
|
let app = Router::new()
|
||||||
|
.route("/", get(|| async { "GSIO-Net Distributed Ledger Node" }))
|
||||||
|
.layer(layer);
|
||||||
|
|
||||||
|
info!("Server listening on 0.0.0.0:3000");
|
||||||
|
let listener = TcpListener::bind("0.0.0.0:3000").await?;
|
||||||
|
axum::serve(listener, app).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user