Module: nodalync-net
Source: Protocol Specification §11
Overview
P2P networking using libp2p. Handles peer discovery, DHT, and message routing.
Key Design Decisions:
-
Hash-Only Lookup for MVP: The protocol supports hash-based content discovery only. Keyword/semantic search is an application-layer concern and out of scope for the core protocol. Users discover content via external channels (social media, links, recommendations) and use the protocol to query by hash.
-
DHT stores:
content_hash -> AnnouncePayloadmapping. This allows anyone with a hash to find the content owner’s addresses and metadata. -
No search index: The DHT is NOT an inverted index. Future application-layer services can build search functionality on top of the protocol.
Dependencies
nodalync-types— All data structuresnodalync-wire— Message encodingnodalync-ops— Operation handlerslibp2p— P2P networking stack
§11.1 Transport
#![allow(unused)]
fn main() {
pub fn build_transport(identity: &Keypair) -> Boxed<(PeerId, StreamMuxerBox)> {
let tcp = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true));
let transport = tcp
.upgrade(Version::V1)
.authenticate(noise::Config::new(&identity).unwrap())
.multiplex(yamux::Config::default())
.boxed();
transport
}
}
Supported transports:
- TCP (primary)
- QUIC (optional, for better performance)
- WebSocket (optional, for browser nodes)
Security:
- Noise protocol (XX handshake pattern)
Multiplexing:
- yamux (primary)
- mplex (fallback)
§11.2 Discovery (DHT)
Kademlia Configuration
#![allow(unused)]
fn main() {
pub fn build_kademlia(peer_id: PeerId) -> Kademlia<MemoryStore> {
let mut config = KademliaConfig::default();
config.set_query_timeout(Duration::from_secs(60));
config.set_replication_factor(NonZeroUsize::new(DHT_REPLICATION).unwrap());
let store = MemoryStore::new(peer_id);
Kademlia::with_config(peer_id, store, config)
}
// Constants from spec
const DHT_BUCKET_SIZE: usize = 20;
const DHT_ALPHA: usize = 3;
const DHT_REPLICATION: usize = 20;
}
Content Announcement
#![allow(unused)]
fn main() {
/// Announce content availability to DHT
/// Stores: content_hash -> AnnouncePayload
pub async fn dht_announce(&mut self, hash: &Hash, payload: AnnouncePayload) -> Result<()> {
let key = Key::new(&hash.0);
let value = encode_payload(&payload)?;
self.kademlia.put_record(Record::new(key, value), Quorum::Majority).await?;
Ok(())
}
/// Lookup content by hash (the ONLY lookup mechanism in protocol)
/// Returns owner's addresses and metadata if found
pub async fn dht_get(&mut self, hash: &Hash) -> Result<Option<AnnouncePayload>> {
let key = Key::new(&hash.0);
match self.kademlia.get_record(key).await {
Ok(record) => {
let payload: AnnouncePayload = decode_payload(&record.value)?;
Ok(Some(payload))
}
Err(GetRecordError::NotFound) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Remove content announcement from DHT
pub async fn dht_remove(&mut self, hash: &Hash) -> Result<()> {
let key = Key::new(&hash.0);
self.kademlia.remove_record(&key).await?;
Ok(())
}
}
Note on Search:
The protocol does NOT include keyword search. The DHT only supports exact hash lookups. Content discovery happens through application-layer mechanisms:
- External search services (could index L1 summaries)
- Social sharing (users share links containing hashes)
- Recommendations (applications can build on provenance data)
- Curated directories (third parties can maintain topic indexes)
This keeps the protocol minimal and focused on trustless content exchange.
---
## §11.3 Peer Discovery
### Bootstrap
```rust
const BOOTSTRAP_NODES: &[&str] = &[
"/dns4/bootstrap1.nodalync.io/tcp/9000/p2p/12D3KooW...",
"/dns4/bootstrap2.nodalync.io/tcp/9000/p2p/12D3KooW...",
];
pub async fn bootstrap(&mut self) -> Result<()> {
for addr in BOOTSTRAP_NODES {
let addr: Multiaddr = addr.parse()?;
self.swarm.dial(addr)?;
}
// Bootstrap Kademlia
self.kademlia.bootstrap()?;
Ok(())
}
Peer Exchange
#![allow(unused)]
fn main() {
/// Exchange peer lists with connected peers
pub async fn exchange_peers(&mut self) -> Result<()> {
let my_peers: Vec<PeerInfo> = self.connected_peers()
.iter()
.map(|p| self.get_peer_info(p))
.collect();
for peer in self.connected_peers() {
let msg = Message::new(
MessageType::PeerInfo,
encode_payload(&PeerInfoPayload {
peer_id: self.peer_id(),
public_key: self.public_key(),
addresses: self.listen_addresses(),
capabilities: self.capabilities(),
content_count: self.content_count(),
uptime: self.uptime(),
})?,
&self.identity,
);
self.send(&peer, msg).await?;
}
Ok(())
}
}
§11.4 Message Routing
Request-Response Protocol
#![allow(unused)]
fn main() {
#[derive(NetworkBehaviour)]
pub struct NodalyncBehaviour {
kademlia: Kademlia<MemoryStore>,
request_response: request_response::Behaviour<NodalyncCodec>,
gossipsub: gossipsub::Behaviour,
identify: identify::Behaviour,
}
pub struct NodalyncCodec;
impl request_response::Codec for NodalyncCodec {
type Protocol = &'static str;
type Request = Message;
type Response = Message;
fn protocol(&self) -> Self::Protocol {
"/nodalync/1.0.0"
}
async fn read_request(&mut self, io: &mut impl AsyncRead) -> io::Result<Self::Request> {
let bytes = read_length_prefixed(io, MAX_MESSAGE_SIZE).await?;
decode_message(&bytes).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
async fn write_response(&mut self, io: &mut impl AsyncWrite, msg: Self::Response) -> io::Result<()> {
let bytes = encode_message(&msg)?;
write_length_prefixed(io, &bytes).await
}
}
}
Send/Receive
#![allow(unused)]
fn main() {
/// Send message to specific peer
pub async fn send(&mut self, peer: &PeerId, message: Message) -> Result<Message> {
let response = self.request_response
.send_request(peer, message)
.await
.map_err(|e| Error::Network(e.to_string()))?;
Ok(response)
}
/// Broadcast announcement via GossipSub
pub async fn broadcast(&mut self, message: Message) -> Result<()> {
let topic = gossipsub::IdentTopic::new("/nodalync/announce/1.0.0");
let bytes = encode_message(&message)?;
self.gossipsub.publish(topic, bytes)?;
Ok(())
}
}
Timeouts and Retries
#![allow(unused)]
fn main() {
const MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
const MAX_RETRIES: usize = 3;
pub async fn send_with_retry(&mut self, peer: &PeerId, message: Message) -> Result<Message> {
let mut last_error = None;
for attempt in 0..MAX_RETRIES {
match timeout(MESSAGE_TIMEOUT, self.send(peer, message.clone())).await {
Ok(Ok(response)) => return Ok(response),
Ok(Err(e)) => {
last_error = Some(e);
// Exponential backoff
tokio::time::sleep(Duration::from_millis(100 * 2_u64.pow(attempt as u32))).await;
}
Err(_) => {
last_error = Some(Error::Timeout);
}
}
}
Err(last_error.unwrap())
}
}
Network Trait
#![allow(unused)]
fn main() {
#[async_trait]
pub trait Network {
// Discovery (hash-based only)
async fn dht_announce(&mut self, hash: &Hash, payload: AnnouncePayload) -> Result<()>;
async fn dht_get(&mut self, hash: &Hash) -> Result<Option<AnnouncePayload>>;
async fn dht_remove(&mut self, hash: &Hash) -> Result<()>;
// Messaging
async fn send(&mut self, peer: &PeerId, message: Message) -> Result<Message>;
async fn broadcast(&mut self, message: Message) -> Result<()>;
// Specific message helpers
async fn send_preview_request(&mut self, peer: &PeerId, hash: &Hash) -> Result<PreviewResponsePayload>;
async fn send_query(&mut self, peer: &PeerId, request: QueryRequestPayload) -> Result<QueryResponsePayload>;
async fn send_channel_open(&mut self, peer: &PeerId, request: ChannelOpenPayload) -> Result<ChannelAcceptPayload>;
async fn send_channel_close(&mut self, peer: &PeerId, request: ChannelClosePayload) -> Result<ChannelClosePayload>;
async fn broadcast_settlement_confirm(&mut self, confirm: SettleConfirmPayload) -> Result<()>;
// Peer management
fn connected_peers(&self) -> Vec<PeerId>;
fn listen_addresses(&self) -> Vec<Multiaddr>;
async fn dial(&mut self, addr: Multiaddr) -> Result<()>;
// Event loop
async fn next_event(&mut self) -> NetworkEvent;
}
pub enum NetworkEvent {
MessageReceived { peer: PeerId, message: Message },
PeerConnected(PeerId),
PeerDisconnected(PeerId),
DhtPutComplete { key: Hash, success: bool },
DhtGetResult { key: Hash, value: Option<Vec<u8>> },
}
}
Test Cases
- Bootstrap: Connect to bootstrap nodes
- DHT announce/lookup: Announce content, find it from another node by hash
- DHT remove: Remove announcement, no longer findable
- Request-response: Send query, receive response
- Timeout: Slow peer triggers timeout
- Retry: Failed request retries
- Peer discovery: Find peers through DHT
- GossipSub: Broadcast reaches subscribers
- Channel messages: Open/close flow works
- Settlement broadcast: Confirm reaches all peers