latest pushes

This commit is contained in:
Ronaldson Bellande 2024-10-23 17:18:01 -04:00
parent 10107d60b5
commit f8c3b3e408
29 changed files with 1904 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
target
git_scripts
.env
Cargo.lock

23
Cargo.toml Normal file
View File

@ -0,0 +1,23 @@
[package]
name = "bellande_mesh_sync"
version = "0.0.1"
edition = "2021"
authors = ["Ronaldson Bellande <ronaldsonbellande@gmail.com>"]
description = "Advanced peer-to-peer data synchronization system for distributed applications"
license = "GPL-3.0-or-later"
repository = "https://github.com/Architecture-Mechanism/bellande_mesh_sync"
documentation = "https://bellande-architecture-mechanism-research-innovation-center.org/bellande_mesh_sync/docs"
readme = "README.md"
keywords = ["p2p", "synchronization", "distributed-systems", "mesh-network", "bellande_mesh_sync"]
categories = ["network-programming", "asynchronous"]
[lib]
path = "src/bellande_mesh_sync.rs"
[dependencies]
tokio = { version = "1.28", features = ["rt", "net", "time", "sync", "macros"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bincode = "1.3"
rand = "0.8"
uuid = { version = "1.3", features = ["v4", "serde"] }

View File

@ -1,5 +1,6 @@
# Bellande Mesh Sync # Bellande Mesh Sync
- a comprehensive data synchronization system
## License ## License
Bellande Mesh Sync is distributed under the [GNU General Public License v3.0](https://www.gnu.org/licenses/gpl-3.0.en.html), see [LICENSE](https://github.com/Architecture-Mechanism/bellande_mesh_sync/blob/main/LICENSE) and [NOTICE](https://github.com/Architecture-Mechanism/bellande_mesh_sync/blob/main/LICENSE) for more information. Bellande Mesh Sync is distributed under the [GNU General Public License v3.0](https://www.gnu.org/licenses/gpl-3.0.en.html), see [LICENSE](https://github.com/Architecture-Mechanism/bellande_mesh_sync/blob/main/LICENSE) and [NOTICE](https://github.com/Architecture-Mechanism/bellande_mesh_sync/blob/main/LICENSE) for more information.

17
dependencies.bellande Normal file
View File

@ -0,0 +1,17 @@
tokio: "1.28"
features:
- rt
- net
- time
- sync
- macros
serde: "1.0"
features:
- derive
serde_json: "1.0"
bincode: "1.3"
rand: "0.8"
uuid: "1.3"
features:
- v4
- serde

1
make_rust_executable.bellos Executable file
View File

@ -0,0 +1 @@
bellande_rust_executable -d dependencies.bellande -s src -m bellande_mesh_sync.rs -o executable/bellande_mesh_sync

1
make_rust_executable.sh Executable file
View File

@ -0,0 +1 @@
bellande_rust_executable -d dependencies.bellande -s src -m bellande_mesh_sync.rs -o executable/bellande_mesh_sync

42
src/bellande_mesh_sync.rs Normal file
View File

@ -0,0 +1,42 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
mod config;
mod data;
mod dht;
mod encryption;
mod error;
mod mesh;
mod metrics;
mod node;
mod persistence;
pub use crate::config::config::Config;
pub use crate::error::error::BellandeMeshError;
pub use crate::mesh::mesh::BellandeMeshSync;
/// Initialize the BellandeMeshSync System
/// This function takes a configuration and sets up the BellandeMeshSync System.
/// It returns a BellandeMeshSync instance that can be used to interact with the mesh network.
pub async fn init(config: Config) -> Result<BellandeMeshSync, BellandeMeshError> {
let bellande_mesh = BellandeMeshSync::new(&config)?;
Ok(bellande_mesh)
}
/// Start the BellandeMeshSync System
/// This function takes a BellandeMeshSync instance and starts the mesh network operations.
pub async fn start(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshError> {
bellande_mesh.start()
}

40
src/config/config.rs Normal file
View File

@ -0,0 +1,40 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use serde::{Deserialize, Serialize};
use std::fs;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
pub db_url: String,
pub listen_address: String,
pub bootstrap_nodes: Vec<String>,
pub sync_interval: u64,
pub node_timeout: u64,
}
impl Config {
pub fn load(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
let config_str = fs::read_to_string(path)?;
Ok(serde_json::from_str(&config_str)?)
}
pub fn sync_interval(&self) -> Option<u64> {
Some(self.sync_interval)
}
pub fn node_timeout(&self) -> Option<u64> {
Some(self.node_timeout)
}
}

1
src/config/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod config;

49
src/data/data.rs Normal file
View File

@ -0,0 +1,49 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use serde::{Deserialize, Serialize};
use std::time::SystemTime;
use uuid::Uuid;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct DataChunk {
pub id: Uuid,
pub content: Vec<u8>,
pub checksum: String,
pub version: u64,
pub last_modified: SystemTime,
pub author: Uuid,
pub parent_versions: Vec<u64>,
}
impl DataChunk {
pub fn new(
content: Vec<u8>,
checksum: String,
version: u64,
author: Uuid,
parent_versions: Vec<u64>,
) -> Self {
Self {
id: Uuid::new_v4(),
content,
checksum,
version,
last_modified: SystemTime::now(),
author,
parent_versions,
}
}
}

1
src/data/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod data;

392
src/dht/dht.rs Normal file
View File

@ -0,0 +1,392 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::error::error::BellandeMeshError;
use crate::node::node::{Message, Node, NodeId, PublicKey};
use rand::Rng;
use std::collections::{HashMap, HashSet};
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant, SystemTime};
const ALPHA: usize = 3;
const K: usize = 20;
const BUCKET_REFRESH_INTERVAL: Duration = Duration::from_secs(3600);
const REPLICATION_INTERVAL: Duration = Duration::from_secs(3600);
const NODE_TIMEOUT: Duration = Duration::from_secs(900);
const PING_INTERVAL: Duration = Duration::from_secs(300);
#[derive(Clone, Debug)]
struct KBucket {
nodes: Vec<Node>,
last_updated: Instant,
}
impl KBucket {
fn new() -> Self {
Self {
nodes: Vec::with_capacity(K),
last_updated: Instant::now(),
}
}
fn add_node(&mut self, node: Node) {
if let Some(existing) = self.nodes.iter_mut().find(|n| n.id == node.id) {
existing.last_seen = node.last_seen;
existing.rtt = node.rtt;
return;
}
if self.nodes.len() < K {
self.nodes.push(node);
} else if let Some(pos) = self.nodes.iter().position(|n| n.failed_queries > 0) {
self.nodes[pos] = node;
}
self.last_updated = Instant::now();
self.nodes.sort_by_key(|n| n.rtt);
}
fn remove_node(&mut self, id: &NodeId) {
self.nodes.retain(|n| n.id != *id);
}
fn get_nodes(&self) -> Vec<Node> {
self.nodes.clone()
}
}
#[derive(Clone, Debug)]
pub struct StorageValue {
pub data: Vec<u8>,
pub timestamp: SystemTime,
pub ttl: Duration,
}
impl StorageValue {
pub fn new(data: Vec<u8>, ttl: Duration) -> Self {
Self {
data,
timestamp: SystemTime::now(),
ttl,
}
}
pub fn is_expired(&self) -> bool {
SystemTime::now()
.duration_since(self.timestamp)
.map(|elapsed| elapsed >= self.ttl)
.unwrap_or(true)
}
}
struct RoutingTable {
buckets: Vec<KBucket>,
local_id: NodeId,
}
impl RoutingTable {
fn new(local_id: NodeId) -> Self {
Self {
buckets: (0..256).map(|_| KBucket::new()).collect(),
local_id,
}
}
fn add_node(&mut self, node: Node) {
let bucket_idx = self.bucket_index(&node.id);
self.buckets[bucket_idx].add_node(node);
}
fn get_closest_nodes(&self, target: &NodeId) -> Vec<Node> {
let mut nodes = Vec::new();
for bucket in &self.buckets {
nodes.extend(bucket.get_nodes());
}
nodes.sort_by_cached_key(|node| node.id.distance(target));
nodes.truncate(K);
nodes
}
fn bucket_index(&self, id: &NodeId) -> usize {
let distance = self.local_id.distance(id);
let distance_bytes = distance.as_ref();
let mut leading_zeros = 0;
for &byte in distance_bytes {
if byte == 0 {
leading_zeros += 8;
} else {
leading_zeros += byte.leading_zeros() as usize;
break;
}
}
(255 - leading_zeros).min(255)
}
}
pub struct DhtWrapper {
routing_table: Arc<RwLock<RoutingTable>>,
storage: Arc<RwLock<HashMap<Vec<u8>, StorageValue>>>,
socket: Arc<UdpSocket>,
pending_requests: Arc<Mutex<HashMap<u64, Instant>>>,
}
impl DhtWrapper {
pub fn new(bind_addr: SocketAddr, local_key: &[u8]) -> Result<Self, BellandeMeshError> {
let socket = UdpSocket::bind(bind_addr)?;
socket.set_nonblocking(true)?;
let local_id = NodeId::from_bytes(local_key);
let routing_table = RoutingTable::new(local_id);
let wrapper = Self {
routing_table: Arc::new(RwLock::new(routing_table)),
storage: Arc::new(RwLock::new(HashMap::new())),
socket: Arc::new(socket),
pending_requests: Arc::new(Mutex::new(HashMap::new())),
};
wrapper.start_background_tasks();
Ok(wrapper)
}
fn start_background_tasks(&self) {
let dht = self.clone();
thread::spawn(move || loop {
thread::sleep(PING_INTERVAL);
if let Err(e) = dht.maintenance_task() {
eprintln!("Maintenance error: {}", e);
}
});
let dht = self.clone();
thread::spawn(move || loop {
if let Err(e) = dht.handle_incoming_messages() {
eprintln!("Message handling error: {}", e);
thread::sleep(Duration::from_millis(100));
}
});
}
fn maintenance_task(&self) -> Result<(), BellandeMeshError> {
// Clean up expired requests
{
let mut requests = self
.pending_requests
.lock()
.map_err(|_| BellandeMeshError::LockError)?;
requests.retain(|_, time| time.elapsed() < Duration::from_secs(60));
}
// Get outdated buckets
let outdated_buckets = {
let table = self
.routing_table
.read()
.map_err(|_| BellandeMeshError::LockError)?;
table
.buckets
.iter()
.enumerate()
.filter(|(_, bucket)| bucket.last_updated.elapsed() > BUCKET_REFRESH_INTERVAL)
.map(|(i, _)| i)
.collect::<Vec<_>>()
};
// Refresh outdated buckets
for i in outdated_buckets {
let target_id = NodeId::new();
self.find_node(&target_id)?;
}
Ok(())
}
fn handle_incoming_messages(&self) -> Result<(), BellandeMeshError> {
let mut buf = vec![0u8; 65536];
loop {
match self.socket.recv_from(&mut buf) {
Ok((len, src)) => {
let message = bincode::deserialize(&buf[..len])
.map_err(|e| BellandeMeshError::Deserialization(e.to_string()))?;
self.handle_message(message, src)?;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
continue;
}
Err(e) => return Err(BellandeMeshError::IoError(e)),
}
}
}
fn handle_message(&self, message: Message, src: SocketAddr) -> Result<(), BellandeMeshError> {
match message {
Message::Ping { sender, token } => {
let response = Message::Pong {
sender: self.get_local_id()?,
token,
};
self.send_message(&src, &response)?;
}
Message::Pong { sender, token } => {
if let Some(start_time) = self
.pending_requests
.lock()
.map_err(|_| BellandeMeshError::LockError)?
.remove(&token)
{
let rtt = start_time.elapsed();
let mut node = Node::new(sender, src, PublicKey::new([0; 32]));
node.update_rtt(rtt);
self.routing_table
.write()
.map_err(|_| BellandeMeshError::LockError)?
.add_node(node);
}
}
Message::FindNode {
sender,
target,
token,
} => {
let nodes = self.get_closest_nodes(&target)?;
let response = Message::Nodes {
sender: self.get_local_id()?,
nodes,
token,
};
self.send_message(&src, &response)?;
}
Message::Store {
sender,
key,
value,
token,
} => {
self.store(&key, &value, REPLICATION_INTERVAL)?;
let response = Message::Pong {
sender: self.get_local_id()?,
token,
};
self.send_message(&src, &response)?;
}
_ => {}
}
Ok(())
}
fn send_message(&self, addr: &SocketAddr, msg: &Message) -> Result<(), BellandeMeshError> {
let data =
bincode::serialize(msg).map_err(|e| BellandeMeshError::Serialization(e.to_string()))?;
self.socket.send_to(&data, addr)?;
Ok(())
}
fn get_local_id(&self) -> Result<NodeId, BellandeMeshError> {
Ok(self
.routing_table
.read()
.map_err(|_| BellandeMeshError::LockError)?
.local_id
.clone())
}
fn get_closest_nodes(&self, target: &NodeId) -> Result<Vec<Node>, BellandeMeshError> {
Ok(self
.routing_table
.read()
.map_err(|_| BellandeMeshError::LockError)?
.get_closest_nodes(target))
}
pub fn store(&self, key: &[u8], value: &[u8], ttl: Duration) -> Result<(), BellandeMeshError> {
let storage_value = StorageValue::new(value.to_vec(), ttl);
let mut storage = self
.storage
.write()
.map_err(|_| BellandeMeshError::LockError)?;
storage.insert(key.to_vec(), storage_value);
let target = NodeId::from_bytes(key);
let closest_nodes = self.get_closest_nodes(&target)?;
for node in closest_nodes {
let token = rand::thread_rng().gen();
let msg = Message::Store {
sender: self.get_local_id()?,
key: key.to_vec(),
value: value.to_vec(),
token,
};
let _ = self.send_message(&node.address, &msg);
}
Ok(())
}
fn cleanup_storage(&self) -> Result<(), BellandeMeshError> {
let mut storage = self
.storage
.write()
.map_err(|_| BellandeMeshError::LockError)?;
storage.retain(|_, value| !value.is_expired());
Ok(())
}
pub fn find_node(&self, target: &NodeId) -> Result<Vec<Node>, BellandeMeshError> {
let closest = self.get_closest_nodes(target)?;
let mut queried_nodes = HashSet::new();
let mut results = Vec::new();
for node in closest {
if queried_nodes.contains(&node.id) {
continue;
}
let token = rand::thread_rng().gen();
let msg = Message::FindNode {
sender: self.get_local_id()?,
target: target.clone(),
token,
};
if let Ok(()) = self.send_message(&node.address, &msg) {
queried_nodes.insert(node.id);
results.push(node);
}
}
results.sort_by_cached_key(|node| node.id.distance(target));
results.truncate(K);
Ok(results)
}
}
impl Clone for DhtWrapper {
fn clone(&self) -> Self {
Self {
routing_table: Arc::clone(&self.routing_table),
storage: Arc::clone(&self.storage),
socket: Arc::clone(&self.socket),
pending_requests: Arc::clone(&self.pending_requests),
}
}
}

1
src/dht/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod dht;

View File

@ -0,0 +1,169 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::collections::hash_map::DefaultHasher;
use std::error::Error;
use std::fmt;
use std::hash::{Hash, Hasher};
#[derive(Debug)]
pub enum EncryptionError {
InvalidKeyLength,
InvalidDataFormat,
EncryptionError,
DecryptionError,
}
impl fmt::Display for EncryptionError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
EncryptionError::InvalidKeyLength => write!(f, "Invalid key length"),
EncryptionError::InvalidDataFormat => write!(f, "Invalid data format"),
EncryptionError::EncryptionError => write!(f, "Encryption error"),
EncryptionError::DecryptionError => write!(f, "Decryption error"),
}
}
}
impl Error for EncryptionError {}
#[derive(Clone)]
pub struct PublicKey([u8; 32]);
pub struct PrivateKey([u8; 32]);
pub struct Signature([u8; 64]);
impl PublicKey {
pub fn to_bytes(&self) -> [u8; 32] {
self.0
}
}
pub struct EncryptionManager {
public_key: PublicKey,
private_key: PrivateKey,
}
impl EncryptionManager {
pub fn new() -> Self {
let private_key = Self::generate_random_bytes();
let public_key = Self::derive_public_key(&private_key);
Self {
public_key: PublicKey(public_key),
private_key: PrivateKey(private_key),
}
}
pub fn encrypt(&self, data: &[u8], shared_secret: &[u8]) -> Result<Vec<u8>, EncryptionError> {
let key = self.derive_key(shared_secret)?;
let nonce = Self::generate_random_bytes();
let mut result = nonce.to_vec();
result.extend_from_slice(data);
for (i, byte) in result.iter_mut().enumerate().skip(12) {
*byte ^= key[i % key.len()];
}
Ok(result)
}
pub fn decrypt(
&self,
encrypted_data: &[u8],
shared_secret: &[u8],
) -> Result<Vec<u8>, EncryptionError> {
if encrypted_data.len() < 12 {
return Err(EncryptionError::InvalidDataFormat);
}
let key = self.derive_key(shared_secret)?;
let mut decrypted = encrypted_data[12..].to_vec();
for (i, byte) in decrypted.iter_mut().enumerate() {
*byte ^= key[i % key.len()];
}
Ok(decrypted)
}
pub fn sign(&self, data: &[u8]) -> Signature {
let mut hasher = DefaultHasher::new();
data.hash(&mut hasher);
self.private_key.0.hash(&mut hasher);
let hash = hasher.finish();
let mut signature = [0u8; 64];
signature[..8].copy_from_slice(&hash.to_be_bytes());
Signature(signature)
}
pub fn verify(&self, public_key: &PublicKey, data: &[u8], signature: &Signature) -> bool {
let mut hasher = DefaultHasher::new();
data.hash(&mut hasher);
public_key.0.hash(&mut hasher);
let hash = hasher.finish();
hash.to_be_bytes() == signature.0[..8]
}
pub fn public_key(&self) -> PublicKey {
self.public_key.clone()
}
fn derive_key(&self, shared_secret: &[u8]) -> Result<Vec<u8>, EncryptionError> {
if shared_secret.len() < 32 {
return Err(EncryptionError::InvalidKeyLength);
}
let mut hasher = DefaultHasher::new();
shared_secret.hash(&mut hasher);
let hash = hasher.finish();
Ok(hash.to_be_bytes().to_vec())
}
fn derive_public_key(private_key: &[u8; 32]) -> [u8; 32] {
let mut hasher = DefaultHasher::new();
private_key.hash(&mut hasher);
let hash = hasher.finish();
let mut public_key = [0u8; 32];
public_key[..8].copy_from_slice(&hash.to_be_bytes());
public_key
}
fn generate_random_bytes() -> [u8; 32] {
let mut bytes = [0u8; 32];
for byte in &mut bytes {
*byte = Self::generate_random_byte();
}
bytes
}
fn generate_random_byte() -> u8 {
use std::time::{SystemTime, UNIX_EPOCH};
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.subsec_nanos();
(nanos & 0xFF) as u8
}
}
impl Default for EncryptionManager {
fn default() -> Self {
Self::new()
}
}

