From f8c3b3e408ebae44cce844181b2088f2821f2234 Mon Sep 17 00:00:00 2001 From: RonaldsonBellande <ronaldsonbellande@gmail.com> Date: Wed, 23 Oct 2024 17:18:01 -0400 Subject: [PATCH] latest pushes --- .gitignore | 4 + Cargo.toml | 23 ++ README.md | 1 + dependencies.bellande | 17 ++ make_rust_executable.bellos | 1 + make_rust_executable.sh | 1 + src/bellande_mesh_sync.rs | 42 ++++ src/config/config.rs | 40 ++++ src/config/mod.rs | 1 + src/data/data.rs | 49 +++++ src/data/mod.rs | 1 + src/dht/dht.rs | 392 +++++++++++++++++++++++++++++++++ src/dht/mod.rs | 1 + src/encryption/encryption.rs | 169 ++++++++++++++ src/encryption/mod.rs | 1 + src/error/error.rs | 84 +++++++ src/error/mod.rs | 1 + src/mesh/mesh.rs | 332 ++++++++++++++++++++++++++++ src/mesh/mod.rs | 1 + src/metrics/metrics.rs | 141 ++++++++++++ src/metrics/mod.rs | 1 + src/node/mod.rs | 1 + src/node/node.rs | 242 ++++++++++++++++++++ src/persistence/mod.rs | 1 + src/persistence/persistence.rs | 192 ++++++++++++++++ src/tests/mod.rs | 1 + src/tests/tests.rs | 127 +++++++++++ src/utilities/mod.rs | 1 + src/utilities/utilities.rs | 36 +++ 29 files changed, 1904 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 dependencies.bellande create mode 100755 make_rust_executable.bellos create mode 100755 make_rust_executable.sh create mode 100644 src/bellande_mesh_sync.rs create mode 100644 src/config/config.rs create mode 100644 src/config/mod.rs create mode 100644 src/data/data.rs create mode 100644 src/data/mod.rs create mode 100644 src/dht/dht.rs create mode 100644 src/dht/mod.rs create mode 100644 src/encryption/encryption.rs create mode 100644 src/encryption/mod.rs create mode 100644 src/error/error.rs create mode 100644 src/error/mod.rs create mode 100644 src/mesh/mesh.rs create mode 100644 src/mesh/mod.rs create mode 100644 src/metrics/metrics.rs create mode 100644 src/metrics/mod.rs create mode 100644 src/node/mod.rs create mode 100644 src/node/node.rs create mode 100644 src/persistence/mod.rs create mode 100644 src/persistence/persistence.rs create mode 100644 src/tests/mod.rs create mode 100644 src/tests/tests.rs create mode 100644 src/utilities/mod.rs create mode 100644 src/utilities/utilities.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1a33982 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +target +git_scripts +.env +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9da4e7f --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "bellande_mesh_sync" +version = "0.0.1" +edition = "2021" +authors = ["Ronaldson Bellande <ronaldsonbellande@gmail.com>"] +description = "Advanced peer-to-peer data synchronization system for distributed applications" +license = "GPL-3.0-or-later" +repository = "https://github.com/Architecture-Mechanism/bellande_mesh_sync" +documentation = "https://bellande-architecture-mechanism-research-innovation-center.org/bellande_mesh_sync/docs" +readme = "README.md" +keywords = ["p2p", "synchronization", "distributed-systems", "mesh-network", "bellande_mesh_sync"] +categories = ["network-programming", "asynchronous"] + +[lib] +path = "src/bellande_mesh_sync.rs" + +[dependencies] +tokio = { version = "1.28", features = ["rt", "net", "time", "sync", "macros"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +bincode = "1.3" +rand = "0.8" +uuid = { version = "1.3", features = ["v4", "serde"] } diff --git a/README.md b/README.md index b95b007..f8ff6da 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Bellande Mesh Sync +- a comprehensive data synchronization system ## License Bellande Mesh Sync is distributed under the [GNU General Public License v3.0](https://www.gnu.org/licenses/gpl-3.0.en.html), see [LICENSE](https://github.com/Architecture-Mechanism/bellande_mesh_sync/blob/main/LICENSE) and [NOTICE](https://github.com/Architecture-Mechanism/bellande_mesh_sync/blob/main/LICENSE) for more information. diff --git a/dependencies.bellande b/dependencies.bellande new file mode 100644 index 0000000..492e2c4 --- /dev/null +++ b/dependencies.bellande @@ -0,0 +1,17 @@ +tokio: "1.28" + features: + - rt + - net + - time + - sync + - macros +serde: "1.0" + features: + - derive +serde_json: "1.0" +bincode: "1.3" +rand: "0.8" +uuid: "1.3" + features: + - v4 + - serde diff --git a/make_rust_executable.bellos b/make_rust_executable.bellos new file mode 100755 index 0000000..b00be0c --- /dev/null +++ b/make_rust_executable.bellos @@ -0,0 +1 @@ +bellande_rust_executable -d dependencies.bellande -s src -m bellande_mesh_sync.rs -o executable/bellande_mesh_sync diff --git a/make_rust_executable.sh b/make_rust_executable.sh new file mode 100755 index 0000000..b00be0c --- /dev/null +++ b/make_rust_executable.sh @@ -0,0 +1 @@ +bellande_rust_executable -d dependencies.bellande -s src -m bellande_mesh_sync.rs -o executable/bellande_mesh_sync diff --git a/src/bellande_mesh_sync.rs b/src/bellande_mesh_sync.rs new file mode 100644 index 0000000..54f35f7 --- /dev/null +++ b/src/bellande_mesh_sync.rs @@ -0,0 +1,42 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +mod config; +mod data; +mod dht; +mod encryption; +mod error; +mod mesh; +mod metrics; +mod node; +mod persistence; + +pub use crate::config::config::Config; +pub use crate::error::error::BellandeMeshError; +pub use crate::mesh::mesh::BellandeMeshSync; + +/// Initialize the BellandeMeshSync System +/// This function takes a configuration and sets up the BellandeMeshSync System. +/// It returns a BellandeMeshSync instance that can be used to interact with the mesh network. +pub async fn init(config: Config) -> Result<BellandeMeshSync, BellandeMeshError> { + let bellande_mesh = BellandeMeshSync::new(&config)?; + Ok(bellande_mesh) +} + +/// Start the BellandeMeshSync System +/// This function takes a BellandeMeshSync instance and starts the mesh network operations. +pub async fn start(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshError> { + bellande_mesh.start() +} diff --git a/src/config/config.rs b/src/config/config.rs new file mode 100644 index 0000000..0e7c4f9 --- /dev/null +++ b/src/config/config.rs @@ -0,0 +1,40 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use serde::{Deserialize, Serialize}; +use std::fs; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Config { + pub db_url: String, + pub listen_address: String, + pub bootstrap_nodes: Vec<String>, + pub sync_interval: u64, + pub node_timeout: u64, +} + +impl Config { + pub fn load(path: &str) -> Result<Self, Box<dyn std::error::Error>> { + let config_str = fs::read_to_string(path)?; + Ok(serde_json::from_str(&config_str)?) + } + pub fn sync_interval(&self) -> Option<u64> { + Some(self.sync_interval) + } + + pub fn node_timeout(&self) -> Option<u64> { + Some(self.node_timeout) + } +} diff --git a/src/config/mod.rs b/src/config/mod.rs new file mode 100644 index 0000000..ef68c36 --- /dev/null +++ b/src/config/mod.rs @@ -0,0 +1 @@ +pub mod config; diff --git a/src/data/data.rs b/src/data/data.rs new file mode 100644 index 0000000..09240c1 --- /dev/null +++ b/src/data/data.rs @@ -0,0 +1,49 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use serde::{Deserialize, Serialize}; +use std::time::SystemTime; +use uuid::Uuid; + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct DataChunk { + pub id: Uuid, + pub content: Vec<u8>, + pub checksum: String, + pub version: u64, + pub last_modified: SystemTime, + pub author: Uuid, + pub parent_versions: Vec<u64>, +} + +impl DataChunk { + pub fn new( + content: Vec<u8>, + checksum: String, + version: u64, + author: Uuid, + parent_versions: Vec<u64>, + ) -> Self { + Self { + id: Uuid::new_v4(), + content, + checksum, + version, + last_modified: SystemTime::now(), + author, + parent_versions, + } + } +} diff --git a/src/data/mod.rs b/src/data/mod.rs new file mode 100644 index 0000000..7a345e4 --- /dev/null +++ b/src/data/mod.rs @@ -0,0 +1 @@ +pub mod data; diff --git a/src/dht/dht.rs b/src/dht/dht.rs new file mode 100644 index 0000000..fd0397f --- /dev/null +++ b/src/dht/dht.rs @@ -0,0 +1,392 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::error::error::BellandeMeshError; +use crate::node::node::{Message, Node, NodeId, PublicKey}; +use rand::Rng; +use std::collections::{HashMap, HashSet}; +use std::io; +use std::net::{SocketAddr, UdpSocket}; +use std::sync::{Arc, Mutex, RwLock}; +use std::thread; +use std::time::{Duration, Instant, SystemTime}; + +const ALPHA: usize = 3; +const K: usize = 20; +const BUCKET_REFRESH_INTERVAL: Duration = Duration::from_secs(3600); +const REPLICATION_INTERVAL: Duration = Duration::from_secs(3600); +const NODE_TIMEOUT: Duration = Duration::from_secs(900); +const PING_INTERVAL: Duration = Duration::from_secs(300); + +#[derive(Clone, Debug)] +struct KBucket { + nodes: Vec<Node>, + last_updated: Instant, +} + +impl KBucket { + fn new() -> Self { + Self { + nodes: Vec::with_capacity(K), + last_updated: Instant::now(), + } + } + + fn add_node(&mut self, node: Node) { + if let Some(existing) = self.nodes.iter_mut().find(|n| n.id == node.id) { + existing.last_seen = node.last_seen; + existing.rtt = node.rtt; + return; + } + + if self.nodes.len() < K { + self.nodes.push(node); + } else if let Some(pos) = self.nodes.iter().position(|n| n.failed_queries > 0) { + self.nodes[pos] = node; + } + + self.last_updated = Instant::now(); + self.nodes.sort_by_key(|n| n.rtt); + } + + fn remove_node(&mut self, id: &NodeId) { + self.nodes.retain(|n| n.id != *id); + } + + fn get_nodes(&self) -> Vec<Node> { + self.nodes.clone() + } +} + +#[derive(Clone, Debug)] +pub struct StorageValue { + pub data: Vec<u8>, + pub timestamp: SystemTime, + pub ttl: Duration, +} + +impl StorageValue { + pub fn new(data: Vec<u8>, ttl: Duration) -> Self { + Self { + data, + timestamp: SystemTime::now(), + ttl, + } + } + + pub fn is_expired(&self) -> bool { + SystemTime::now() + .duration_since(self.timestamp) + .map(|elapsed| elapsed >= self.ttl) + .unwrap_or(true) + } +} + +struct RoutingTable { + buckets: Vec<KBucket>, + local_id: NodeId, +} + +impl RoutingTable { + fn new(local_id: NodeId) -> Self { + Self { + buckets: (0..256).map(|_| KBucket::new()).collect(), + local_id, + } + } + + fn add_node(&mut self, node: Node) { + let bucket_idx = self.bucket_index(&node.id); + self.buckets[bucket_idx].add_node(node); + } + + fn get_closest_nodes(&self, target: &NodeId) -> Vec<Node> { + let mut nodes = Vec::new(); + for bucket in &self.buckets { + nodes.extend(bucket.get_nodes()); + } + + nodes.sort_by_cached_key(|node| node.id.distance(target)); + nodes.truncate(K); + nodes + } + + fn bucket_index(&self, id: &NodeId) -> usize { + let distance = self.local_id.distance(id); + let distance_bytes = distance.as_ref(); + let mut leading_zeros = 0; + + for &byte in distance_bytes { + if byte == 0 { + leading_zeros += 8; + } else { + leading_zeros += byte.leading_zeros() as usize; + break; + } + } + + (255 - leading_zeros).min(255) + } +} + +pub struct DhtWrapper { + routing_table: Arc<RwLock<RoutingTable>>, + storage: Arc<RwLock<HashMap<Vec<u8>, StorageValue>>>, + socket: Arc<UdpSocket>, + pending_requests: Arc<Mutex<HashMap<u64, Instant>>>, +} + +impl DhtWrapper { + pub fn new(bind_addr: SocketAddr, local_key: &[u8]) -> Result<Self, BellandeMeshError> { + let socket = UdpSocket::bind(bind_addr)?; + socket.set_nonblocking(true)?; + + let local_id = NodeId::from_bytes(local_key); + let routing_table = RoutingTable::new(local_id); + + let wrapper = Self { + routing_table: Arc::new(RwLock::new(routing_table)), + storage: Arc::new(RwLock::new(HashMap::new())), + socket: Arc::new(socket), + pending_requests: Arc::new(Mutex::new(HashMap::new())), + }; + + wrapper.start_background_tasks(); + Ok(wrapper) + } + + fn start_background_tasks(&self) { + let dht = self.clone(); + thread::spawn(move || loop { + thread::sleep(PING_INTERVAL); + if let Err(e) = dht.maintenance_task() { + eprintln!("Maintenance error: {}", e); + } + }); + + let dht = self.clone(); + thread::spawn(move || loop { + if let Err(e) = dht.handle_incoming_messages() { + eprintln!("Message handling error: {}", e); + thread::sleep(Duration::from_millis(100)); + } + }); + } + + fn maintenance_task(&self) -> Result<(), BellandeMeshError> { + // Clean up expired requests + { + let mut requests = self + .pending_requests + .lock() + .map_err(|_| BellandeMeshError::LockError)?; + requests.retain(|_, time| time.elapsed() < Duration::from_secs(60)); + } + + // Get outdated buckets + let outdated_buckets = { + let table = self + .routing_table + .read() + .map_err(|_| BellandeMeshError::LockError)?; + table + .buckets + .iter() + .enumerate() + .filter(|(_, bucket)| bucket.last_updated.elapsed() > BUCKET_REFRESH_INTERVAL) + .map(|(i, _)| i) + .collect::<Vec<_>>() + }; + + // Refresh outdated buckets + for i in outdated_buckets { + let target_id = NodeId::new(); + self.find_node(&target_id)?; + } + + Ok(()) + } + + fn handle_incoming_messages(&self) -> Result<(), BellandeMeshError> { + let mut buf = vec![0u8; 65536]; + loop { + match self.socket.recv_from(&mut buf) { + Ok((len, src)) => { + let message = bincode::deserialize(&buf[..len]) + .map_err(|e| BellandeMeshError::Deserialization(e.to_string()))?; + self.handle_message(message, src)?; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(10)); + continue; + } + Err(e) => return Err(BellandeMeshError::IoError(e)), + } + } + } + + fn handle_message(&self, message: Message, src: SocketAddr) -> Result<(), BellandeMeshError> { + match message { + Message::Ping { sender, token } => { + let response = Message::Pong { + sender: self.get_local_id()?, + token, + }; + self.send_message(&src, &response)?; + } + Message::Pong { sender, token } => { + if let Some(start_time) = self + .pending_requests + .lock() + .map_err(|_| BellandeMeshError::LockError)? + .remove(&token) + { + let rtt = start_time.elapsed(); + let mut node = Node::new(sender, src, PublicKey::new([0; 32])); + node.update_rtt(rtt); + self.routing_table + .write() + .map_err(|_| BellandeMeshError::LockError)? + .add_node(node); + } + } + Message::FindNode { + sender, + target, + token, + } => { + let nodes = self.get_closest_nodes(&target)?; + let response = Message::Nodes { + sender: self.get_local_id()?, + nodes, + token, + }; + self.send_message(&src, &response)?; + } + Message::Store { + sender, + key, + value, + token, + } => { + self.store(&key, &value, REPLICATION_INTERVAL)?; + let response = Message::Pong { + sender: self.get_local_id()?, + token, + }; + self.send_message(&src, &response)?; + } + _ => {} + } + Ok(()) + } + + fn send_message(&self, addr: &SocketAddr, msg: &Message) -> Result<(), BellandeMeshError> { + let data = + bincode::serialize(msg).map_err(|e| BellandeMeshError::Serialization(e.to_string()))?; + self.socket.send_to(&data, addr)?; + Ok(()) + } + + fn get_local_id(&self) -> Result<NodeId, BellandeMeshError> { + Ok(self + .routing_table + .read() + .map_err(|_| BellandeMeshError::LockError)? + .local_id + .clone()) + } + + fn get_closest_nodes(&self, target: &NodeId) -> Result<Vec<Node>, BellandeMeshError> { + Ok(self + .routing_table + .read() + .map_err(|_| BellandeMeshError::LockError)? + .get_closest_nodes(target)) + } + + pub fn store(&self, key: &[u8], value: &[u8], ttl: Duration) -> Result<(), BellandeMeshError> { + let storage_value = StorageValue::new(value.to_vec(), ttl); + let mut storage = self + .storage + .write() + .map_err(|_| BellandeMeshError::LockError)?; + storage.insert(key.to_vec(), storage_value); + + let target = NodeId::from_bytes(key); + let closest_nodes = self.get_closest_nodes(&target)?; + + for node in closest_nodes { + let token = rand::thread_rng().gen(); + let msg = Message::Store { + sender: self.get_local_id()?, + key: key.to_vec(), + value: value.to_vec(), + token, + }; + let _ = self.send_message(&node.address, &msg); + } + + Ok(()) + } + + fn cleanup_storage(&self) -> Result<(), BellandeMeshError> { + let mut storage = self + .storage + .write() + .map_err(|_| BellandeMeshError::LockError)?; + storage.retain(|_, value| !value.is_expired()); + Ok(()) + } + + pub fn find_node(&self, target: &NodeId) -> Result<Vec<Node>, BellandeMeshError> { + let closest = self.get_closest_nodes(target)?; + let mut queried_nodes = HashSet::new(); + let mut results = Vec::new(); + + for node in closest { + if queried_nodes.contains(&node.id) { + continue; + } + + let token = rand::thread_rng().gen(); + let msg = Message::FindNode { + sender: self.get_local_id()?, + target: target.clone(), + token, + }; + + if let Ok(()) = self.send_message(&node.address, &msg) { + queried_nodes.insert(node.id); + results.push(node); + } + } + + results.sort_by_cached_key(|node| node.id.distance(target)); + results.truncate(K); + Ok(results) + } +} + +impl Clone for DhtWrapper { + fn clone(&self) -> Self { + Self { + routing_table: Arc::clone(&self.routing_table), + storage: Arc::clone(&self.storage), + socket: Arc::clone(&self.socket), + pending_requests: Arc::clone(&self.pending_requests), + } + } +} diff --git a/src/dht/mod.rs b/src/dht/mod.rs new file mode 100644 index 0000000..01d8155 --- /dev/null +++ b/src/dht/mod.rs @@ -0,0 +1 @@ +pub mod dht; diff --git a/src/encryption/encryption.rs b/src/encryption/encryption.rs new file mode 100644 index 0000000..3ebb48e --- /dev/null +++ b/src/encryption/encryption.rs @@ -0,0 +1,169 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use std::collections::hash_map::DefaultHasher; +use std::error::Error; +use std::fmt; +use std::hash::{Hash, Hasher}; + +#[derive(Debug)] +pub enum EncryptionError { + InvalidKeyLength, + InvalidDataFormat, + EncryptionError, + DecryptionError, +} + +impl fmt::Display for EncryptionError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + EncryptionError::InvalidKeyLength => write!(f, "Invalid key length"), + EncryptionError::InvalidDataFormat => write!(f, "Invalid data format"), + EncryptionError::EncryptionError => write!(f, "Encryption error"), + EncryptionError::DecryptionError => write!(f, "Decryption error"), + } + } +} + +impl Error for EncryptionError {} + +#[derive(Clone)] +pub struct PublicKey([u8; 32]); + +pub struct PrivateKey([u8; 32]); + +pub struct Signature([u8; 64]); + +impl PublicKey { + pub fn to_bytes(&self) -> [u8; 32] { + self.0 + } +} + +pub struct EncryptionManager { + public_key: PublicKey, + private_key: PrivateKey, +} + +impl EncryptionManager { + pub fn new() -> Self { + let private_key = Self::generate_random_bytes(); + let public_key = Self::derive_public_key(&private_key); + + Self { + public_key: PublicKey(public_key), + private_key: PrivateKey(private_key), + } + } + + pub fn encrypt(&self, data: &[u8], shared_secret: &[u8]) -> Result<Vec<u8>, EncryptionError> { + let key = self.derive_key(shared_secret)?; + let nonce = Self::generate_random_bytes(); + + let mut result = nonce.to_vec(); + result.extend_from_slice(data); + + for (i, byte) in result.iter_mut().enumerate().skip(12) { + *byte ^= key[i % key.len()]; + } + + Ok(result) + } + + pub fn decrypt( + &self, + encrypted_data: &[u8], + shared_secret: &[u8], + ) -> Result<Vec<u8>, EncryptionError> { + if encrypted_data.len() < 12 { + return Err(EncryptionError::InvalidDataFormat); + } + + let key = self.derive_key(shared_secret)?; + let mut decrypted = encrypted_data[12..].to_vec(); + + for (i, byte) in decrypted.iter_mut().enumerate() { + *byte ^= key[i % key.len()]; + } + + Ok(decrypted) + } + + pub fn sign(&self, data: &[u8]) -> Signature { + let mut hasher = DefaultHasher::new(); + data.hash(&mut hasher); + self.private_key.0.hash(&mut hasher); + let hash = hasher.finish(); + + let mut signature = [0u8; 64]; + signature[..8].copy_from_slice(&hash.to_be_bytes()); + Signature(signature) + } + + pub fn verify(&self, public_key: &PublicKey, data: &[u8], signature: &Signature) -> bool { + let mut hasher = DefaultHasher::new(); + data.hash(&mut hasher); + public_key.0.hash(&mut hasher); + let hash = hasher.finish(); + + hash.to_be_bytes() == signature.0[..8] + } + + pub fn public_key(&self) -> PublicKey { + self.public_key.clone() + } + + fn derive_key(&self, shared_secret: &[u8]) -> Result<Vec<u8>, EncryptionError> { + if shared_secret.len() < 32 { + return Err(EncryptionError::InvalidKeyLength); + } + let mut hasher = DefaultHasher::new(); + shared_secret.hash(&mut hasher); + let hash = hasher.finish(); + Ok(hash.to_be_bytes().to_vec()) + } + + fn derive_public_key(private_key: &[u8; 32]) -> [u8; 32] { + let mut hasher = DefaultHasher::new(); + private_key.hash(&mut hasher); + let hash = hasher.finish(); + let mut public_key = [0u8; 32]; + public_key[..8].copy_from_slice(&hash.to_be_bytes()); + public_key + } + + fn generate_random_bytes() -> [u8; 32] { + let mut bytes = [0u8; 32]; + for byte in &mut bytes { + *byte = Self::generate_random_byte(); + } + bytes + } + + fn generate_random_byte() -> u8 { + use std::time::{SystemTime, UNIX_EPOCH}; + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .subsec_nanos(); + (nanos & 0xFF) as u8 + } +} + +impl Default for EncryptionManager { + fn default() -> Self { + Self::new() + } +} diff --git a/src/encryption/mod.rs b/src/encryption/mod.rs new file mode 100644 index 0000000..81c4253 --- /dev/null +++ b/src/encryption/mod.rs @@ -0,0 +1 @@ +pub mod encryption; diff --git a/src/error/error.rs b/src/error/error.rs new file mode 100644 index 0000000..624480f --- /dev/null +++ b/src/error/error.rs @@ -0,0 +1,84 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use std::array::TryFromSliceError; +use std::error::Error; +use std::fmt; + +#[derive(Debug)] +pub enum BellandeMeshError { + IoError(std::io::Error), + LockError, + ConversionError, + PersistenceError(String), + InvalidAddress, + ProtocolError(String), + Serialization(String), + Database(String), + Encryption(String), + Authentication, + NodeNotFound, + Dht(String), + RateLimitExceeded, + ConflictResolution, + Migration(String), + Deserialization(String), + NetworkError(String), + ArrayConversionError(TryFromSliceError), + Timeout, +} + +impl fmt::Display for BellandeMeshError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BellandeMeshError::IoError(e) => write!(f, "IO error: {}", e), + BellandeMeshError::LockError => write!(f, "Lock acquisition failed"), + BellandeMeshError::ConversionError => write!(f, "Type conversion error"), + BellandeMeshError::PersistenceError(e) => write!(f, "Persistence error: {}", e), + BellandeMeshError::InvalidAddress => write!(f, "Invalid address"), + BellandeMeshError::ProtocolError(e) => write!(f, "Protocol error: {}", e), + BellandeMeshError::Serialization(err) => write!(f, "Serialization error: {}", err), + BellandeMeshError::Database(err) => write!(f, "Database error: {}", err), + BellandeMeshError::Encryption(err) => write!(f, "Encryption error: {}", err), + BellandeMeshError::Authentication => write!(f, "Authentication error"), + BellandeMeshError::NodeNotFound => write!(f, "Node not found"), + BellandeMeshError::Dht(err) => write!(f, "DHT error: {}", err), + BellandeMeshError::RateLimitExceeded => write!(f, "Rate limit exceeded"), + BellandeMeshError::ConflictResolution => write!(f, "Conflict resolution error"), + BellandeMeshError::Migration(err) => write!(f, "Migration error: {}", err), + BellandeMeshError::Deserialization(err) => write!(f, "Deserialization error: {}", err), + BellandeMeshError::NetworkError(e) => write!(f, "Network error: {}", e), + BellandeMeshError::ArrayConversionError(e) => { + write!(f, "Array conversion error: {}", e) + } + BellandeMeshError::Timeout => write!(f, "Operation timed out"), + } + } +} + +impl Error for BellandeMeshError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + BellandeMeshError::IoError(err) => Some(err), + _ => None, + } + } +} + +impl From<std::io::Error> for BellandeMeshError { + fn from(err: std::io::Error) -> Self { + BellandeMeshError::IoError(err) + } +} diff --git a/src/error/mod.rs b/src/error/mod.rs new file mode 100644 index 0000000..a91e735 --- /dev/null +++ b/src/error/mod.rs @@ -0,0 +1 @@ +pub mod error; diff --git a/src/mesh/mesh.rs b/src/mesh/mesh.rs new file mode 100644 index 0000000..45ab7c7 --- /dev/null +++ b/src/mesh/mesh.rs @@ -0,0 +1,332 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::config::config::Config; +use crate::error::error::BellandeMeshError; +use crate::node::node::{DataChunk, Message, Node, NodeId, PublicKey}; +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpListener, TcpStream}; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::Duration; + +pub struct BellandeMeshSync { + config: Arc<Config>, + nodes: Arc<RwLock<Vec<Node>>>, + running: Arc<RwLock<bool>>, +} + +impl BellandeMeshSync { + pub fn new(config: &Config) -> Result<Self, BellandeMeshError> { + Ok(Self { + config: Arc::new(config.clone()), + nodes: Arc::new(RwLock::new(Vec::new())), + running: Arc::new(RwLock::new(true)), + }) + } + + pub fn start(&self) -> Result<(), BellandeMeshError> { + let mut running = self + .running + .write() + .map_err(|_| BellandeMeshError::LockError)?; + *running = true; + drop(running); + + self.start_listener()?; + self.start_maintenance_tasks(); + Ok(()) + } + + pub fn stop(&self) -> Result<(), BellandeMeshError> { + let mut running = self + .running + .write() + .map_err(|_| BellandeMeshError::LockError)?; + *running = false; + Ok(()) + } + + fn is_running(&self) -> Result<bool, BellandeMeshError> { + self.running + .read() + .map_err(|_| BellandeMeshError::LockError) + .map(|guard| *guard) + } + + fn start_listener(&self) -> Result<(), BellandeMeshError> { + let listener = TcpListener::bind(&self.config.listen_address)?; + let mesh = self.clone(); + + thread::spawn(move || { + for stream in listener.incoming() { + if !mesh.is_running().unwrap_or(false) { + break; + } + + match stream { + Ok(stream) => { + let mesh_clone = mesh.clone(); + thread::spawn(move || { + if let Err(e) = mesh_clone.handle_connection(stream) { + eprintln!("Connection error: {}", e); + } + }); + } + Err(e) => eprintln!("Accept error: {}", e), + } + } + }); + + Ok(()) + } + + fn start_maintenance_tasks(&self) { + // Start sync task + let mesh = self.clone(); + thread::spawn(move || { + while let Ok(true) = mesh.is_running() { + if let Err(e) = mesh.sync_with_peers() { + eprintln!("Sync error: {}", e); + } + thread::sleep(Duration::from_secs(60)); + } + }); + + // Start cleanup task + let mesh = self.clone(); + thread::spawn(move || { + while let Ok(true) = mesh.is_running() { + if let Err(e) = mesh.cleanup_dead_nodes() { + eprintln!("Cleanup error: {}", e); + } + thread::sleep(Duration::from_secs(300)); + } + }); + } + + fn handle_connection(&self, mut stream: TcpStream) -> Result<(), BellandeMeshError> { + stream.set_read_timeout(Some(Duration::from_secs(30)))?; + stream.set_write_timeout(Some(Duration::from_secs(30)))?; + + let running = self + .running + .read() + .map_err(|_| BellandeMeshError::LockError)?; + while *running { + let message = self.read_message(&mut stream)?; + self.handle_message(message, &mut stream)?; + } + + Ok(()) + } + + fn read_message(&self, stream: &mut TcpStream) -> Result<Message, BellandeMeshError> { + let mut len_buf = [0u8; 4]; + stream.read_exact(&mut len_buf)?; + let len = u32::from_be_bytes(len_buf) as usize; + + let mut msg_buf = vec![0u8; len]; + stream.read_exact(&mut msg_buf)?; + + bincode::deserialize(&msg_buf) + .map_err(|e| BellandeMeshError::Deserialization(e.to_string())) + } + + fn write_message( + &self, + stream: &mut TcpStream, + message: &Message, + ) -> Result<(), BellandeMeshError> { + let data = bincode::serialize(message) + .map_err(|e| BellandeMeshError::Serialization(e.to_string()))?; + let len = (data.len() as u32).to_be_bytes(); + + stream.write_all(&len)?; + stream.write_all(&data)?; + stream.flush()?; + + Ok(()) + } + + fn handle_message( + &self, + message: Message, + stream: &mut TcpStream, + ) -> Result<(), BellandeMeshError> { + match message { + Message::JoinRequest { id, public_key } => { + let peer_addr = stream.peer_addr()?; + self.handle_join_request(id, public_key, peer_addr)?; + + let nodes = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; + let response = Message::JoinResponse { + accepted: true, + nodes: nodes.clone(), + }; + self.write_message(stream, &response)?; + } + Message::DataSync { chunks } => { + self.handle_data_sync(chunks)?; + } + Message::DataRequest { ids } => { + self.handle_data_request(&ids, stream)?; + } + Message::Heartbeat => { + let addr = stream.peer_addr()?; + self.update_node_last_seen(addr)?; + } + _ => {} + } + + Ok(()) + } + + fn handle_join_request( + &self, + id: NodeId, + public_key: PublicKey, + addr: SocketAddr, + ) -> Result<(), BellandeMeshError> { + let new_node = Node::new(id, addr, public_key); + let mut nodes = self + .nodes + .write() + .map_err(|_| BellandeMeshError::LockError)?; + + if !nodes.iter().any(|n| n.id == new_node.id) { + nodes.push(new_node); + } + + Ok(()) + } + + fn handle_data_sync(&self, chunks: Vec<DataChunk>) -> Result<(), BellandeMeshError> { + let nodes = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; + + for chunk in chunks { + if let Some(node) = nodes.iter().find(|n| n.id == chunk.author) { + let _ = node.add_data_chunk(chunk); + } + } + + Ok(()) + } + + fn handle_data_request( + &self, + ids: &[NodeId], + stream: &mut TcpStream, + ) -> Result<(), BellandeMeshError> { + let nodes = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; + let mut chunks = Vec::new(); + + for node in nodes.iter() { + for id in ids { + if let Some(chunk) = node.get_data_chunk(id) { + chunks.push(chunk); + } + } + } + + let response = Message::DataSync { chunks }; + self.write_message(stream, &response)?; + + Ok(()) + } + + fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> { + let mut nodes = self + .nodes + .write() + .map_err(|_| BellandeMeshError::LockError)?; + + if let Some(node) = nodes.iter_mut().find(|n| n.address == addr) { + node.update_last_seen(); + } + + Ok(()) + } + + fn sync_with_peers(&self) -> Result<(), BellandeMeshError> { + // Take a snapshot of current nodes to avoid holding the lock + let nodes = { + let nodes_guard = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; + nodes_guard.clone() + }; + + for node in nodes { + if let Ok(mut stream) = TcpStream::connect(node.address) { + let chunks = { + let data = node.data.read().map_err(|_| BellandeMeshError::LockError)?; + data.keys().cloned().collect::<Vec<_>>() + }; + + let request = Message::DataRequest { ids: chunks }; + if let Err(e) = self.write_message(&mut stream, &request) { + eprintln!("Failed to sync with {}: {}", node.address, e); + continue; + } + + // Handle response + if let Ok(Message::DataSync { chunks }) = self.read_message(&mut stream) { + self.handle_data_sync(chunks)?; + } + } + } + + Ok(()) + } + + fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> { + let timeout = Duration::from_secs(self.config.node_timeout); + let mut nodes = self + .nodes + .write() + .map_err(|_| BellandeMeshError::LockError)?; + + nodes.retain(|node| { + let is_alive = node.is_alive(timeout); + if !is_alive { + eprintln!("Removing dead node: {}", node.address); + } + is_alive + }); + + Ok(()) + } +} + +impl Clone for BellandeMeshSync { + fn clone(&self) -> Self { + Self { + config: Arc::clone(&self.config), + nodes: Arc::clone(&self.nodes), + running: Arc::clone(&self.running), + } + } +} diff --git a/src/mesh/mod.rs b/src/mesh/mod.rs new file mode 100644 index 0000000..cb3e16e --- /dev/null +++ b/src/mesh/mod.rs @@ -0,0 +1 @@ +pub mod mesh; diff --git a/src/metrics/metrics.rs b/src/metrics/metrics.rs new file mode 100644 index 0000000..7e22339 --- /dev/null +++ b/src/metrics/metrics.rs @@ -0,0 +1,141 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use std::collections::HashMap; +use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::Arc; + +#[derive(Clone)] +pub struct Counter { + value: Arc<AtomicI64>, +} + +impl Counter { + fn new() -> Self { + Self { + value: Arc::new(AtomicI64::new(0)), + } + } + + pub fn inc(&self) { + self.value.fetch_add(1, Ordering::Relaxed); + } + + pub fn get(&self) -> i64 { + self.value.load(Ordering::Relaxed) + } +} + +#[derive(Clone)] +pub struct Gauge { + value: Arc<AtomicI64>, +} + +impl Gauge { + fn new() -> Self { + Self { + value: Arc::new(AtomicI64::new(0)), + } + } + + pub fn set(&self, v: i64) { + self.value.store(v, Ordering::Relaxed); + } + + pub fn get(&self) -> i64 { + self.value.load(Ordering::Relaxed) + } +} + +pub struct MetricsManager { + counters: HashMap<String, Counter>, + gauges: HashMap<String, Gauge>, +} + +impl MetricsManager { + pub fn new() -> Self { + let mut manager = Self { + counters: HashMap::new(), + gauges: HashMap::new(), + }; + + // Initialize metrics + manager.create_gauge("active_nodes"); + manager.create_gauge("data_chunks"); + manager.create_counter("sync_operations"); + manager.create_counter("conflicts_resolved"); + manager.create_counter("network_errors"); + + manager + } + + fn create_counter(&mut self, name: &str) { + self.counters.insert(name.to_string(), Counter::new()); + } + + fn create_gauge(&mut self, name: &str) { + self.gauges.insert(name.to_string(), Gauge::new()); + } + + pub fn update_active_nodes(&self, count: i64) { + if let Some(gauge) = self.gauges.get("active_nodes") { + gauge.set(count); + } + } + + pub fn update_data_chunks(&self, count: i64) { + if let Some(gauge) = self.gauges.get("data_chunks") { + gauge.set(count); + } + } + + pub fn increment_sync_operations(&self) { + if let Some(counter) = self.counters.get("sync_operations") { + counter.inc(); + } + } + + pub fn increment_conflicts_resolved(&self) { + if let Some(counter) = self.counters.get("conflicts_resolved") { + counter.inc(); + } + } + + pub fn increment_network_errors(&self) { + if let Some(counter) = self.counters.get("network_errors") { + counter.inc(); + } + } + + pub fn get_metrics(&self) -> String { + let mut result = String::new(); + + for (name, gauge) in &self.gauges { + result.push_str(&format!("{} {}\n", name, gauge.get())); + } + + for (name, counter) in &self.counters { + result.push_str(&format!("{} {}\n", name, counter.get())); + } + + result + } +} + +impl Default for MetricsManager { + fn default() -> Self { + Self::new() + } +} diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs new file mode 100644 index 0000000..e144883 --- /dev/null +++ b/src/metrics/mod.rs @@ -0,0 +1 @@ +pub mod metrics; diff --git a/src/node/mod.rs b/src/node/mod.rs new file mode 100644 index 0000000..492bc84 --- /dev/null +++ b/src/node/mod.rs @@ -0,0 +1 @@ +pub mod node; diff --git a/src/node/node.rs b/src/node/node.rs new file mode 100644 index 0000000..4a8ff25 --- /dev/null +++ b/src/node/node.rs @@ -0,0 +1,242 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use rand::{thread_rng, RngCore}; +use serde::{Deserialize, Serialize}; +use std::cmp::Ordering; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, SystemTime}; + +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Serialize, Deserialize, PartialOrd, Ord)] +pub struct NodeId([u8; 32]); + +impl NodeId { + pub fn new() -> Self { + let mut id = [0u8; 32]; + thread_rng().fill_bytes(&mut id); + NodeId(id) + } + + pub fn from_bytes(bytes: &[u8]) -> Self { + let mut id = [0u8; 32]; + let len = std::cmp::min(bytes.len(), 32); + id[..len].copy_from_slice(&bytes[..len]); + NodeId(id) + } + + pub fn distance(&self, other: &NodeId) -> NodeId { + let mut result = [0u8; 32]; + for i in 0..32 { + result[i] = self.0[i] ^ other.0[i]; + } + NodeId(result) + } + + pub fn as_ref(&self) -> &[u8] { + &self.0 + } + + pub fn to_hex(&self) -> String { + self.0.iter().map(|b| format!("{:02x}", b)).collect() + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PublicKey([u8; 32]); + +impl PublicKey { + pub fn new(bytes: [u8; 32]) -> Self { + PublicKey(bytes) + } + + pub fn as_bytes(&self) -> &[u8; 32] { + &self.0 + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DataChunk { + pub id: NodeId, + pub content: Vec<u8>, + pub timestamp: SystemTime, + pub author: NodeId, +} + +impl DataChunk { + pub fn new(id: NodeId, content: Vec<u8>, author: NodeId) -> Self { + Self { + id, + content, + timestamp: SystemTime::now(), + author, + } + } + + pub fn size(&self) -> usize { + std::mem::size_of::<NodeId>() * 2 + self.content.len() + std::mem::size_of::<SystemTime>() + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Message { + Ping { + sender: NodeId, + token: u64, + }, + Pong { + sender: NodeId, + token: u64, + }, + Store { + sender: NodeId, + key: Vec<u8>, + value: Vec<u8>, + token: u64, + }, + FindNode { + sender: NodeId, + target: NodeId, + token: u64, + }, + FindValue { + sender: NodeId, + key: Vec<u8>, + token: u64, + }, + Nodes { + sender: NodeId, + nodes: Vec<Node>, + token: u64, + }, + Value { + sender: NodeId, + key: Vec<u8>, + value: Vec<u8>, + token: u64, + }, + JoinRequest { + id: NodeId, + public_key: PublicKey, + }, + JoinResponse { + accepted: bool, + nodes: Vec<Node>, + }, + DataSync { + chunks: Vec<DataChunk>, + }, + DataRequest { + ids: Vec<NodeId>, + }, + Heartbeat, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Node { + pub id: NodeId, + pub address: SocketAddr, + pub public_key: PublicKey, + pub last_seen: SystemTime, + pub rtt: Duration, + pub failed_queries: u32, + #[serde(skip)] + pub data: Arc<RwLock<HashMap<NodeId, DataChunk>>>, +} + +impl Node { + pub fn new(id: NodeId, address: SocketAddr, public_key: PublicKey) -> Self { + Self { + id, + address, + public_key, + last_seen: SystemTime::now(), + rtt: Duration::from_secs(0), + failed_queries: 0, + data: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub fn update_last_seen(&mut self) { + self.last_seen = SystemTime::now(); + } + + pub fn mark_failed(&mut self) { + self.failed_queries += 1; + } + + pub fn update_rtt(&mut self, rtt: Duration) { + self.rtt = rtt; + self.last_seen = SystemTime::now(); + self.failed_queries = 0; + } + + pub fn add_data_chunk(&self, chunk: DataChunk) -> bool { + match self.data.write() { + Ok(mut data) => { + data.insert(chunk.id, chunk); + true + } + Err(_) => false, + } + } + + pub fn get_data_chunk(&self, id: &NodeId) -> Option<DataChunk> { + self.data.read().ok()?.get(id).cloned() + } + + pub fn remove_old_data(&self, max_age: Duration) -> bool { + match self.data.write() { + Ok(mut data) => { + let now = SystemTime::now(); + data.retain(|_, chunk| { + now.duration_since(chunk.timestamp) + .map(|age| age <= max_age) + .unwrap_or(false) + }); + true + } + Err(_) => false, + } + } + + pub fn is_alive(&self, timeout: Duration) -> bool { + SystemTime::now() + .duration_since(self.last_seen) + .map(|duration| duration < timeout) + .unwrap_or(false) + } +} + +impl PartialEq for Node { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for Node {} + +impl PartialOrd for Node { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.id.cmp(&other.id)) + } +} + +impl Ord for Node { + fn cmp(&self, other: &Self) -> Ordering { + self.id.cmp(&other.id) + } +} diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs new file mode 100644 index 0000000..c93e4ff --- /dev/null +++ b/src/persistence/mod.rs @@ -0,0 +1 @@ +pub mod persistence; diff --git a/src/persistence/persistence.rs b/src/persistence/persistence.rs new file mode 100644 index 0000000..be56af7 --- /dev/null +++ b/src/persistence/persistence.rs @@ -0,0 +1,192 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::error::error::BellandeMeshError; +use crate::node::node::{DataChunk, Node, NodeId, PublicKey}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fs::{File, OpenOptions}; +use std::io::{BufReader, BufWriter}; +use std::os::unix::fs::MetadataExt; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::SystemTime; + +#[derive(Serialize, Deserialize)] +struct SerializableNode { + id: NodeId, + address: std::net::SocketAddr, + public_key: PublicKey, + last_seen: SystemTime, +} + +impl From<&Node> for SerializableNode { + fn from(node: &Node) -> Self { + SerializableNode { + id: node.id, + address: node.address, + public_key: node.public_key.clone(), + last_seen: node.last_seen, + } + } +} + +impl SerializableNode { + fn into_node(self) -> Node { + Node { + id: self.id, + address: self.address, + public_key: self.public_key, + last_seen: self.last_seen, + rtt: Default::default(), + failed_queries: 0, + data: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +pub struct PersistenceManager { + data_dir: PathBuf, + connections: Arc<Mutex<Vec<FileConnection>>>, +} + +struct FileConnection { + file: File, +} + +impl PersistenceManager { + pub fn new(data_dir: &str) -> Result<Self, BellandeMeshError> { + let path = Path::new(data_dir); + std::fs::create_dir_all(path)?; + + Ok(Self { + data_dir: path.to_path_buf(), + connections: Arc::new(Mutex::new(Vec::new())), + }) + } + + pub fn save_node(&self, node: &Node) -> Result<(), BellandeMeshError> { + let path = self + .data_dir + .join("nodes") + .join(format!("{}.dat", node.id.to_hex())); + + std::fs::create_dir_all(path.parent().unwrap())?; + let file = self.get_connection(&path)?; + let writer = BufWriter::new(file); + + let serializable_node = SerializableNode::from(node); + bincode::serialize_into(writer, &serializable_node) + .map_err(|e| BellandeMeshError::Serialization(e.to_string()))?; + + Ok(()) + } + + pub fn load_nodes(&self) -> Result<Vec<Node>, BellandeMeshError> { + let nodes_dir = self.data_dir.join("nodes"); + if !nodes_dir.exists() { + return Ok(Vec::new()); + } + + let mut nodes = Vec::new(); + for entry in std::fs::read_dir(nodes_dir)? { + let entry = entry?; + let path = entry.path(); + if path.extension().and_then(|s| s.to_str()) == Some("dat") { + let file = self.get_connection(&path)?; + let reader = BufReader::new(file); + + let serializable_node: SerializableNode = bincode::deserialize_from(reader) + .map_err(|e| BellandeMeshError::Deserialization(e.to_string()))?; + + nodes.push(serializable_node.into_node()); + } + } + + Ok(nodes) + } + + pub fn save_data_chunk( + &self, + node_id: &NodeId, + chunk: &DataChunk, + ) -> Result<(), BellandeMeshError> { + let path = self + .data_dir + .join("data_chunks") + .join(node_id.to_hex()) + .join(format!("{}.dat", chunk.id.to_hex())); + + std::fs::create_dir_all(path.parent().unwrap())?; + let file = self.get_connection(&path)?; + let writer = BufWriter::new(file); + + bincode::serialize_into(writer, chunk) + .map_err(|e| BellandeMeshError::Serialization(e.to_string()))?; + + Ok(()) + } + + pub fn load_data_chunks( + &self, + node_id: &NodeId, + ) -> Result<HashMap<NodeId, DataChunk>, BellandeMeshError> { + let chunks_dir = self.data_dir.join("data_chunks").join(node_id.to_hex()); + + if !chunks_dir.exists() { + return Ok(HashMap::new()); + } + + let mut chunks = HashMap::new(); + for entry in std::fs::read_dir(chunks_dir)? { + let entry = entry?; + let path = entry.path(); + if path.extension().and_then(|s| s.to_str()) == Some("dat") { + let file = self.get_connection(&path)?; + let reader = BufReader::new(file); + + let chunk: DataChunk = bincode::deserialize_from(reader) + .map_err(|e| BellandeMeshError::Deserialization(e.to_string()))?; + chunks.insert(chunk.id, chunk); + } + } + + Ok(chunks) + } + + fn get_connection(&self, path: &Path) -> Result<File, BellandeMeshError> { + let mut connections = self + .connections + .lock() + .map_err(|_| BellandeMeshError::LockError)?; + + if let Some(conn) = connections + .iter_mut() + .find(|c| c.file.metadata().unwrap().ino() == path.metadata().unwrap().ino()) + { + Ok(conn.file.try_clone()?) + } else { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(path)?; + connections.push(FileConnection { + file: file.try_clone()?, + }); + Ok(file) + } + } +} diff --git a/src/tests/mod.rs b/src/tests/mod.rs new file mode 100644 index 0000000..15ab560 --- /dev/null +++ b/src/tests/mod.rs @@ -0,0 +1 @@ +pub mod tests; diff --git a/src/tests/tests.rs b/src/tests/tests.rs new file mode 100644 index 0000000..23a72fa --- /dev/null +++ b/src/tests/tests.rs @@ -0,0 +1,127 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::config::Config; + use crate::mesh::mesh::BellandeMeshSync; + use tokio::net::TcpStream; + use uuid::Uuid; + + async fn setup_test_mesh() -> BellandeMeshSync { + let config = Config { + db_url: ":memory:".to_string(), + listen_address: "127.0.0.1:0".to_string(), + bootstrap_nodes: vec![], + }; + BellandeMeshSync::new(&config).await.unwrap() + } + + #[tokio::test] + async fn test_node_join() { + let mesh = setup_test_mesh().await; + let node_id = Uuid::new_v4(); + let public_key = mesh.encryption.public_key(); + + // Simulate a node joining + mesh.handle_join_request(node_id, public_key).await.unwrap(); + + let nodes = mesh.nodes.read().await; + assert!(nodes.iter().any(|node| node.id == node_id)); + } + + #[tokio::test] + async fn test_data_sync() { + let mesh = setup_test_mesh().await; + let node_id = Uuid::new_v4(); + let public_key = mesh.encryption.public_key(); + + // Add a node + mesh.handle_join_request(node_id, public_key).await.unwrap(); + + // Create a test data chunk + let chunk = DataChunk { + id: Uuid::new_v4(), + content: vec![1, 2, 3, 4], + checksum: "test_checksum".to_string(), + version: 1, + last_modified: chrono::Utc::now(), + author: node_id, + parent_versions: vec![], + }; + + // Sync the data + mesh.handle_data_sync(node_id, vec![chunk.clone()]) + .await + .unwrap(); + + // Verify the data was synced + let nodes = mesh.nodes.read().await; + let node = nodes.iter().find(|n| n.id == node_id).unwrap(); + let node_data = node.data.read().await; + assert!(node_data.contains_key(&chunk.id)); + } + + #[tokio::test] + async fn test_conflict_resolution() { + let mesh = setup_test_mesh().await; + let node1_id = Uuid::new_v4(); + let node2_id = Uuid::new_v4(); + let public_key = mesh.encryption.public_key(); + + // Add two nodes + mesh.handle_join_request(node1_id, public_key) + .await + .unwrap(); + mesh.handle_join_request(node2_id, public_key) + .await + .unwrap(); + + // Create conflicting data chunks + let chunk_id = Uuid::new_v4(); + let chunk1 = DataChunk { + id: chunk_id, + content: vec![1, 2, 3], + checksum: "checksum1".to_string(), + version: 1, + last_modified: chrono::Utc::now(), + author: node1_id, + parent_versions: vec![], + }; + let chunk2 = DataChunk { + id: chunk_id, + content: vec![4, 5, 6], + checksum: "checksum2".to_string(), + version: 1, + last_modified: chrono::Utc::now(), + author: node2_id, + parent_versions: vec![], + }; + + // Sync conflicting data + mesh.handle_data_sync(node1_id, vec![chunk1]).await.unwrap(); + mesh.handle_data_sync(node2_id, vec![chunk2]).await.unwrap(); + + // Verify conflict resolution + let nodes = mesh.nodes.read().await; + for node in nodes.iter() { + let node_data = node.data.read().await; + let resolved_chunk = node_data.get(&chunk_id).unwrap(); + assert_eq!(resolved_chunk.version, 2); + assert_eq!(resolved_chunk.parent_versions.len(), 2); + } + } +} diff --git a/src/utilities/mod.rs b/src/utilities/mod.rs new file mode 100644 index 0000000..89db166 --- /dev/null +++ b/src/utilities/mod.rs @@ -0,0 +1 @@ +pub mod utilities; diff --git a/src/utilities/utilities.rs b/src/utilities/utilities.rs new file mode 100644 index 0000000..645d241 --- /dev/null +++ b/src/utilities/utilities.rs @@ -0,0 +1,36 @@ +// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use ed25519_dalek::PublicKey; +use serde::{Deserialize, Serialize}; + +impl Serialize for PublicKey { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + serializer.serialize_bytes(&self.to_bytes()) + } +} + +impl<'de> Deserialize<'de> for PublicKey { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + let bytes: [u8; 32] = Deserialize::deserialize(deserializer)?; + PublicKey::from_bytes(&bytes).map_err(serde::de::Error::custom) + } +}