diff --git a/core/archipelago/src/api/rpc/mesh/typed_messages.rs b/core/archipelago/src/api/rpc/mesh/typed_messages.rs index 07e70357..0b60546b 100644 --- a/core/archipelago/src/api/rpc/mesh/typed_messages.rs +++ b/core/archipelago/src/api/rpc/mesh/typed_messages.rs @@ -725,34 +725,29 @@ impl RpcHandler { })); } - // Reach the sender over Tor. Onion host is used verbatim; cap/exp/peer - // match what the sender signed in handle_mesh_send_content. - let url = format!( - "http://{}/blob/{}?cap={}&exp={}&peer={}", - sender_onion - .trim_start_matches("http://") - .trim_start_matches("https://"), - cid, - cap_token, - cap_exp, - self_pubkey_hex, + // Reach the sender: FIPS preferred when the sender is federated + // and has advertised a FIPS npub, Tor fallback otherwise. + // Cap/exp/peer in the query string match what the sender signed in + // handle_mesh_send_content — signature domain unchanged. + let onion_bare = sender_onion + .trim_start_matches("http://") + .trim_start_matches("https://") + .to_string(); + let path = format!( + "/blob/{}?cap={}&exp={}&peer={}", + cid, cap_token, cap_exp, self_pubkey_hex ); + let fips_npub = + crate::federation::fips_npub_for_onion(&self.config.data_dir, &onion_bare).await; - let socks_proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY) - .map_err(|e| anyhow::anyhow!("SOCKS proxy setup failed: {}", e))?; - let client = reqwest::Client::builder() - .proxy(socks_proxy) - .timeout(std::time::Duration::from_secs(120)) - .build() - .map_err(|e| anyhow::anyhow!("HTTP client build failed: {}", e))?; - - let resp = client - .get(&url) - .send() - .await - .map_err(|e| anyhow::anyhow!("Fetch failed: {}", e))?; + let (resp, transport) = + crate::fips::dial::PeerRequest::new(fips_npub.as_deref(), &onion_bare, &path) + .timeout(std::time::Duration::from_secs(120)) + .send_get() + .await + .map_err(|e| anyhow::anyhow!("Fetch failed: {}", e))?; if !resp.status().is_success() { - anyhow::bail!("Blob fetch HTTP {}", resp.status()); + anyhow::bail!("Blob fetch HTTP {} (via {})", resp.status(), transport); } let mime = resp .headers() @@ -777,7 +772,7 @@ impl RpcHandler { "/blob/{}?cap={}&exp={}&peer={}", meta.cid, local_cap, local_exp, self_pubkey_hex ); - info!(cid = %cid, size = meta.size, "Fetched content_ref blob via tor"); + info!(cid = %cid, size = meta.size, transport = %transport, "Fetched content_ref blob"); Ok(serde_json::json!({ "fetched": true, "cached": false, diff --git a/core/archipelago/src/api/rpc/network.rs b/core/archipelago/src/api/rpc/network.rs index b88b7a86..c07cbc85 100644 --- a/core/archipelago/src/api/rpc/network.rs +++ b/core/archipelago/src/api/rpc/network.rs @@ -127,8 +127,11 @@ impl RpcHandler { "message": message, }); + let to_fips_npub = + crate::federation::fips_npub_for_onion(&self.config.data_dir, to_onion).await; crate::node_message::send_to_peer( to_onion, + to_fips_npub.as_deref(), my_pubkey, &req_msg.to_string(), None, diff --git a/core/archipelago/src/api/rpc/peers.rs b/core/archipelago/src/api/rpc/peers.rs index 66b9bfa2..b3053346 100644 --- a/core/archipelago/src/api/rpc/peers.rs +++ b/core/archipelago/src/api/rpc/peers.rs @@ -114,20 +114,20 @@ impl RpcHandler { let fed_nodes = federation::load_nodes(&self.config.data_dir) .await .unwrap_or_default(); - let recipient_pubkey = fed_nodes - .iter() - .find(|n| { - n.onion == onion - || n.onion == format!("{}.onion", onion) - || format!("{}.onion", n.onion) == onion - }) - .map(|n| n.pubkey.clone()); + let recipient = fed_nodes.iter().find(|n| { + n.onion == onion + || n.onion == format!("{}.onion", onion) + || format!("{}.onion", n.onion) == onion + }); + let recipient_pubkey = recipient.map(|n| n.pubkey.clone()); + let recipient_fips_npub = recipient.and_then(|n| n.fips_npub.clone()); // Include our node name so the recipient can display it let node_name = data.server_info.name.clone(); node_message::send_to_peer( onion, + recipient_fips_npub.as_deref(), &pubkey, message, Some(node_id.signing_key()), @@ -147,7 +147,9 @@ impl RpcHandler { .get("onion") .and_then(|v| v.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing onion"))?; - let reachable = node_message::check_peer_reachable(onion) + let fips_npub = + crate::federation::fips_npub_for_onion(&self.config.data_dir, onion).await; + let reachable = node_message::check_peer_reachable(onion, fips_npub.as_deref()) .await .unwrap_or(false); Ok(serde_json::json!({ "onion": onion, "reachable": reachable })) diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index 6e97b1f5..85faf38e 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -827,16 +827,10 @@ impl MeshService { use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; use ed25519_dalek::Signer; - let host = if peer_onion.ends_with(".onion") { - peer_onion.to_string() - } else { - format!("{}.onion", peer_onion.trim_end_matches('/')) - }; - let url = format!("http://{}/archipelago/mesh-typed", host); - // Sign the raw wire bytes so the receiver can attribute the envelope - // to our pubkey even when it arrives over federation/Tor rather than - // the radio. Signature covers the wire only — the receiver re-hashes. + // to our pubkey even when it arrives over federation/FIPS/Tor rather + // than the radio. Signature covers the wire only — the receiver + // re-hashes. let signature = hex::encode(self.signing_key.sign(&wire).to_bytes()); let wire_b64 = BASE64.encode(&wire); let body = serde_json::json!({ @@ -857,33 +851,29 @@ impl MeshService { ) .await; - // Fire the Tor POST in the background. Failures are logged but do - // not propagate — the caller has already been handed the Sent + // Fire the send in the background. FIPS is preferred when the peer + // is federated and running fips; Tor is the fallback. Failures are + // logged but do not propagate — caller already has the Sent // MeshMessage and the UI's delivery indicator tracks the receipt. + let peer_onion_owned = peer_onion.to_string(); + let data_dir_owned = self.data_dir.clone(); tokio::spawn(async move { - let proxy = match reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY) { - Ok(p) => p, - Err(e) => { - warn!(contact_id, "Invalid Tor proxy: {}", e); - return; + let fips_npub = + crate::federation::fips_npub_for_onion(&data_dir_owned, &peer_onion_owned).await; + let req = crate::fips::dial::PeerRequest::new( + fips_npub.as_deref(), + &peer_onion_owned, + "/archipelago/mesh-typed", + ) + .timeout(std::time::Duration::from_secs(120)); + match req.send_json(&body).await { + Ok((resp, transport)) if resp.status().is_success() => { + tracing::debug!(contact_id, transport = %transport, "Federation envelope delivered"); } - }; - let client = match reqwest::Client::builder() - .proxy(proxy) - .timeout(std::time::Duration::from_secs(120)) - .build() - { - Ok(c) => c, - Err(e) => { - warn!(contact_id, "HTTP client build failed: {}", e); - return; - } - }; - match client.post(&url).json(&body).send().await { - Ok(resp) if resp.status().is_success() => {} - Ok(resp) => warn!( + Ok((resp, transport)) => warn!( contact_id, status = %resp.status(), + transport = %transport, "Peer rejected federation-routed envelope" ), Err(e) => warn!(contact_id, "Federation POST failed: {}", e), diff --git a/core/archipelago/src/network/dwn_sync.rs b/core/archipelago/src/network/dwn_sync.rs index c033628c..7f0c9b58 100644 --- a/core/archipelago/src/network/dwn_sync.rs +++ b/core/archipelago/src/network/dwn_sync.rs @@ -104,16 +104,6 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result< state.status = SyncStatus::Syncing; save_sync_state(data_dir, &state).await?; - let socks_proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY) - .context("Failed to create SOCKS proxy")?; - - let client = reqwest::Client::builder() - .proxy(socks_proxy) - .connect_timeout(std::time::Duration::from_secs(15)) - .timeout(std::time::Duration::from_secs(30)) - .build() - .context("Failed to build Tor HTTP client")?; - let store = DwnStore::new(data_dir).await?; let mut synced_count = 0u64; @@ -142,7 +132,15 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result< // Overall sync timeout: 90 seconds let sync_future = async { for onion in &unique_onions { - match sync_single_peer(&client, &store, onion, &local_messages, &state.last_sync).await + let fips_npub = crate::federation::fips_npub_for_onion(data_dir, onion).await; + match sync_single_peer( + fips_npub.as_deref(), + &store, + onion, + &local_messages, + &state.last_sync, + ) + .await { Ok(count) => { debug!(peer = %onion, messages = count, "Peer sync complete"); @@ -173,29 +171,28 @@ pub async fn sync_with_peers(data_dir: &Path, peer_onions: &[String]) -> Result< } /// Sync with a single peer: pull their messages and push ours. +/// Each HTTP call picks FIPS when a npub is known, otherwise Tor. async fn sync_single_peer( - client: &reqwest::Client, + fips_npub: Option<&str>, store: &crate::network::dwn_store::DwnStore, onion: &str, local_messages: &[crate::network::dwn_store::DwnMessage], last_sync: &Option, ) -> Result { - let base_url = format!("http://{}", onion); + use crate::fips::dial::PeerRequest; let mut imported = 0u64; // Step 1: Check peer health - let health_url = format!("{}/dwn/health", base_url); - let res = client - .get(&health_url) - .send() + let (health_resp, _) = PeerRequest::new(fips_npub, onion, "/dwn/health") + .timeout(std::time::Duration::from_secs(30)) + .send_get() .await .context("Peer DWN unreachable")?; - if !res.status().is_success() { + if !health_resp.status().is_success() { return Err(anyhow::anyhow!("Peer DWN not healthy")); } // Step 2: Pull — query peer for messages since our last sync - let dwn_url = format!("{}/dwn", base_url); let mut query_filter = serde_json::json!({}); if let Some(ref since) = last_sync { query_filter = serde_json::json!({ "dateSort": "createdAscending", "dateFrom": since }); @@ -210,10 +207,9 @@ async fn sync_single_peer( }] }); - let pull_res = client - .post(&dwn_url) - .json(&pull_body) - .send() + let (pull_res, _) = PeerRequest::new(fips_npub, onion, "/dwn") + .timeout(std::time::Duration::from_secs(30)) + .send_json(&pull_body) .await .context("Failed to query peer DWN")?; @@ -267,10 +263,14 @@ async fn sync_single_peer( let push_body = serde_json::json!({ "messages": messages }); - // Best-effort push — don't fail the whole sync if a batch fails - match client.post(&dwn_url).json(&push_body).send().await { - Ok(_) => { - debug!(count = chunk.len(), "Pushed message batch to peer"); + // Best-effort push — don't fail the whole sync if a batch fails. + match PeerRequest::new(fips_npub, onion, "/dwn") + .timeout(std::time::Duration::from_secs(30)) + .send_json(&push_body) + .await + { + Ok((_, t)) => { + debug!(count = chunk.len(), transport = %t, "Pushed message batch to peer"); } Err(e) => { debug!(error = %e, "Failed to push message batch to peer"); diff --git a/core/archipelago/src/node_message.rs b/core/archipelago/src/node_message.rs index d8d5ac74..5f585223 100644 --- a/core/archipelago/src/node_message.rs +++ b/core/archipelago/src/node_message.rs @@ -244,6 +244,7 @@ fn validate_onion(onion: &str) -> Result<()> { /// derived from both nodes' ed25519 keys. pub async fn send_to_peer( onion: &str, + fips_npub: Option<&str>, from_pubkey: &str, message: &str, signing_key: Option<&ed25519_dalek::SigningKey>, @@ -252,13 +253,6 @@ pub async fn send_to_peer( ) -> Result<()> { validate_onion(onion)?; - let host = if onion.ends_with(".onion") { - onion.to_string() - } else { - format!("{}.onion", onion) - }; - let url = format!("http://{}/archipelago/node-message", host); - // Encrypt message if we have both keys let (payload_message, encrypted) = match (signing_key, recipient_pubkey) { (Some(sk), Some(rpk)) => match encrypt_for_peer(sk, rpk, message) { @@ -281,57 +275,46 @@ pub async fn send_to_peer( body["from_name"] = serde_json::Value::String(name.to_string()); } - let proxy = - reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?; - let client = reqwest::Client::builder() - .proxy(proxy) - .timeout(std::time::Duration::from_secs(60)) - .build() - .context("Failed to build HTTP client")?; - - let resp = client.post(&url).json(&body).send().await.map_err(|e| { + let (resp, transport) = crate::fips::dial::PeerRequest::new( + fips_npub, + onion, + "/archipelago/node-message", + ) + .timeout(std::time::Duration::from_secs(60)) + .send_json(&body) + .await + .map_err(|e| { let msg = e.to_string(); if msg.contains("connection refused") || msg.contains("Connection refused") { - anyhow::anyhow!("Tor not reachable at 127.0.0.1:9050. Is Tor running?") + anyhow::anyhow!("Peer unreachable. Check Tor (127.0.0.1:9050) and FIPS daemon status.") } else if msg.contains("timeout") || msg.contains("timed out") { - anyhow::anyhow!( - "Connection timed out. The peer may be offline or unreachable over Tor." - ) + anyhow::anyhow!("Connection timed out. The peer may be offline.") } else { - anyhow::anyhow!("Failed to send over Tor: {}", msg) + anyhow::anyhow!("Failed to send: {}", msg) } })?; if !resp.status().is_success() { anyhow::bail!( - "Peer returned {} {}. The peer may need /archipelago/ in its nginx config.", + "Peer returned {} {} (via {}). The peer may need /archipelago/ in its nginx config.", resp.status().as_u16(), - resp.status().canonical_reason().unwrap_or("") + resp.status().canonical_reason().unwrap_or(""), + transport, ); } Ok(()) } -/// Check if a peer is reachable (ping over Tor). -pub async fn check_peer_reachable(onion: &str) -> Result { +/// Check if a peer is reachable (ping). FIPS is preferred when an npub +/// is known, Tor is the fallback. +pub async fn check_peer_reachable(onion: &str, fips_npub: Option<&str>) -> Result { validate_onion(onion)?; - - let host = if onion.ends_with(".onion") { - onion.to_string() - } else { - format!("{}.onion", onion) - }; - let url = format!("http://{}/health", host); - let proxy = - reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?; - let client = reqwest::Client::builder() - .proxy(proxy) + match crate::fips::dial::PeerRequest::new(fips_npub, onion, "/health") .timeout(std::time::Duration::from_secs(30)) - .build() - .context("Failed to build HTTP client")?; - - match client.get(&url).send().await { - Ok(resp) => Ok(resp.status().is_success()), + .send_get() + .await + { + Ok((resp, _)) => Ok(resp.status().is_success()), Err(_) => Ok(false), } } diff --git a/core/archipelago/src/server.rs b/core/archipelago/src/server.rs index 30f0d489..fe61e093 100644 --- a/core/archipelago/src/server.rs +++ b/core/archipelago/src/server.rs @@ -727,9 +727,11 @@ async fn check_peer_health(state: &StateManager, data_dir: &std::path::Path) -> let mut new_health = std::collections::HashMap::new(); for peer in &known_peers { - let reachable = node_message::check_peer_reachable(&peer.onion) - .await - .unwrap_or(false); + let fips_npub = crate::federation::fips_npub_for_onion(data_dir, &peer.onion).await; + let reachable = + node_message::check_peer_reachable(&peer.onion, fips_npub.as_deref()) + .await + .unwrap_or(false); new_health.insert(peer.onion.clone(), reachable); }