1
src/encryption/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod encryption;

84
src/error/error.rs Normal file
View File

@ -0,0 +1,84 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::array::TryFromSliceError;
use std::error::Error;
use std::fmt;
#[derive(Debug)]
pub enum BellandeMeshError {
IoError(std::io::Error),
LockError,
ConversionError,
PersistenceError(String),
InvalidAddress,
ProtocolError(String),
Serialization(String),
Database(String),
Encryption(String),
Authentication,
NodeNotFound,
Dht(String),
RateLimitExceeded,
ConflictResolution,
Migration(String),
Deserialization(String),
NetworkError(String),
ArrayConversionError(TryFromSliceError),
Timeout,
}
impl fmt::Display for BellandeMeshError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
BellandeMeshError::IoError(e) => write!(f, "IO error: {}", e),
BellandeMeshError::LockError => write!(f, "Lock acquisition failed"),
BellandeMeshError::ConversionError => write!(f, "Type conversion error"),
BellandeMeshError::PersistenceError(e) => write!(f, "Persistence error: {}", e),
BellandeMeshError::InvalidAddress => write!(f, "Invalid address"),
BellandeMeshError::ProtocolError(e) => write!(f, "Protocol error: {}", e),
BellandeMeshError::Serialization(err) => write!(f, "Serialization error: {}", err),
BellandeMeshError::Database(err) => write!(f, "Database error: {}", err),
BellandeMeshError::Encryption(err) => write!(f, "Encryption error: {}", err),
BellandeMeshError::Authentication => write!(f, "Authentication error"),
BellandeMeshError::NodeNotFound => write!(f, "Node not found"),
BellandeMeshError::Dht(err) => write!(f, "DHT error: {}", err),
BellandeMeshError::RateLimitExceeded => write!(f, "Rate limit exceeded"),
BellandeMeshError::ConflictResolution => write!(f, "Conflict resolution error"),
BellandeMeshError::Migration(err) => write!(f, "Migration error: {}", err),
BellandeMeshError::Deserialization(err) => write!(f, "Deserialization error: {}", err),
BellandeMeshError::NetworkError(e) => write!(f, "Network error: {}", e),
BellandeMeshError::ArrayConversionError(e) => {
write!(f, "Array conversion error: {}", e)
}
BellandeMeshError::Timeout => write!(f, "Operation timed out"),
}
}
}
impl Error for BellandeMeshError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
BellandeMeshError::IoError(err) => Some(err),
_ => None,
}
}
}
impl From<std::io::Error> for BellandeMeshError {
fn from(err: std::io::Error) -> Self {
BellandeMeshError::IoError(err)
}
}

