From 36ba5250390675dab3e430a13a315f6260445717 Mon Sep 17 00:00:00 2001 From: RonaldsonBellande Date: Wed, 30 Oct 2024 11:21:24 -0400 Subject: [PATCH] latest pushes --- README.md | 4 +- src/bellande_mesh_sync.rs | 2 +- src/mesh/mesh.rs | 279 ++++++++++++++------------------- src/node/node.rs | 45 +++--- src/persistence/persistence.rs | 3 +- 5 files changed, 144 insertions(+), 189 deletions(-) diff --git a/README.md b/README.md index 89cf28e..20b2cf3 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ ### Node Management - Discovery -- Automatic peer finding +- Automatic node finding - Bootstrap nodes - Node registration - Network topology @@ -35,7 +35,7 @@ - Connection monitoring - Data Synchronization -- Peer sync protocols +- Node sync protocols - Data chunk transfer - State reconciliation - Conflict resolution diff --git a/src/bellande_mesh_sync.rs b/src/bellande_mesh_sync.rs index 03838de..220b41e 100644 --- a/src/bellande_mesh_sync.rs +++ b/src/bellande_mesh_sync.rs @@ -29,7 +29,7 @@ mod persistence; pub use crate::config::config::Config; pub use crate::error::error::BellandeMeshError; pub use crate::mesh::mesh::BellandeMeshSync; -use crate::mesh::mesh::NetworkStats; +pub use crate::mesh::mesh::NetworkStats; pub use crate::metrics::metrics::MetricsManager; pub use crate::node::node::{Node, NodeId, PublicKey}; pub use crate::persistence::persistence::PersistenceManager; diff --git a/src/mesh/mesh.rs b/src/mesh/mesh.rs index a1d2059..a232036 100644 --- a/src/mesh/mesh.rs +++ b/src/mesh/mesh.rs @@ -30,11 +30,11 @@ use std::fs; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpStream}; use std::path::Path; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::{Duration, SystemTime}; use tokio::net::{TcpListener as TokioTcpListener, UdpSocket as TokioUdpSocket}; use tokio::signal; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, Mutex, RwLock}; use tokio_util::sync::CancellationToken; // Constants @@ -83,7 +83,7 @@ pub struct BellandeMeshSync { https_client: Client>, stats: Arc>, metrics: Arc, - message_tx: mpsc::Sender<(Message, SocketAddr)>, + message_tx: Arc>>, message_rx: Arc>>, cancel_token: CancellationToken, } @@ -136,7 +136,7 @@ impl BellandeMeshSync { https_client, stats, metrics, - message_tx, + message_tx: Arc::new(Mutex::new(message_tx)), message_rx: Arc::new(Mutex::new(message_rx)), cancel_token: CancellationToken::new(), }) @@ -169,7 +169,7 @@ impl BellandeMeshSync { https_client, stats, metrics, - message_tx, + message_tx: Arc::new(Mutex::new(message_tx)), message_rx: Arc::new(Mutex::new(message_rx)), cancel_token: CancellationToken::new(), }) @@ -204,12 +204,10 @@ impl BellandeMeshSync { } pub async fn start(&self) -> Result<(), BellandeMeshError> { - let mut running = self - .running - .write() - .map_err(|_| BellandeMeshError::LockError)?; - *running = true; - drop(running); + { + let mut running = self.running.write().await; + *running = true; + } self.start_message_handler().await?; self.start_protocol_listeners().await?; @@ -219,10 +217,7 @@ impl BellandeMeshSync { } pub async fn stop(&self) -> Result<(), BellandeMeshError> { - let mut running = self - .running - .write() - .map_err(|_| BellandeMeshError::LockError)?; + let mut running = self.running.write().await; *running = false; Ok(()) } @@ -230,8 +225,16 @@ impl BellandeMeshSync { pub async fn start_message_handler(&self) -> Result<(), BellandeMeshError> { let handler = self.clone(); tokio::spawn(async move { - let mut rx = handler.message_rx.lock().await; - while let Some((message, addr)) = rx.recv().await { + loop { + let message = { + let mut rx = handler.message_rx.lock().await; + match rx.recv().await { + Some(msg) => msg, + None => break, + } + }; + + let (message, addr) = message; if let Err(e) = handler.handle_message_internal(message, addr).await { eprintln!("Message handling error from {}: {}", addr, e); } @@ -254,18 +257,28 @@ impl BellandeMeshSync { .listen_address .parse::() .map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?; + let listener = TokioTcpListener::bind(addr).await?; let handler = self.clone(); tokio::spawn(async move { - while handler.is_running().unwrap_or(false) { - if let Ok((stream, addr)) = listener.accept().await { - let handler = handler.clone(); - tokio::spawn(async move { - if let Err(e) = handler.handle_tcp_connection(stream).await { - eprintln!("TCP error from {}: {}", addr, e); + loop { + match handler.is_running().await { + Ok(true) => { + if let Ok((stream, addr)) = listener.accept().await { + let handler = handler.clone(); + tokio::spawn(async move { + if let Err(e) = handler.handle_tcp_connection(stream).await { + eprintln!("TCP error from {}: {}", addr, e); + } + }); } - }); + } + Ok(false) => break, + Err(e) => { + eprintln!("Error checking running state: {}", e); + break; + } } } }); @@ -371,7 +384,7 @@ impl BellandeMeshSync { let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config); tokio::spawn(async move { - while handler.is_running().unwrap_or(false) { + while handler.is_running().await.unwrap_or(false) { if let Ok((stream, addr)) = listener.accept().await { let handler = handler.clone(); let acceptor = tls_acceptor.clone(); @@ -492,10 +505,14 @@ impl BellandeMeshSync { let (mut reader, mut writer) = tokio::io::split(stream); let mut buf = [0u8; MAX_MESSAGE_SIZE]; - while self.is_running()? { + loop { + if !self.is_running().await? { + break; + } + match self.read_async_message(&mut reader, &mut buf).await { Ok(message) => { - self.update_stats(|stats| stats.total_messages += 1); + self.update_stats(|stats| stats.total_messages += 1).await?; if let Err(e) = self.write_async_message(&mut writer, &message).await { eprintln!("Write error: {}", e); break; @@ -694,19 +711,20 @@ impl BellandeMeshSync { vec![], ); - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; - if let Some(node) = nodes.iter().find(|n| n.address == src) { - let mut data = node - .data - .write() - .map_err(|_| BellandeMeshError::LockError)?; - data.insert(NodeId::from_bytes(&key), chunk); + let target_addr = { + let nodes = self.nodes.read().await; + if let Some(node) = nodes.iter().find(|n| n.address == src) { + let mut data = node.data.write().await; + data.insert(NodeId::from_bytes(&key), chunk); + Some(node.address) + } else { + None + } + }; - // Replicate to closest nodes + if target_addr.is_some() { let closest_nodes = self.find_closest_nodes(&NodeId::from_bytes(&key), 3).await; + for target_node in closest_nodes { let store_msg = Message::Store { key: key.clone(), @@ -714,6 +732,7 @@ impl BellandeMeshSync { sender: self.get_local_id().await?, token: rand::random(), }; + if let Err(e) = self.send_message(target_node.address, &store_msg).await { eprintln!( "Failed to replicate store to {}: {}", @@ -747,26 +766,24 @@ impl BellandeMeshSync { src: SocketAddr, ) -> Result<(), BellandeMeshError> { let node_id = NodeId::from_bytes(&key); - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; + let nodes = self.nodes.read().await; + // If node.data is tokio::sync::RwLock for node in nodes.iter() { - if let Ok(data) = node.data.read() { - if let Some(chunk) = data.get(&node_id) { - let response = Message::Value { - key: key.clone(), - value: chunk.content.clone(), - sender: self.get_local_id().await?, - token: rand::random(), - }; - return self.send_message(src, &response).await; - } + let data = node.data.read().await; + if let Some(chunk) = data.get(&node_id) { + let response = Message::Value { + key: key.clone(), + value: chunk.content.clone(), + sender: self.get_local_id().await?, + token: rand::random(), + }; + return self.send_message(src, &response).await; } } // If not found, return closest nodes + drop(nodes); let closest_nodes = self.find_closest_nodes(&node_id, 20).await; let response = Message::Nodes { nodes: closest_nodes, @@ -806,15 +823,12 @@ impl BellandeMeshSync { vec![], ); - let mut nodes = self - .nodes - .write() - .map_err(|_| BellandeMeshError::LockError)?; + let mut nodes = self.nodes.write().await; if let Some(node) = nodes.first_mut() { - if let Ok(mut data) = node.data.write() { - data.insert(NodeId::from_bytes(&key), chunk); - self.metrics.increment_sync_operations(); - } + // If node.data is tokio::sync::RwLock + let mut data = node.data.write().await; + data.insert(NodeId::from_bytes(&key), chunk); + self.metrics.increment_sync_operations(); } Ok(()) } @@ -823,10 +837,7 @@ impl BellandeMeshSync { where F: FnOnce(&mut NetworkStats), { - let mut stats = self - .stats - .write() - .map_err(|_| BellandeMeshError::LockError)?; + let mut stats = self.stats.write().await; updater(&mut stats); Ok(()) } @@ -862,10 +873,7 @@ impl BellandeMeshSync { } pub async fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> { - let mut nodes = self - .nodes - .write() - .map_err(|_| BellandeMeshError::LockError)?; + let mut nodes = self.nodes.write().await; if let Some(node) = nodes.iter_mut().find(|n| n.address == addr) { node.update_last_seen(); } @@ -900,13 +908,10 @@ impl BellandeMeshSync { let new_node = Node::new(id, addr, public_key); { - let mut nodes = self - .nodes - .write() - .map_err(|_| BellandeMeshError::LockError)?; + let mut nodes = self.nodes.write().await; if !nodes.iter().any(|n| n.id == new_node.id) { nodes.push(new_node.clone()); - self.update_stats(|stats| stats.active_nodes += 1); + self.update_stats(|stats| stats.active_nodes += 1).await?; } } @@ -915,10 +920,7 @@ impl BellandeMeshSync { } pub async fn handle_data_sync(&self, chunks: Vec) -> Result<(), BellandeMeshError> { - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; + let nodes = self.nodes.read().await; for chunk in chunks { if let Some(node) = nodes.iter().find(|n| n.id == chunk.author) { @@ -934,10 +936,7 @@ impl BellandeMeshSync { ids: &[NodeId], src: SocketAddr, ) -> Result<(), BellandeMeshError> { - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; + let nodes = self.nodes.read().await; let mut chunks = Vec::new(); for node in nodes.iter() { @@ -968,10 +967,7 @@ impl BellandeMeshSync { sender: NodeId, src: SocketAddr, ) -> Result<(), BellandeMeshError> { - let mut nodes = self - .nodes - .write() - .map_err(|_| BellandeMeshError::LockError)?; + let mut nodes = self.nodes.write().await; if let Some(node) = nodes.iter_mut().find(|n| n.id == sender) { node.update_last_seen(); } @@ -1013,17 +1009,14 @@ impl BellandeMeshSync { pub async fn sync_with_peers(&self) -> Result<(), BellandeMeshError> { let nodes = { - let nodes_guard = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; + let nodes_guard = self.nodes.read().await; nodes_guard.to_vec() }; for node in nodes { if let Ok(mut stream) = TcpStream::connect(node.address) { let chunks = { - let data = node.data.read().map_err(|_| BellandeMeshError::LockError)?; + let data = node.data.read().await; // Assuming node.data is also tokio::sync::RwLock data.keys().cloned().collect::>() }; @@ -1043,7 +1036,8 @@ impl BellandeMeshSync { } } - self.update_stats(|stats| stats.last_sync = SystemTime::now()); + self.update_stats(|stats| stats.last_sync = SystemTime::now()) + .await; Ok(()) } @@ -1057,7 +1051,7 @@ impl BellandeMeshSync { // Find the target node if let Some(node) = nodes.iter().find(|n| n.id == node_id) { - // Get write lock on node's data + // Get write lock on node's data (using tokio::sync::RwLock) let mut data = node.data.write().await; // Insert all chunks @@ -1088,7 +1082,12 @@ impl BellandeMeshSync { tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(interval)); - while let Ok(true) = running.read().map(|guard| *guard) { + loop { + let is_running = *running.read().await; + if !is_running { + break; + } + interval.tick().await; metrics.increment_sync_operations(); } @@ -1098,10 +1097,7 @@ impl BellandeMeshSync { } pub async fn set_max_connections(&self, max_conn: usize) -> Result<(), BellandeMeshError> { - let mut nodes = self - .nodes - .write() - .map_err(|_| BellandeMeshError::LockError)?; + let mut nodes = self.nodes.write().await; if nodes.len() > max_conn { nodes.sort_by(|a, b| b.last_seen.cmp(&a.last_seen)); nodes.truncate(max_conn); @@ -1110,10 +1106,7 @@ impl BellandeMeshSync { } pub async fn broadcast_data(&self, data: Vec) -> Result<(), BellandeMeshError> { - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; + let nodes = self.nodes.read().await; let local_id = self.get_local_id().await?; let message = Message::Store { @@ -1133,14 +1126,8 @@ impl BellandeMeshSync { } pub async fn get_network_stats(&self) -> Result { - let stats = self - .stats - .read() - .map_err(|_| BellandeMeshError::LockError)?; - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; + let stats = self.stats.read().await; + let nodes = self.nodes.read().await; Ok(NetworkStats { tcp_connections: stats.tcp_connections, @@ -1159,10 +1146,7 @@ impl BellandeMeshSync { node_id: NodeId, data: Vec, ) -> Result<(), BellandeMeshError> { - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; + let nodes = self.nodes.read().await; let node = nodes .iter() .find(|n| n.id == node_id) @@ -1181,10 +1165,7 @@ impl BellandeMeshSync { pub async fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> { let timeout = Duration::from_secs(self.config.node_timeout); - let mut nodes = self - .nodes - .write() - .map_err(|_| BellandeMeshError::LockError)?; + let mut nodes = self.nodes.write().await; let initial_count = nodes.len(); nodes.retain(|node| { @@ -1197,17 +1178,15 @@ impl BellandeMeshSync { let removed = initial_count - nodes.len(); if removed > 0 { - self.update_stats(|stats| stats.active_nodes -= removed); + self.update_stats(|stats| stats.active_nodes -= removed) + .await?; } Ok(()) } pub async fn send_ping_to_all_nodes(&self) -> Result<(), BellandeMeshError> { - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; + let nodes = self.nodes.read().await; let local_id = self.get_local_id().await?; let token = rand::random::(); let ping = Message::Ping { @@ -1241,34 +1220,26 @@ impl BellandeMeshSync { } pub async fn get_status(&self) -> String { - let stats = self - .stats - .read() - .map(|guard| guard.clone()) - .unwrap_or_else(|_| NetworkStats::default()); - + let stats = self.stats.read().await.clone(); serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "Error getting status".to_string()) } pub async fn is_running(&self) -> Result { - self.running - .read() - .map_err(|_| BellandeMeshError::LockError) - .map(|guard| *guard) + Ok(*self.running.read().await) } pub async fn get_local_id(&self) -> Result { - self.nodes + Ok(self + .nodes .read() - .map_err(|_| BellandeMeshError::LockError) - .map(|nodes| nodes.first().map(|n| n.id).unwrap_or_else(|| NodeId::new())) + .await + .first() + .map(|n| n.id) + .unwrap_or_else(|| NodeId::new())) } pub async fn broadcast_new_node(&self, new_node: &Node) -> Result<(), BellandeMeshError> { - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; + let nodes = self.nodes.read().await; let message = Message::JoinResponse { accepted: true, nodes: vec![new_node.clone()], @@ -1281,48 +1252,28 @@ impl BellandeMeshSync { } } } - Ok(()) } - pub async fn broadcast_message(&self, message: Message) -> Result<(), BellandeMeshError> { - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; - + let nodes = self.nodes.read().await; for node in nodes.iter() { if let Err(e) = self.send_message(node.address, &message).await { eprintln!("Failed to broadcast message to {}: {}", node.address, e); } } - Ok(()) } pub async fn get_node_count(&self) -> Result { - Ok(self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)? - .len()) + Ok(self.nodes.read().await.len()) } pub async fn get_node_list(&self) -> Result, BellandeMeshError> { - Ok(self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)? - .iter() - .map(|node| node.id) - .collect()) + Ok(self.nodes.read().await.iter().map(|node| node.id).collect()) } pub async fn is_node_connected(&self, node_id: &NodeId) -> Result { - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; + let nodes = self.nodes.read().await; Ok(nodes.iter().any(|node| &node.id == node_id)) } } @@ -1338,7 +1289,7 @@ impl Clone for BellandeMeshSync { https_client: self.https_client.clone(), stats: Arc::clone(&self.stats), metrics: Arc::clone(&self.metrics), - message_tx: self.message_tx.clone(), + message_tx: Arc::clone(&self.message_tx), message_rx: Arc::clone(&self.message_rx), cancel_token: CancellationToken::new(), } diff --git a/src/node/node.rs b/src/node/node.rs index 675a4ec..dadc844 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -19,8 +19,9 @@ use serde::{Deserialize, Serialize}; use std::cmp::Ordering; use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::{Duration, SystemTime}; +use tokio::sync::RwLock; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Serialize, Deserialize, PartialOrd, Ord)] pub struct NodeId([u8; 32]); @@ -167,32 +168,34 @@ impl Node { } pub fn add_data_chunk(&self, chunk: DataChunk) -> bool { - match self.data.write() { - Ok(mut data) => { - data.insert(chunk.id, chunk); - true - } - Err(_) => false, - } + let data = self.data.clone(); + tokio::runtime::Handle::current().block_on(async move { + let mut guard = data.write().await; + guard.insert(chunk.id, chunk); + true + }) } pub fn get_data_chunk(&self, id: &NodeId) -> Option { - self.data.read().ok()?.get(id).cloned() + let data = self.data.clone(); + tokio::runtime::Handle::current().block_on(async move { + let guard = data.read().await; + guard.get(id).cloned() + }) } pub fn remove_old_data(&self, max_age: Duration) -> bool { - match self.data.write() { - Ok(mut data) => { - let now = SystemTime::now(); - data.retain(|_, chunk| { - now.duration_since(chunk.last_modified) - .map(|age| age <= max_age) - .unwrap_or(false) - }); - true - } - Err(_) => false, - } + let data = self.data.clone(); + tokio::runtime::Handle::current().block_on(async move { + let mut guard = data.write().await; + let now = SystemTime::now(); + guard.retain(|_, chunk| { + now.duration_since(chunk.last_modified) + .map(|age| age <= max_age) + .unwrap_or(false) + }); + true + }) } pub fn is_alive(&self, timeout: Duration) -> bool { diff --git a/src/persistence/persistence.rs b/src/persistence/persistence.rs index 79c44a9..27c6ce6 100644 --- a/src/persistence/persistence.rs +++ b/src/persistence/persistence.rs @@ -22,8 +22,9 @@ use std::fs::{File, OpenOptions}; use std::io::{BufReader, BufWriter}; use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; use std::time::SystemTime; +use tokio::sync::RwLock; #[derive(Serialize, Deserialize)] struct SerializableNode {