basic network established

This commit is contained in:
geoffsee
2025-06-14 11:21:01 -04:00
commit 9747912595
33 changed files with 8377 additions and 0 deletions

View File

@@ -0,0 +1,242 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};
use sha2::{Sha256, Digest};
use chrono::{DateTime, Utc};
/// Represents a single entry in the distributed ledger
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LedgerEntry {
/// Unique identifier for the entry
pub id: String,
/// Timestamp when the entry was created
pub timestamp: DateTime<Utc>,
/// The actual data stored in the entry
pub data: serde_json::Value,
/// Hash of the previous entry in the chain
pub previous_hash: String,
/// Hash of this entry
pub hash: String,
/// Node ID that created this entry
pub creator_node_id: String,
/// Signatures from nodes that have validated this entry
pub signatures: HashMap<String, String>,
}
impl LedgerEntry {
/// Create a new ledger entry
pub fn new(
data: serde_json::Value,
previous_hash: String,
creator_node_id: String,
) -> Self {
let timestamp = Utc::now();
let id = format!("{}-{}", creator_node_id, timestamp.timestamp_millis());
let mut entry = Self {
id,
timestamp,
data,
previous_hash,
hash: String::new(),
creator_node_id,
signatures: HashMap::new(),
};
// Calculate the hash of this entry
entry.hash = entry.calculate_hash();
entry
}
/// Calculate the hash of this entry
pub fn calculate_hash(&self) -> String {
let mut hasher = Sha256::new();
// Hash the entry fields
hasher.update(self.id.as_bytes());
hasher.update(self.timestamp.to_rfc3339().as_bytes());
hasher.update(self.data.to_string().as_bytes());
hasher.update(self.previous_hash.as_bytes());
hasher.update(self.creator_node_id.as_bytes());
// Convert the hash to a hex string
format!("{:x}", hasher.finalize())
}
/// Add a signature from a node that has validated this entry
pub fn add_signature(&mut self, node_id: String, signature: String) {
self.signatures.insert(node_id, signature);
}
/// Verify that this entry is valid
pub fn is_valid(&self) -> bool {
// Check that the hash is correct
self.hash == self.calculate_hash()
}
}
/// The distributed ledger
#[derive(Debug)]
pub struct Ledger {
/// The chain of entries in the ledger
entries: Vec<LedgerEntry>,
/// The ID of this node
node_id: String,
/// Pending entries that have been received but not yet added to the chain
pending_entries: HashMap<String, LedgerEntry>,
/// Set of node IDs that are known to this node
known_nodes: HashSet<String>,
}
impl Ledger {
/// Create a new ledger
pub fn new(node_id: String) -> Self {
let mut known_nodes = HashSet::new();
known_nodes.insert(node_id.clone());
Self {
entries: Vec::new(),
node_id,
pending_entries: HashMap::new(),
known_nodes,
}
}
/// Add a new entry to the ledger
pub fn add_entry(&mut self, data: serde_json::Value) -> Result<LedgerEntry, String> {
let previous_hash = match self.entries.last() {
Some(entry) => entry.hash.clone(),
None => "0".repeat(64), // Genesis block has a hash of all zeros
};
let entry = LedgerEntry::new(data, previous_hash, self.node_id.clone());
// Add the entry to the chain
self.entries.push(entry.clone());
Ok(entry)
}
/// Get all entries in the ledger
pub fn get_entries(&self) -> &Vec<LedgerEntry> {
&self.entries
}
/// Get the last entry in the ledger
pub fn get_last_entry(&self) -> Option<&LedgerEntry> {
self.entries.last()
}
/// Add a pending entry that has been received from another node
pub fn add_pending_entry(&mut self, entry: LedgerEntry) {
self.pending_entries.insert(entry.id.clone(), entry);
}
/// Process pending entries and add them to the chain if they are valid
pub fn process_pending_entries(&mut self) -> Vec<LedgerEntry> {
let mut added_entries = Vec::new();
// Get the current last entry in the chain
let last_entry = match self.entries.last() {
Some(entry) => entry.clone(),
None => return added_entries,
};
// Find pending entries that link to the last entry
let mut entries_to_process: Vec<LedgerEntry> = self.pending_entries
.values()
.filter(|e| e.previous_hash == last_entry.hash)
.cloned()
.collect();
// Sort by timestamp to ensure deterministic ordering
entries_to_process.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
// Process each entry
for entry in entries_to_process {
if entry.is_valid() {
// Add the entry to the chain
self.entries.push(entry.clone());
// Remove from pending
self.pending_entries.remove(&entry.id);
// Add to the list of added entries
added_entries.push(entry);
}
}
added_entries
}
/// Add a known node to the network
pub fn add_known_node(&mut self, node_id: String) {
self.known_nodes.insert(node_id);
}
/// Get all known nodes in the network
pub fn get_known_nodes(&self) -> &HashSet<String> {
&self.known_nodes
}
}
/// Thread-safe wrapper around the ledger
#[derive(Clone)]
pub struct SharedLedger {
ledger: Arc<Mutex<Ledger>>,
}
impl SharedLedger {
/// Create a new shared ledger
pub fn new(node_id: String) -> Self {
Self {
ledger: Arc::new(Mutex::new(Ledger::new(node_id))),
}
}
/// Get a clone of the ledger Arc
pub fn clone_ledger(&self) -> Arc<Mutex<Ledger>> {
self.ledger.clone()
}
/// Add a new entry to the ledger
pub fn add_entry(&self, data: serde_json::Value) -> Result<LedgerEntry, String> {
let mut ledger = self.ledger.lock().unwrap();
ledger.add_entry(data)
}
/// Get all entries in the ledger
pub fn get_entries(&self) -> Vec<LedgerEntry> {
let ledger = self.ledger.lock().unwrap();
ledger.get_entries().clone()
}
/// Get the last entry in the ledger
pub fn get_last_entry(&self) -> Option<LedgerEntry> {
let ledger = self.ledger.lock().unwrap();
ledger.get_last_entry().cloned()
}
/// Add a pending entry that has been received from another node
pub fn add_pending_entry(&self, entry: LedgerEntry) {
let mut ledger = self.ledger.lock().unwrap();
ledger.add_pending_entry(entry);
}
/// Process pending entries and add them to the chain if they are valid
pub fn process_pending_entries(&self) -> Vec<LedgerEntry> {
let mut ledger = self.ledger.lock().unwrap();
ledger.process_pending_entries()
}
/// Add a known node to the network
pub fn add_known_node(&self, node_id: String) {
let mut ledger = self.ledger.lock().unwrap();
ledger.add_known_node(node_id);
}
/// Get all known nodes in the network
pub fn get_known_nodes(&self) -> HashSet<String> {
let ledger = self.ledger.lock().unwrap();
ledger.get_known_nodes().clone()
}
}