1
src/error/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod error;

332
src/mesh/mesh.rs Normal file
View File

@ -0,0 +1,332 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::config::config::Config;
use crate::error::error::BellandeMeshError;
use crate::node::node::{DataChunk, Message, Node, NodeId, PublicKey};
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
pub struct BellandeMeshSync {
config: Arc<Config>,
nodes: Arc<RwLock<Vec<Node>>>,
running: Arc<RwLock<bool>>,
}
impl BellandeMeshSync {
pub fn new(config: &Config) -> Result<Self, BellandeMeshError> {
Ok(Self {
config: Arc::new(config.clone()),
nodes: Arc::new(RwLock::new(Vec::new())),
running: Arc::new(RwLock::new(true)),
})
}
pub fn start(&self) -> Result<(), BellandeMeshError> {
let mut running = self
.running
.write()
.map_err(|_| BellandeMeshError::LockError)?;
*running = true;
drop(running);
self.start_listener()?;
self.start_maintenance_tasks();
Ok(())
}
pub fn stop(&self) -> Result<(), BellandeMeshError> {
let mut running = self
.running
.write()
.map_err(|_| BellandeMeshError::LockError)?;
*running = false;
Ok(())
}
fn is_running(&self) -> Result<bool, BellandeMeshError> {
self.running
.read()
.map_err(|_| BellandeMeshError::LockError)
.map(|guard| *guard)
}
fn start_listener(&self) -> Result<(), BellandeMeshError> {
let listener = TcpListener::bind(&self.config.listen_address)?;
let mesh = self.clone();
thread::spawn(move || {
for stream in listener.incoming() {
if !mesh.is_running().unwrap_or(false) {
break;
}
match stream {
Ok(stream) => {
let mesh_clone = mesh.clone();
thread::spawn(move || {
if let Err(e) = mesh_clone.handle_connection(stream) {
eprintln!("Connection error: {}", e);
}
});
}
Err(e) => eprintln!("Accept error: {}", e),
}
}
});
Ok(())
}
fn start_maintenance_tasks(&self) {
// Start sync task
let mesh = self.clone();
thread::spawn(move || {
while let Ok(true) = mesh.is_running() {
if let Err(e) = mesh.sync_with_peers() {
eprintln!("Sync error: {}", e);
}
thread::sleep(Duration::from_secs(60));
}
});
// Start cleanup task
let mesh = self.clone();
thread::spawn(move || {
while let Ok(true) = mesh.is_running() {
if let Err(e) = mesh.cleanup_dead_nodes() {
eprintln!("Cleanup error: {}", e);
}
thread::sleep(Duration::from_secs(300));
}
});
}
fn handle_connection(&self, mut stream: TcpStream) -> Result<(), BellandeMeshError> {
stream.set_read_timeout(Some(Duration::from_secs(30)))?;
stream.set_write_timeout(Some(Duration::from_secs(30)))?;
let running = self
.running
.read()
.map_err(|_| BellandeMeshError::LockError)?;
while *running {
let message = self.read_message(&mut stream)?;
self.handle_message(message, &mut stream)?;
}
Ok(())
}
fn read_message(&self, stream: &mut TcpStream) -> Result<Message, BellandeMeshError> {
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf)?;
let len = u32::from_be_bytes(len_buf) as usize;
let mut msg_buf = vec![0u8; len];
stream.read_exact(&mut msg_buf)?;
bincode::deserialize(&msg_buf)
.map_err(|e| BellandeMeshError::Deserialization(e.to_string()))
}
fn write_message(
&self,
stream: &mut TcpStream,
message: &Message,
) -> Result<(), BellandeMeshError> {
let data = bincode::serialize(message)
.map_err(|e| BellandeMeshError::Serialization(e.to_string()))?;
let len = (data.len() as u32).to_be_bytes();
stream.write_all(&len)?;
stream.write_all(&data)?;
stream.flush()?;
Ok(())
}
fn handle_message(
&self,
message: Message,
stream: &mut TcpStream,
) -> Result<(), BellandeMeshError> {
match message {
Message::JoinRequest { id, public_key } => {
let peer_addr = stream.peer_addr()?;
self.handle_join_request(id, public_key, peer_addr)?;
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let response = Message::JoinResponse {
accepted: true,
nodes: nodes.clone(),
};
self.write_message(stream, &response)?;
}
Message::DataSync { chunks } => {
self.handle_data_sync(chunks)?;
}
Message::DataRequest { ids } => {
self.handle_data_request(&ids, stream)?;
}
Message::Heartbeat => {
let addr = stream.peer_addr()?;
self.update_node_last_seen(addr)?;
}
_ => {}
}
Ok(())
}
fn handle_join_request(
&self,
id: NodeId,
public_key: PublicKey,
addr: SocketAddr,
) -> Result<(), BellandeMeshError> {
let new_node = Node::new(id, addr, public_key);
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
if !nodes.iter().any(|n| n.id == new_node.id) {
nodes.push(new_node);
}
Ok(())
}
fn handle_data_sync(&self, chunks: Vec<DataChunk>) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
for chunk in chunks {
if let Some(node) = nodes.iter().find(|n| n.id == chunk.author) {
let _ = node.add_data_chunk(chunk);
}
}
Ok(())
}
fn handle_data_request(
&self,
ids: &[NodeId],
stream: &mut TcpStream,
) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let mut chunks = Vec::new();
for node in nodes.iter() {
for id in ids {
if let Some(chunk) = node.get_data_chunk(id) {
chunks.push(chunk);
}
}
}
let response = Message::DataSync { chunks };
self.write_message(stream, &response)?;
Ok(())
}
fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> {
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
if let Some(node) = nodes.iter_mut().find(|n| n.address == addr) {
node.update_last_seen();
}
Ok(())
}
fn sync_with_peers(&self) -> Result<(), BellandeMeshError> {
// Take a snapshot of current nodes to avoid holding the lock
let nodes = {
let nodes_guard = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
nodes_guard.clone()
};
for node in nodes {
if let Ok(mut stream) = TcpStream::connect(node.address) {
let chunks = {
let data = node.data.read().map_err(|_| BellandeMeshError::LockError)?;
data.keys().cloned().collect::<Vec<_>>()
};
let request = Message::DataRequest { ids: chunks };
if let Err(e) = self.write_message(&mut stream, &request) {
eprintln!("Failed to sync with {}: {}", node.address, e);
continue;
}
// Handle response
if let Ok(Message::DataSync { chunks }) = self.read_message(&mut stream) {
self.handle_data_sync(chunks)?;
}
}
}
Ok(())
}
fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> {
let timeout = Duration::from_secs(self.config.node_timeout);
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
nodes.retain(|node| {
let is_alive = node.is_alive(timeout);
if !is_alive {
eprintln!("Removing dead node: {}", node.address);
}
is_alive
});
Ok(())
}
}
impl Clone for BellandeMeshSync {
fn clone(&self) -> Self {
Self {
config: Arc::clone(&self.config),
nodes: Arc::clone(&self.nodes),
running: Arc::clone(&self.running),
}
}
}

