latest pushes

This commit is contained in:
2024-10-30 11:21:24 -04:00
parent f74774bb46
commit 36ba525039
5 changed files with 144 additions and 189 deletions

View File

@@ -23,7 +23,7 @@
### Node Management
- Discovery
- Automatic peer finding
- Automatic node finding
- Bootstrap nodes
- Node registration
- Network topology
@@ -35,7 +35,7 @@
- Connection monitoring
- Data Synchronization
- Peer sync protocols
- Node sync protocols
- Data chunk transfer
- State reconciliation
- Conflict resolution

View File

@@ -29,7 +29,7 @@ mod persistence;
pub use crate::config::config::Config;
pub use crate::error::error::BellandeMeshError;
pub use crate::mesh::mesh::BellandeMeshSync;
use crate::mesh::mesh::NetworkStats;
pub use crate::mesh::mesh::NetworkStats;
pub use crate::metrics::metrics::MetricsManager;
pub use crate::node::node::{Node, NodeId, PublicKey};
pub use crate::persistence::persistence::PersistenceManager;

View File

@@ -30,11 +30,11 @@ use std::fs;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::net::{TcpListener as TokioTcpListener, UdpSocket as TokioUdpSocket};
use tokio::signal;
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio_util::sync::CancellationToken;
// Constants
@@ -83,7 +83,7 @@ pub struct BellandeMeshSync {
https_client: Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>,
stats: Arc<RwLock<NetworkStats>>,
metrics: Arc<MetricsManager>,
message_tx: mpsc::Sender<(Message, SocketAddr)>,
message_tx: Arc<Mutex<mpsc::Sender<(Message, SocketAddr)>>>,
message_rx: Arc<Mutex<mpsc::Receiver<(Message, SocketAddr)>>>,
cancel_token: CancellationToken,
}
@@ -136,7 +136,7 @@ impl BellandeMeshSync {
https_client,
stats,
metrics,
message_tx,
message_tx: Arc::new(Mutex::new(message_tx)),
message_rx: Arc::new(Mutex::new(message_rx)),
cancel_token: CancellationToken::new(),
})
@@ -169,7 +169,7 @@ impl BellandeMeshSync {
https_client,
stats,
metrics,
message_tx,
message_tx: Arc::new(Mutex::new(message_tx)),
message_rx: Arc::new(Mutex::new(message_rx)),
cancel_token: CancellationToken::new(),
})
@@ -204,12 +204,10 @@ impl BellandeMeshSync {
}
pub async fn start(&self) -> Result<(), BellandeMeshError> {
let mut running = self
.running
.write()
.map_err(|_| BellandeMeshError::LockError)?;
*running = true;
drop(running);
{
let mut running = self.running.write().await;
*running = true;
}
self.start_message_handler().await?;
self.start_protocol_listeners().await?;
@@ -219,10 +217,7 @@ impl BellandeMeshSync {
}
pub async fn stop(&self) -> Result<(), BellandeMeshError> {
let mut running = self
.running
.write()
.map_err(|_| BellandeMeshError::LockError)?;
let mut running = self.running.write().await;
*running = false;
Ok(())
}
@@ -230,8 +225,16 @@ impl BellandeMeshSync {
pub async fn start_message_handler(&self) -> Result<(), BellandeMeshError> {
let handler = self.clone();
tokio::spawn(async move {
let mut rx = handler.message_rx.lock().await;
while let Some((message, addr)) = rx.recv().await {
loop {
let message = {
let mut rx = handler.message_rx.lock().await;
match rx.recv().await {
Some(msg) => msg,
None => break,
}
};
let (message, addr) = message;
if let Err(e) = handler.handle_message_internal(message, addr).await {
eprintln!("Message handling error from {}: {}", addr, e);
}
@@ -254,18 +257,28 @@ impl BellandeMeshSync {
.listen_address
.parse::<SocketAddr>()
.map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?;
let listener = TokioTcpListener::bind(addr).await?;
let handler = self.clone();
tokio::spawn(async move {
while handler.is_running().unwrap_or(false) {
if let Ok((stream, addr)) = listener.accept().await {
let handler = handler.clone();
tokio::spawn(async move {
if let Err(e) = handler.handle_tcp_connection(stream).await {
eprintln!("TCP error from {}: {}", addr, e);
loop {
match handler.is_running().await {
Ok(true) => {
if let Ok((stream, addr)) = listener.accept().await {
let handler = handler.clone();
tokio::spawn(async move {
if let Err(e) = handler.handle_tcp_connection(stream).await {
eprintln!("TCP error from {}: {}", addr, e);
}
});
}
});
}
Ok(false) => break,
Err(e) => {
eprintln!("Error checking running state: {}", e);
break;
}
}
}
});
@@ -371,7 +384,7 @@ impl BellandeMeshSync {
let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config);
tokio::spawn(async move {
while handler.is_running().unwrap_or(false) {
while handler.is_running().await.unwrap_or(false) {
if let Ok((stream, addr)) = listener.accept().await {
let handler = handler.clone();
let acceptor = tls_acceptor.clone();
@@ -492,10 +505,14 @@ impl BellandeMeshSync {
let (mut reader, mut writer) = tokio::io::split(stream);
let mut buf = [0u8; MAX_MESSAGE_SIZE];
while self.is_running()? {
loop {
if !self.is_running().await? {
break;
}
match self.read_async_message(&mut reader, &mut buf).await {
Ok(message) => {
self.update_stats(|stats| stats.total_messages += 1);
self.update_stats(|stats| stats.total_messages += 1).await?;
if let Err(e) = self.write_async_message(&mut writer, &message).await {
eprintln!("Write error: {}", e);
break;
@@ -694,19 +711,20 @@ impl BellandeMeshSync {
vec![],
);
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
if let Some(node) = nodes.iter().find(|n| n.address == src) {
let mut data = node
.data
.write()
.map_err(|_| BellandeMeshError::LockError)?;
data.insert(NodeId::from_bytes(&key), chunk);
let target_addr = {
let nodes = self.nodes.read().await;
if let Some(node) = nodes.iter().find(|n| n.address == src) {
let mut data = node.data.write().await;
data.insert(NodeId::from_bytes(&key), chunk);
Some(node.address)
} else {
None
}
};
// Replicate to closest nodes
if target_addr.is_some() {
let closest_nodes = self.find_closest_nodes(&NodeId::from_bytes(&key), 3).await;
for target_node in closest_nodes {
let store_msg = Message::Store {
key: key.clone(),
@@ -714,6 +732,7 @@ impl BellandeMeshSync {
sender: self.get_local_id().await?,
token: rand::random(),
};
if let Err(e) = self.send_message(target_node.address, &store_msg).await {
eprintln!(
"Failed to replicate store to {}: {}",
@@ -747,26 +766,24 @@ impl BellandeMeshSync {
src: SocketAddr,
) -> Result<(), BellandeMeshError> {
let node_id = NodeId::from_bytes(&key);
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self.nodes.read().await;
// If node.data is tokio::sync::RwLock
for node in nodes.iter() {
if let Ok(data) = node.data.read() {
if let Some(chunk) = data.get(&node_id) {
let response = Message::Value {
key: key.clone(),
value: chunk.content.clone(),
sender: self.get_local_id().await?,
token: rand::random(),
};
return self.send_message(src, &response).await;
}
let data = node.data.read().await;
if let Some(chunk) = data.get(&node_id) {
let response = Message::Value {
key: key.clone(),
value: chunk.content.clone(),
sender: self.get_local_id().await?,
token: rand::random(),
};
return self.send_message(src, &response).await;
}
}
// If not found, return closest nodes
drop(nodes);
let closest_nodes = self.find_closest_nodes(&node_id, 20).await;
let response = Message::Nodes {
nodes: closest_nodes,
@@ -806,15 +823,12 @@ impl BellandeMeshSync {
vec![],
);
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
let mut nodes = self.nodes.write().await;
if let Some(node) = nodes.first_mut() {
if let Ok(mut data) = node.data.write() {
data.insert(NodeId::from_bytes(&key), chunk);
self.metrics.increment_sync_operations();
}
// If node.data is tokio::sync::RwLock
let mut data = node.data.write().await;
data.insert(NodeId::from_bytes(&key), chunk);
self.metrics.increment_sync_operations();
}
Ok(())
}
@@ -823,10 +837,7 @@ impl BellandeMeshSync {
where
F: FnOnce(&mut NetworkStats),
{
let mut stats = self
.stats
.write()
.map_err(|_| BellandeMeshError::LockError)?;
let mut stats = self.stats.write().await;
updater(&mut stats);
Ok(())
}
@@ -862,10 +873,7 @@ impl BellandeMeshSync {
}
pub async fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> {
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
let mut nodes = self.nodes.write().await;
if let Some(node) = nodes.iter_mut().find(|n| n.address == addr) {
node.update_last_seen();
}
@@ -900,13 +908,10 @@ impl BellandeMeshSync {
let new_node = Node::new(id, addr, public_key);
{
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
let mut nodes = self.nodes.write().await;
if !nodes.iter().any(|n| n.id == new_node.id) {
nodes.push(new_node.clone());
self.update_stats(|stats| stats.active_nodes += 1);
self.update_stats(|stats| stats.active_nodes += 1).await?;
}
}
@@ -915,10 +920,7 @@ impl BellandeMeshSync {
}
pub async fn handle_data_sync(&self, chunks: Vec<DataChunk>) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self.nodes.read().await;
for chunk in chunks {
if let Some(node) = nodes.iter().find(|n| n.id == chunk.author) {
@@ -934,10 +936,7 @@ impl BellandeMeshSync {
ids: &[NodeId],
src: SocketAddr,
) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self.nodes.read().await;
let mut chunks = Vec::new();
for node in nodes.iter() {
@@ -968,10 +967,7 @@ impl BellandeMeshSync {
sender: NodeId,
src: SocketAddr,
) -> Result<(), BellandeMeshError> {
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
let mut nodes = self.nodes.write().await;
if let Some(node) = nodes.iter_mut().find(|n| n.id == sender) {
node.update_last_seen();
}
@@ -1013,17 +1009,14 @@ impl BellandeMeshSync {
pub async fn sync_with_peers(&self) -> Result<(), BellandeMeshError> {
let nodes = {
let nodes_guard = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes_guard = self.nodes.read().await;
nodes_guard.to_vec()
};
for node in nodes {
if let Ok(mut stream) = TcpStream::connect(node.address) {
let chunks = {
let data = node.data.read().map_err(|_| BellandeMeshError::LockError)?;
let data = node.data.read().await; // Assuming node.data is also tokio::sync::RwLock
data.keys().cloned().collect::<Vec<_>>()
};
@@ -1043,7 +1036,8 @@ impl BellandeMeshSync {
}
}
self.update_stats(|stats| stats.last_sync = SystemTime::now());
self.update_stats(|stats| stats.last_sync = SystemTime::now())
.await;
Ok(())
}
@@ -1057,7 +1051,7 @@ impl BellandeMeshSync {
// Find the target node
if let Some(node) = nodes.iter().find(|n| n.id == node_id) {
// Get write lock on node's data
// Get write lock on node's data (using tokio::sync::RwLock)
let mut data = node.data.write().await;
// Insert all chunks
@@ -1088,7 +1082,12 @@ impl BellandeMeshSync {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(interval));
while let Ok(true) = running.read().map(|guard| *guard) {
loop {
let is_running = *running.read().await;
if !is_running {
break;
}
interval.tick().await;
metrics.increment_sync_operations();
}
@@ -1098,10 +1097,7 @@ impl BellandeMeshSync {
}
pub async fn set_max_connections(&self, max_conn: usize) -> Result<(), BellandeMeshError> {
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
let mut nodes = self.nodes.write().await;
if nodes.len() > max_conn {
nodes.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
nodes.truncate(max_conn);
@@ -1110,10 +1106,7 @@ impl BellandeMeshSync {
}
pub async fn broadcast_data(&self, data: Vec<u8>) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self.nodes.read().await;
let local_id = self.get_local_id().await?;
let message = Message::Store {
@@ -1133,14 +1126,8 @@ impl BellandeMeshSync {
}
pub async fn get_network_stats(&self) -> Result<NetworkStats, BellandeMeshError> {
let stats = self
.stats
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let stats = self.stats.read().await;
let nodes = self.nodes.read().await;
Ok(NetworkStats {
tcp_connections: stats.tcp_connections,
@@ -1159,10 +1146,7 @@ impl BellandeMeshSync {
node_id: NodeId,
data: Vec<u8>,
) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self.nodes.read().await;
let node = nodes
.iter()
.find(|n| n.id == node_id)
@@ -1181,10 +1165,7 @@ impl BellandeMeshSync {
pub async fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> {
let timeout = Duration::from_secs(self.config.node_timeout);
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
let mut nodes = self.nodes.write().await;
let initial_count = nodes.len();
nodes.retain(|node| {
@@ -1197,17 +1178,15 @@ impl BellandeMeshSync {
let removed = initial_count - nodes.len();
if removed > 0 {
self.update_stats(|stats| stats.active_nodes -= removed);
self.update_stats(|stats| stats.active_nodes -= removed)
.await?;
}
Ok(())
}
pub async fn send_ping_to_all_nodes(&self) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self.nodes.read().await;
let local_id = self.get_local_id().await?;
let token = rand::random::<u64>();
let ping = Message::Ping {
@@ -1241,34 +1220,26 @@ impl BellandeMeshSync {
}
pub async fn get_status(&self) -> String {
let stats = self
.stats
.read()
.map(|guard| guard.clone())
.unwrap_or_else(|_| NetworkStats::default());
let stats = self.stats.read().await.clone();
serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "Error getting status".to_string())
}
pub async fn is_running(&self) -> Result<bool, BellandeMeshError> {
self.running
.read()
.map_err(|_| BellandeMeshError::LockError)
.map(|guard| *guard)
Ok(*self.running.read().await)
}
pub async fn get_local_id(&self) -> Result<NodeId, BellandeMeshError> {
self.nodes
Ok(self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)
.map(|nodes| nodes.first().map(|n| n.id).unwrap_or_else(|| NodeId::new()))
.await
.first()
.map(|n| n.id)
.unwrap_or_else(|| NodeId::new()))
}
pub async fn broadcast_new_node(&self, new_node: &Node) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self.nodes.read().await;
let message = Message::JoinResponse {
accepted: true,
nodes: vec![new_node.clone()],
@@ -1281,48 +1252,28 @@ impl BellandeMeshSync {
}
}
}
Ok(())
}
pub async fn broadcast_message(&self, message: Message) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self.nodes.read().await;
for node in nodes.iter() {
if let Err(e) = self.send_message(node.address, &message).await {
eprintln!("Failed to broadcast message to {}: {}", node.address, e);
}
}
Ok(())
}
pub async fn get_node_count(&self) -> Result<usize, BellandeMeshError> {
Ok(self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?
.len())
Ok(self.nodes.read().await.len())
}
pub async fn get_node_list(&self) -> Result<Vec<NodeId>, BellandeMeshError> {
Ok(self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?
.iter()
.map(|node| node.id)
.collect())
Ok(self.nodes.read().await.iter().map(|node| node.id).collect())
}
pub async fn is_node_connected(&self, node_id: &NodeId) -> Result<bool, BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self.nodes.read().await;
Ok(nodes.iter().any(|node| &node.id == node_id))
}
}
@@ -1338,7 +1289,7 @@ impl Clone for BellandeMeshSync {
https_client: self.https_client.clone(),
stats: Arc::clone(&self.stats),
metrics: Arc::clone(&self.metrics),
message_tx: self.message_tx.clone(),
message_tx: Arc::clone(&self.message_tx),
message_rx: Arc::clone(&self.message_rx),
cancel_token: CancellationToken::new(),
}

View File

@@ -19,8 +19,9 @@ use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct NodeId([u8; 32]);
@@ -167,32 +168,34 @@ impl Node {
}
pub fn add_data_chunk(&self, chunk: DataChunk) -> bool {
match self.data.write() {
Ok(mut data) => {
data.insert(chunk.id, chunk);
true
}
Err(_) => false,
}
let data = self.data.clone();
tokio::runtime::Handle::current().block_on(async move {
let mut guard = data.write().await;
guard.insert(chunk.id, chunk);
true
})
}
pub fn get_data_chunk(&self, id: &NodeId) -> Option<DataChunk> {
self.data.read().ok()?.get(id).cloned()
let data = self.data.clone();
tokio::runtime::Handle::current().block_on(async move {
let guard = data.read().await;
guard.get(id).cloned()
})
}
pub fn remove_old_data(&self, max_age: Duration) -> bool {
match self.data.write() {
Ok(mut data) => {
let now = SystemTime::now();
data.retain(|_, chunk| {
now.duration_since(chunk.last_modified)
.map(|age| age <= max_age)
.unwrap_or(false)
});
true
}
Err(_) => false,
}
let data = self.data.clone();
tokio::runtime::Handle::current().block_on(async move {
let mut guard = data.write().await;
let now = SystemTime::now();
guard.retain(|_, chunk| {
now.duration_since(chunk.last_modified)
.map(|age| age <= max_age)
.unwrap_or(false)
});
true
})
}
pub fn is_alive(&self, timeout: Duration) -> bool {

View File

@@ -22,8 +22,9 @@ use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter};
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use tokio::sync::RwLock;
#[derive(Serialize, Deserialize)]
struct SerializableNode {