View File

@@ -0,0 +1,2 @@
pub mod ledger;
pub mod p2p;

View File

@@ -0,0 +1,137 @@
use axum::routing::get;
use serde_json::{json, Value as JsonValue};
use socketioxide::{
extract::{AckSender, Data, SocketRef},
SocketIo,
};
use std::sync::Arc;
use tracing::info;
use tracing_subscriber::FmtSubscriber;
use uuid::Uuid;
mod ledger;
mod p2p;
use ledger::SharedLedger;
use p2p::P2PManager;
// Handle regular client connections
async fn on_connect(socket: SocketRef, Data(data): Data<JsonValue>, p2p_manager: Arc<P2PManager>) {
info!(ns = socket.ns(), ?socket.id, "Socket.IO client connected");
socket.emit("auth", &data).ok();
// Set up basic message handlers
socket.on("message", |socket: SocketRef, Data(data): Data<JsonValue>| async move {
info!(?data, "Received event:");
socket.emit("message-back", &data).ok();
});
socket.on("ping", |socket: SocketRef, Data(data): Data<JsonValue>| async move {
socket.emit("pong", &data).ok();
});
socket.on(
"message-with-ack",
|Data(data): Data<JsonValue>, ack: AckSender| async move {
info!(?data, "Received event");
ack.send(&data).ok();
},
);
// Set up ledger-related handlers
let p2p_manager_clone = p2p_manager.clone();
socket.on(
"add_ledger_entry",
move |socket: SocketRef, Data(data): Data<JsonValue>| {
let p2p_manager = p2p_manager_clone.clone();
async move {
info!(?data, "Adding ledger entry");
// Add the entry to the ledger
match p2p_manager.ledger.add_entry(data) {
Ok(entry) => {
// Broadcast the entry to all connected nodes
p2p_manager.broadcast_entry(entry.clone());
// Send the entry back to the client
socket.emit("ledger_entry_added", &serde_json::to_value(entry).unwrap()).ok();
},
Err(e) => {
socket.emit("error", &json!({ "error": e })).ok();
}
}
}
},
);
let p2p_manager_clone = p2p_manager.clone();
socket.on("get_ledger", move |socket: SocketRef| {
let p2p_manager = p2p_manager_clone.clone();
async move {
info!("Getting ledger entries");
// Get all entries in the ledger
let entries = p2p_manager.ledger.get_entries();
// Send the entries to the client
socket.emit("ledger_entries", &serde_json::to_value(entries).unwrap()).ok();
}
});
let p2p_manager_clone = p2p_manager.clone();
socket.on("get_known_nodes", move |socket: SocketRef| {
let p2p_manager = p2p_manager_clone.clone();
async move {
info!("Getting known nodes");
// Get all known nodes
let nodes = p2p_manager.ledger.get_known_nodes();
// Send the nodes to the client
socket.emit("known_nodes", &json!({ "nodes": nodes })).ok();
}
});
}
// Handle p2p node connections
async fn on_p2p_connect(socket: SocketRef, Data(data): Data<JsonValue>, p2p_manager: Arc<P2PManager>) {
info!(ns = socket.ns(), ?socket.id, "P2P node connected");
// Handle the connection in the p2p manager
p2p_manager.handle_connection(socket, data);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing::subscriber::set_global_default(FmtSubscriber::default())?;
// Generate a unique ID for this node
let node_id = Uuid::new_v4().to_string();
info!("Starting node with ID: {}", node_id);
// Create the shared ledger
let ledger = SharedLedger::new(node_id.clone());
// Create the p2p manager
let p2p_manager = Arc::new(P2PManager::new(node_id, ledger));
let (layer, io) = SocketIo::new_layer();
// Set up namespaces
let p2p_manager_clone = p2p_manager.clone();
io.ns("/", move |s, d| on_connect(s, d, p2p_manager_clone.clone()));
let p2p_manager_clone = p2p_manager.clone();
io.ns("/p2p", move |s, d| on_p2p_connect(s, d, p2p_manager_clone.clone()));
let app = axum::Router::new()
.route("/", get(|| async { "GSIO-Net Distributed Ledger Node" }))
.layer(layer);
info!("Starting server on port 3000");
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
Ok(())
}