1
src/mesh/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod mesh;

141
src/metrics/metrics.rs Normal file
View File

@ -0,0 +1,141 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
#[derive(Clone)]
pub struct Counter {
value: Arc<AtomicI64>,
}
impl Counter {
fn new() -> Self {
Self {
value: Arc::new(AtomicI64::new(0)),
}
}
pub fn inc(&self) {
self.value.fetch_add(1, Ordering::Relaxed);
}
pub fn get(&self) -> i64 {
self.value.load(Ordering::Relaxed)
}
}
#[derive(Clone)]
pub struct Gauge {
value: Arc<AtomicI64>,
}
impl Gauge {
fn new() -> Self {
Self {
value: Arc::new(AtomicI64::new(0)),
}
}
pub fn set(&self, v: i64) {
self.value.store(v, Ordering::Relaxed);
}
pub fn get(&self) -> i64 {
self.value.load(Ordering::Relaxed)
}
}
pub struct MetricsManager {
counters: HashMap<String, Counter>,
gauges: HashMap<String, Gauge>,
}
impl MetricsManager {
pub fn new() -> Self {
let mut manager = Self {
counters: HashMap::new(),
gauges: HashMap::new(),
};
// Initialize metrics
manager.create_gauge("active_nodes");
manager.create_gauge("data_chunks");
manager.create_counter("sync_operations");
manager.create_counter("conflicts_resolved");
manager.create_counter("network_errors");
manager
}
fn create_counter(&mut self, name: &str) {
self.counters.insert(name.to_string(), Counter::new());
}
fn create_gauge(&mut self, name: &str) {
self.gauges.insert(name.to_string(), Gauge::new());
}
pub fn update_active_nodes(&self, count: i64) {
if let Some(gauge) = self.gauges.get("active_nodes") {
gauge.set(count);
}
}
pub fn update_data_chunks(&self, count: i64) {
if let Some(gauge) = self.gauges.get("data_chunks") {
gauge.set(count);
}
}
pub fn increment_sync_operations(&self) {
if let Some(counter) = self.counters.get("sync_operations") {
counter.inc();
}
}
pub fn increment_conflicts_resolved(&self) {
if let Some(counter) = self.counters.get("conflicts_resolved") {
counter.inc();
}
}
pub fn increment_network_errors(&self) {
if let Some(counter) = self.counters.get("network_errors") {
counter.inc();
}
}
pub fn get_metrics(&self) -> String {
let mut result = String::new();
for (name, gauge) in &self.gauges {
result.push_str(&format!("{} {}\n", name, gauge.get()));
}
for (name, counter) in &self.counters {
result.push_str(&format!("{} {}\n", name, counter.get()));
}
result
}
}
impl Default for MetricsManager {
fn default() -> Self {
Self::new()
}
}

1
src/metrics/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod metrics;

1
src/node/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod node;

242
src/node/node.rs Normal file
View File

@ -0,0 +1,242 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use rand::{thread_rng, RngCore};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Serialize, Deserialize, PartialOrd, Ord)]
pub struct NodeId([u8; 32]);
impl NodeId {
pub fn new() -> Self {
let mut id = [0u8; 32];
thread_rng().fill_bytes(&mut id);
NodeId(id)
}
pub fn from_bytes(bytes: &[u8]) -> Self {
let mut id = [0u8; 32];
let len = std::cmp::min(bytes.len(), 32);
id[..len].copy_from_slice(&bytes[..len]);
NodeId(id)
}
pub fn distance(&self, other: &NodeId) -> NodeId {
let mut result = [0u8; 32];
for i in 0..32 {
result[i] = self.0[i] ^ other.0[i];
}
NodeId(result)
}
pub fn as_ref(&self) -> &[u8] {
&self.0
}
pub fn to_hex(&self) -> String {
self.0.iter().map(|b| format!("{:02x}", b)).collect()
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PublicKey([u8; 32]);
impl PublicKey {
pub fn new(bytes: [u8; 32]) -> Self {
PublicKey(bytes)
}
pub fn as_bytes(&self) -> &[u8; 32] {
&self.0
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DataChunk {
pub id: NodeId,
pub content: Vec<u8>,
pub timestamp: SystemTime,
pub author: NodeId,
}
impl DataChunk {
pub fn new(id: NodeId, content: Vec<u8>, author: NodeId) -> Self {
Self {
id,
content,
timestamp: SystemTime::now(),
author,
}
}
pub fn size(&self) -> usize {
std::mem::size_of::<NodeId>() * 2 + self.content.len() + std::mem::size_of::<SystemTime>()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
Ping {
sender: NodeId,
token: u64,
},
Pong {
sender: NodeId,
token: u64,
},
Store {
sender: NodeId,
key: Vec<u8>,
value: Vec<u8>,
token: u64,
},
FindNode {
sender: NodeId,
target: NodeId,
token: u64,
},
FindValue {
sender: NodeId,
key: Vec<u8>,
token: u64,
},
Nodes {
sender: NodeId,
nodes: Vec<Node>,
token: u64,
},
Value {
sender: NodeId,
key: Vec<u8>,
value: Vec<u8>,
token: u64,
},
JoinRequest {
id: NodeId,
public_key: PublicKey,
},
JoinResponse {
accepted: bool,
nodes: Vec<Node>,
},
DataSync {
chunks: Vec<DataChunk>,
},
DataRequest {
ids: Vec<NodeId>,
},
Heartbeat,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Node {
pub id: NodeId,
pub address: SocketAddr,
pub public_key: PublicKey,
pub last_seen: SystemTime,
pub rtt: Duration,
pub failed_queries: u32,
#[serde(skip)]
pub data: Arc<RwLock<HashMap<NodeId, DataChunk>>>,
}
impl Node {
pub fn new(id: NodeId, address: SocketAddr, public_key: PublicKey) -> Self {
Self {
id,
address,
public_key,
last_seen: SystemTime::now(),
rtt: Duration::from_secs(0),
failed_queries: 0,
data: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn update_last_seen(&mut self) {
self.last_seen = SystemTime::now();
}
pub fn mark_failed(&mut self) {
self.failed_queries += 1;
}
pub fn update_rtt(&mut self, rtt: Duration) {
self.rtt = rtt;
self.last_seen = SystemTime::now();
self.failed_queries = 0;
}
pub fn add_data_chunk(&self, chunk: DataChunk) -> bool {
match self.data.write() {
Ok(mut data) => {
data.insert(chunk.id, chunk);
true
}
Err(_) => false,
}
}
pub fn get_data_chunk(&self, id: &NodeId) -> Option<DataChunk> {
self.data.read().ok()?.get(id).cloned()
}
pub fn remove_old_data(&self, max_age: Duration) -> bool {
match self.data.write() {
Ok(mut data) => {
let now = SystemTime::now();
data.retain(|_, chunk| {
now.duration_since(chunk.timestamp)
.map(|age| age <= max_age)
.unwrap_or(false)
});
true
}
Err(_) => false,
}
}
pub fn is_alive(&self, timeout: Duration) -> bool {
SystemTime::now()
.duration_since(self.last_seen)
.map(|duration| duration < timeout)
.unwrap_or(false)
}
}
impl PartialEq for Node {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for Node {}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.id.cmp(&other.id))
}
}
impl Ord for Node {
fn cmp(&self, other: &Self) -> Ordering {
self.id.cmp(&other.id)
}
}

1
src/persistence/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod persistence;

View File

@ -0,0 +1,192 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::error::error::BellandeMeshError;
use crate::node::node::{DataChunk, Node, NodeId, PublicKey};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter};
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::SystemTime;
#[derive(Serialize, Deserialize)]
struct SerializableNode {
id: NodeId,
address: std::net::SocketAddr,
public_key: PublicKey,
last_seen: SystemTime,
}
impl From<&Node> for SerializableNode {
fn from(node: &Node) -> Self {
SerializableNode {
id: node.id,
address: node.address,
public_key: node.public_key.clone(),
last_seen: node.last_seen,
}
}
}
impl SerializableNode {
fn into_node(self) -> Node {
Node {
id: self.id,
address: self.address,
public_key: self.public_key,
last_seen: self.last_seen,
rtt: Default::default(),
failed_queries: 0,
data: Arc::new(RwLock::new(HashMap::new())),
}
}
}
pub struct PersistenceManager {
data_dir: PathBuf,
connections: Arc<Mutex<Vec<FileConnection>>>,
}
struct FileConnection {
file: File,
}
impl PersistenceManager {
pub fn new(data_dir: &str) -> Result<Self, BellandeMeshError> {
let path = Path::new(data_dir);
std::fs::create_dir_all(path)?;
Ok(Self {
data_dir: path.to_path_buf(),
connections: Arc::new(Mutex::new(Vec::new())),
})
}
pub fn save_node(&self, node: &Node) -> Result<(), BellandeMeshError> {
let path = self
.data_dir
.join("nodes")
.join(format!("{}.dat", node.id.to_hex()));
std::fs::create_dir_all(path.parent().unwrap())?;
let file = self.get_connection(&path)?;
let writer = BufWriter::new(file);
let serializable_node = SerializableNode::from(node);
bincode::serialize_into(writer, &serializable_node)
.map_err(|e| BellandeMeshError::Serialization(e.to_string()))?;
Ok(())
}
pub fn load_nodes(&self) -> Result<Vec<Node>, BellandeMeshError> {
let nodes_dir = self.data_dir.join("nodes");
if !nodes_dir.exists() {
return Ok(Vec::new());
}
let mut nodes = Vec::new();
for entry in std::fs::read_dir(nodes_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("dat") {
let file = self.get_connection(&path)?;
let reader = BufReader::new(file);
let serializable_node: SerializableNode = bincode::deserialize_from(reader)
.map_err(|e| BellandeMeshError::Deserialization(e.to_string()))?;
nodes.push(serializable_node.into_node());
}
}
Ok(nodes)
}
pub fn save_data_chunk(
&self,
node_id: &NodeId,
chunk: &DataChunk,
) -> Result<(), BellandeMeshError> {
let path = self
.data_dir
.join("data_chunks")
.join(node_id.to_hex())
.join(format!("{}.dat", chunk.id.to_hex()));
std::fs::create_dir_all(path.parent().unwrap())?;
let file = self.get_connection(&path)?;
let writer = BufWriter::new(file);
bincode::serialize_into(writer, chunk)
.map_err(|e| BellandeMeshError::Serialization(e.to_string()))?;
Ok(())
}
pub fn load_data_chunks(
&self,
node_id: &NodeId,
) -> Result<HashMap<NodeId, DataChunk>, BellandeMeshError> {
let chunks_dir = self.data_dir.join("data_chunks").join(node_id.to_hex());
if !chunks_dir.exists() {
return Ok(HashMap::new());
}
let mut chunks = HashMap::new();
for entry in std::fs::read_dir(chunks_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("dat") {
let file = self.get_connection(&path)?;
let reader = BufReader::new(file);
let chunk: DataChunk = bincode::deserialize_from(reader)
.map_err(|e| BellandeMeshError::Deserialization(e.to_string()))?;
chunks.insert(chunk.id, chunk);
}
}
Ok(chunks)
}
fn get_connection(&self, path: &Path) -> Result<File, BellandeMeshError> {
let mut connections = self
.connections
.lock()
.map_err(|_| BellandeMeshError::LockError)?;
if let Some(conn) = connections
.iter_mut()
.find(|c| c.file.metadata().unwrap().ino() == path.metadata().unwrap().ino())
{
Ok(conn.file.try_clone()?)
} else {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
connections.push(FileConnection {
file: file.try_clone()?,
});
Ok(file)
}
}
}

1
src/tests/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod tests;

127
src/tests/tests.rs Normal file
View File

@ -0,0 +1,127 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#[cfg(test)]
mod tests {
use super::*;
use crate::config::config::Config;
use crate::mesh::mesh::BellandeMeshSync;
use tokio::net::TcpStream;
use uuid::Uuid;
async fn setup_test_mesh() -> BellandeMeshSync {
let config = Config {
db_url: ":memory:".to_string(),
listen_address: "127.0.0.1:0".to_string(),
bootstrap_nodes: vec![],
};
BellandeMeshSync::new(&config).await.unwrap()
}
#[tokio::test]
async fn test_node_join() {
let mesh = setup_test_mesh().await;
let node_id = Uuid::new_v4();
let public_key = mesh.encryption.public_key();
// Simulate a node joining
mesh.handle_join_request(node_id, public_key).await.unwrap();
let nodes = mesh.nodes.read().await;
assert!(nodes.iter().any(|node| node.id == node_id));
}
#[tokio::test]
async fn test_data_sync() {
let mesh = setup_test_mesh().await;
let node_id = Uuid::new_v4();
let public_key = mesh.encryption.public_key();
// Add a node
mesh.handle_join_request(node_id, public_key).await.unwrap();
// Create a test data chunk
let chunk = DataChunk {
id: Uuid::new_v4(),
content: vec![1, 2, 3, 4],
checksum: "test_checksum".to_string(),
version: 1,
last_modified: chrono::Utc::now(),
author: node_id,
parent_versions: vec![],
};
// Sync the data
mesh.handle_data_sync(node_id, vec![chunk.clone()])
.await
.unwrap();
// Verify the data was synced
let nodes = mesh.nodes.read().await;
let node = nodes.iter().find(|n| n.id == node_id).unwrap();
let node_data = node.data.read().await;
assert!(node_data.contains_key(&chunk.id));
}
#[tokio::test]
async fn test_conflict_resolution() {
let mesh = setup_test_mesh().await;
let node1_id = Uuid::new_v4();
let node2_id = Uuid::new_v4();
let public_key = mesh.encryption.public_key();
// Add two nodes
mesh.handle_join_request(node1_id, public_key)
.await
.unwrap();
mesh.handle_join_request(node2_id, public_key)
.await
.unwrap();
// Create conflicting data chunks
let chunk_id = Uuid::new_v4();
let chunk1 = DataChunk {
id: chunk_id,
content: vec![1, 2, 3],
checksum: "checksum1".to_string(),
version: 1,
last_modified: chrono::Utc::now(),
author: node1_id,
parent_versions: vec![],
};
let chunk2 = DataChunk {
id: chunk_id,
content: vec![4, 5, 6],
checksum: "checksum2".to_string(),
version: 1,
last_modified: chrono::Utc::now(),
author: node2_id,
parent_versions: vec![],
};
// Sync conflicting data
mesh.handle_data_sync(node1_id, vec![chunk1]).await.unwrap();
mesh.handle_data_sync(node2_id, vec![chunk2]).await.unwrap();
// Verify conflict resolution
let nodes = mesh.nodes.read().await;
for node in nodes.iter() {
let node_data = node.data.read().await;
let resolved_chunk = node_data.get(&chunk_id).unwrap();
assert_eq!(resolved_chunk.version, 2);
assert_eq!(resolved_chunk.parent_versions.len(), 2);
}
}
}

1
src/utilities/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod utilities;

View File

@ -0,0 +1,36 @@
// Copyright (C) 2024 Bellande Architecture Mechanism Research Innovation Center, Ronaldson Bellande
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use ed25519_dalek::PublicKey;
use serde::{Deserialize, Serialize};
impl Serialize for PublicKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_bytes(&self.to_bytes())
}
}
impl<'de> Deserialize<'de> for PublicKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let bytes: [u8; 32] = Deserialize::deserialize(deserializer)?;
PublicKey::from_bytes(&bytes).map_err(serde::de::Error::custom)
}
}