332
crates/gsio-node/src/p2p.rs Normal file
View File

@@ -0,0 +1,332 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};
use socketioxide::extract::{Data, SocketRef};
use tracing::info;
use uuid::Uuid;
use crate::ledger::{LedgerEntry, SharedLedger};
/// Types of messages that can be sent between nodes
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessageType {
/// Announce a new node joining the network
NodeAnnounce,
/// Request the list of known nodes
NodeListRequest,
/// Response with the list of known nodes
NodeListResponse,
/// Announce a new ledger entry
EntryAnnounce,
/// Request a specific ledger entry
EntryRequest,
/// Response with a requested ledger entry
EntryResponse,
/// Request all ledger entries
LedgerSyncRequest,
/// Response with all ledger entries
LedgerSyncResponse,
}
/// A message sent between nodes in the p2p network
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct P2PMessage {
/// Type of message
pub message_type: MessageType,
/// Unique ID for this message
pub message_id: String,
/// ID of the node that sent this message
pub sender_id: String,
/// ID of the node that should receive this message (empty for broadcast)
pub recipient_id: String,
/// The actual message payload
pub payload: JsonValue,
}
impl P2PMessage {
/// Create a new p2p message
pub fn new(
message_type: MessageType,
sender_id: String,
recipient_id: String,
payload: JsonValue,
) -> Self {
Self {
message_type,
message_id: Uuid::new_v4().to_string(),
sender_id,
recipient_id,
payload,
}
}
}
/// Manages p2p communication between nodes
pub struct P2PManager {
/// The ID of this node
node_id: String,
/// The shared ledger
pub ledger: SharedLedger,
/// Connected sockets by node ID
connected_nodes: Arc<Mutex<HashMap<String, SocketRef>>>,
}
impl P2PManager {
/// Create a new p2p manager
pub fn new(node_id: String, ledger: SharedLedger) -> Self {
Self {
node_id,
ledger,
connected_nodes: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Get the node ID
pub fn node_id(&self) -> &str {
&self.node_id
}
/// Get a clone of the connected nodes Arc
pub fn clone_connected_nodes(&self) -> Arc<Mutex<HashMap<String, SocketRef>>> {
self.connected_nodes.clone()
}
/// Handle a new connection from another node
pub fn handle_connection(&self, socket: SocketRef, data: JsonValue) {
info!(ns = socket.ns(), ?socket.id, "P2P node connected");
// Extract the node ID from the connection data
let node_id = match data.get("node_id") {
Some(id) => id.as_str().unwrap_or("unknown").to_string(),
None => "unknown".to_string(),
};
// Add the node to the connected nodes
{
let mut connected_nodes = self.connected_nodes.lock().unwrap();
connected_nodes.insert(node_id.clone(), socket.clone());
}
// Add the node to the known nodes in the ledger
self.ledger.add_known_node(node_id.clone());
// Send a node announce message to all other nodes
self.broadcast_message(P2PMessage::new(
MessageType::NodeAnnounce,
self.node_id.clone(),
"".to_string(),
json!({ "node_id": node_id }),
));
// Set up event handlers for this socket
self.setup_socket_handlers(socket);
}
/// Set up event handlers for a socket
fn setup_socket_handlers(&self, socket: SocketRef) {
let p2p_manager = self.clone();
// Handle p2p messages
socket.on("p2p_message", move |socket: SocketRef, Data(data): Data<JsonValue>| {
let p2p_manager = p2p_manager.clone();
async move {
info!(?data, "Received p2p message");
// Parse the message
let message: P2PMessage = match serde_json::from_value(data) {
Ok(msg) => msg,
Err(e) => {
info!("Error parsing p2p message: {}", e);
return;
}
};
// Handle the message
p2p_manager.handle_message(socket, message);
}
});
}
/// Handle a p2p message
fn handle_message(&self, socket: SocketRef, message: P2PMessage) {
match message.message_type {
MessageType::NodeAnnounce => self.handle_node_announce(message),
MessageType::NodeListRequest => self.handle_node_list_request(socket, message),
MessageType::EntryAnnounce => self.handle_entry_announce(message),
MessageType::EntryRequest => self.handle_entry_request(socket, message),
MessageType::LedgerSyncRequest => self.handle_ledger_sync_request(socket, message),
_ => info!("Unhandled message type: {:?}", message.message_type),
}
}
/// Handle a node announce message
fn handle_node_announce(&self, message: P2PMessage) {
// Extract the node ID from the message
let node_id = match message.payload.get("node_id") {
Some(id) => id.as_str().unwrap_or("unknown").to_string(),
None => "unknown".to_string(),
};
// Add the node to the known nodes in the ledger
self.ledger.add_known_node(node_id);
}
/// Handle a node list request message
fn handle_node_list_request(&self, socket: SocketRef, message: P2PMessage) {
// Get the list of known nodes
let known_nodes = self.ledger.get_known_nodes();
// Send the response
let response = P2PMessage::new(
MessageType::NodeListResponse,
self.node_id.clone(),
message.sender_id,
json!({ "nodes": known_nodes }),
);
socket.emit("p2p_message", &serde_json::to_value(response).unwrap()).ok();
}
/// Handle an entry announce message
fn handle_entry_announce(&self, message: P2PMessage) {
// Extract the entry from the message
let entry: LedgerEntry = match serde_json::from_value(message.payload.clone()) {
Ok(entry) => entry,
Err(e) => {
info!("Error parsing entry announce: {}", e);
return;
}
};
// Add the entry to the pending entries
self.ledger.add_pending_entry(entry);
// Process pending entries
let added_entries = self.ledger.process_pending_entries();
// Announce any new entries that were added
for entry in added_entries {
self.broadcast_entry(entry);
}
}
/// Handle an entry request message
fn handle_entry_request(&self, socket: SocketRef, message: P2PMessage) {
// Extract the entry ID from the message
let entry_id = match message.payload.get("entry_id") {
Some(id) => id.as_str().unwrap_or("").to_string(),
None => "".to_string(),
};
// Find the entry in the ledger
let entries = self.ledger.get_entries();
let entry = entries.iter().find(|e| e.id == entry_id);
// Send the response
if let Some(entry) = entry {
let response = P2PMessage::new(
MessageType::EntryResponse,
self.node_id.clone(),
message.sender_id,
serde_json::to_value(entry).unwrap(),
);
socket.emit("p2p_message", &serde_json::to_value(response).unwrap()).ok();
}
}
/// Handle a ledger sync request message
fn handle_ledger_sync_request(&self, socket: SocketRef, message: P2PMessage) {
// Get all entries in the ledger
let entries = self.ledger.get_entries();
// Send the response
let response = P2PMessage::new(
MessageType::LedgerSyncResponse,
self.node_id.clone(),
message.sender_id,
serde_json::to_value(entries).unwrap(),
);
socket.emit("p2p_message", &serde_json::to_value(response).unwrap()).ok();
}
/// Broadcast a message to all connected nodes
pub fn broadcast_message(&self, message: P2PMessage) {
let connected_nodes = self.connected_nodes.lock().unwrap();
for (_, socket) in connected_nodes.iter() {
socket.emit("p2p_message", &serde_json::to_value(message.clone()).unwrap()).ok();
}
}
/// Broadcast a new ledger entry to all connected nodes
pub fn broadcast_entry(&self, entry: LedgerEntry) {
let message = P2PMessage::new(
MessageType::EntryAnnounce,
self.node_id.clone(),
"".to_string(),
serde_json::to_value(entry).unwrap(),
);
self.broadcast_message(message);
}
/// Send a message to a specific node
pub fn send_message(&self, recipient_id: String, message: P2PMessage) -> bool {
let connected_nodes = self.connected_nodes.lock().unwrap();
if let Some(socket) = connected_nodes.get(&recipient_id) {
socket.emit("p2p_message", &serde_json::to_value(message).unwrap()).is_ok()
} else {
false
}
}
/// Request the list of known nodes from a specific node
pub fn request_node_list(&self, recipient_id: String) -> bool {
let message = P2PMessage::new(
MessageType::NodeListRequest,
self.node_id.clone(),
recipient_id.clone(),
json!({}),
);
self.send_message(recipient_id, message)
}
/// Request a specific ledger entry from a specific node
pub fn request_entry(&self, recipient_id: String, entry_id: String) -> bool {
let message = P2PMessage::new(
MessageType::EntryRequest,
self.node_id.clone(),
recipient_id.clone(),
json!({ "entry_id": entry_id }),
);
self.send_message(recipient_id, message)
}
/// Request all ledger entries from a specific node
pub fn request_ledger_sync(&self, recipient_id: String) -> bool {
let message = P2PMessage::new(
MessageType::LedgerSyncRequest,
self.node_id.clone(),
recipient_id.clone(),
json!({}),
);
self.send_message(recipient_id, message)
}
}
impl Clone for P2PManager {
fn clone(&self) -> Self {
Self {
node_id: self.node_id.clone(),
ledger: self.ledger.clone(),
connected_nodes: self.connected_nodes.clone(),
}
}
}