feat(federation): route state-sync / invites / notifications via FIPS first
Every federation peer-to-peer call now prefers FIPS (direct ULA dial over `fips0`, ~LAN latency) and falls back to Tor only on network failure. Per-method ed25519 signatures are preserved on both transports so authenticity doesn't change. - fips::dial::PeerRequest — fluent builder that owns transport selection. Returns the Response plus the TransportKind that carried it, so handlers can log or expose which path was used. - fips::dial::is_service_active — free-standing async probe used by migration sites (the transport::fips::is_available cache is keyed to a `&self`, not usable from static contexts). - federation/sync.rs: sync_with_peer + deploy_to_peer drop the hand-rolled reqwest::Proxy dance, call PeerRequest instead. - federation/invites.rs: notify_join takes the remote's fips_npub (already parsed out of the invite code since v1.4) and dials over FIPS when available. The "peer-joined" signature domain is unchanged. - api/rpc/federation/handlers.rs: DID rotation broadcast loops over federated peers through PeerRequest; the per-peer result payload gains a `transport` field so the UI can surface mesh vs. onion. - api/rpc/tor/mod.rs: onion-address-change propagation is now the most useful FIPS-first call — fips_npub is stable across onion rotation, so peers get the new address even when the old onion is already dead. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6,7 +6,7 @@ use crate::identity;
|
||||
use crate::mesh;
|
||||
use crate::network::dwn_store::DwnStore;
|
||||
use crate::nostr_handshake;
|
||||
use anyhow::{Context, Result};
|
||||
use anyhow::Result;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
const FEDERATION_PROTOCOL: &str = "https://archipelago.dev/protocols/federation/v1";
|
||||
@@ -630,14 +630,6 @@ impl RpcHandler {
|
||||
|
||||
let nodes = federation::load_nodes(&self.config.data_dir).await?;
|
||||
|
||||
let proxy =
|
||||
reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?;
|
||||
let client = reqwest::Client::builder()
|
||||
.proxy(proxy)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.build()
|
||||
.context("Failed to build HTTP client")?;
|
||||
|
||||
let mut notified = 0u32;
|
||||
let mut failed = 0u32;
|
||||
let mut results = Vec::new();
|
||||
@@ -648,13 +640,6 @@ impl RpcHandler {
|
||||
continue;
|
||||
}
|
||||
|
||||
let host = if node.onion.ends_with(".onion") {
|
||||
node.onion.clone()
|
||||
} else {
|
||||
format!("{}.onion", node.onion)
|
||||
};
|
||||
let url = format!("http://{}/rpc/v1", host);
|
||||
|
||||
let body = serde_json::json!({
|
||||
"method": "federation.peer-did-changed",
|
||||
"params": {
|
||||
@@ -666,23 +651,31 @@ impl RpcHandler {
|
||||
}
|
||||
});
|
||||
|
||||
match client.post(&url).json(&body).send().await {
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
let req = crate::fips::dial::PeerRequest::new(
|
||||
node.fips_npub.as_deref(),
|
||||
&node.onion,
|
||||
"/rpc/v1",
|
||||
)
|
||||
.timeout(std::time::Duration::from_secs(30));
|
||||
|
||||
match req.send_json(&body).await {
|
||||
Ok((resp, transport)) if resp.status().is_success() => {
|
||||
notified += 1;
|
||||
results.push(serde_json::json!({
|
||||
"did": node.did,
|
||||
"status": "ok",
|
||||
"transport": transport.to_string(),
|
||||
}));
|
||||
info!(peer_did = %node.did, "Notified peer of DID rotation");
|
||||
info!(peer_did = %node.did, transport = %transport, "Notified peer of DID rotation");
|
||||
}
|
||||
Ok(resp) => {
|
||||
Ok((resp, transport)) => {
|
||||
failed += 1;
|
||||
results.push(serde_json::json!({
|
||||
"did": node.did,
|
||||
"status": "error",
|
||||
"error": format!("Peer returned {}", resp.status()),
|
||||
"error": format!("Peer returned {} (via {})", resp.status(), transport),
|
||||
}));
|
||||
warn!(peer_did = %node.did, status = %resp.status(), "Peer rejected DID rotation notification");
|
||||
warn!(peer_did = %node.did, status = %resp.status(), transport = %transport, "Peer rejected DID rotation notification");
|
||||
}
|
||||
Err(e) => {
|
||||
failed += 1;
|
||||
|
||||
@@ -417,11 +417,13 @@ pub(super) async fn notify_federation_peers_address_change(
|
||||
return;
|
||||
}
|
||||
};
|
||||
let proxy = tor_proxy.unwrap_or("127.0.0.1:9050");
|
||||
// `tor_proxy` is retained for API compat but unused — the FIPS
|
||||
// fallback dial uses constants::TOR_SOCKS_PROXY internally.
|
||||
let _ = tor_proxy;
|
||||
match federation::load_nodes(data_dir).await {
|
||||
Ok(peers) => {
|
||||
for peer in peers {
|
||||
if peer.onion.is_empty() {
|
||||
if peer.onion.is_empty() && peer.fips_npub.is_none() {
|
||||
continue;
|
||||
}
|
||||
let payload = serde_json::json!({
|
||||
@@ -432,24 +434,19 @@ pub(super) async fn notify_federation_peers_address_change(
|
||||
"old_onion": old_onion,
|
||||
}
|
||||
});
|
||||
let url = format!("http://{}/rpc/v1", &peer.onion);
|
||||
let client = match reqwest::Client::builder()
|
||||
.proxy(
|
||||
match reqwest::Proxy::all(format!("socks5h://{}", proxy)).or_else(
|
||||
|_| reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY),
|
||||
) {
|
||||
Ok(p) => p,
|
||||
Err(_) => continue,
|
||||
},
|
||||
)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.build()
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(_) => continue,
|
||||
};
|
||||
match client.post(&url).json(&payload).send().await {
|
||||
Ok(_) => info!(peer_did = %peer.did, "Notified peer of address change"),
|
||||
// FIPS-preferred: peer's fips_npub is stable across
|
||||
// onion rotation, so this notification reaches them
|
||||
// even when their (or our) old onion is now stale.
|
||||
let req = crate::fips::dial::PeerRequest::new(
|
||||
peer.fips_npub.as_deref(),
|
||||
&peer.onion,
|
||||
"/rpc/v1",
|
||||
)
|
||||
.timeout(std::time::Duration::from_secs(30));
|
||||
match req.send_json(&payload).await {
|
||||
Ok((_, transport)) => {
|
||||
info!(peer_did = %peer.did, transport = %transport, "Notified peer of address change")
|
||||
}
|
||||
Err(e) => warn!(peer_did = %peer.did, "Failed to notify peer: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,9 +180,10 @@ pub async fn accept_invite(
|
||||
});
|
||||
save_invites(data_dir, &invites).await?;
|
||||
|
||||
// Notify remote node (best-effort over Tor)
|
||||
// Notify remote node (best-effort, FIPS-first → Tor fallback)
|
||||
let _ = notify_join(
|
||||
&node.onion,
|
||||
node.fips_npub.as_deref(),
|
||||
local_did,
|
||||
local_onion,
|
||||
local_pubkey,
|
||||
@@ -195,22 +196,18 @@ pub async fn accept_invite(
|
||||
}
|
||||
|
||||
/// Best-effort notification to the remote node that we joined their federation.
|
||||
/// Signs the message with our ed25519 key so the remote peer can verify authenticity.
|
||||
/// Prefers FIPS (if the remote advertised an npub in their invite) and
|
||||
/// falls back to Tor. Signs the message with our ed25519 key so the
|
||||
/// remote peer can verify authenticity regardless of transport.
|
||||
async fn notify_join(
|
||||
remote_onion: &str,
|
||||
remote_fips_npub: Option<&str>,
|
||||
local_did: &str,
|
||||
local_onion: &str,
|
||||
local_pubkey: &str,
|
||||
local_fips_npub: Option<&str>,
|
||||
sign_fn: impl FnOnce(&[u8]) -> String,
|
||||
) -> Result<()> {
|
||||
let host = if remote_onion.ends_with(".onion") {
|
||||
remote_onion.to_string()
|
||||
} else {
|
||||
format!("{}.onion", remote_onion)
|
||||
};
|
||||
let url = format!("http://{}/rpc/v1", host);
|
||||
|
||||
// Sign the canonical message: "peer-joined:{did}:{onion}:{pubkey}"
|
||||
// Signature domain intentionally unchanged — fips_npub is carried
|
||||
// as an unsigned informational field. The FIPS daemon's own Noise
|
||||
@@ -234,15 +231,10 @@ async fn notify_join(
|
||||
"params": params,
|
||||
});
|
||||
|
||||
let proxy =
|
||||
reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?;
|
||||
let client = reqwest::Client::builder()
|
||||
.proxy(proxy)
|
||||
let _ = crate::fips::dial::PeerRequest::new(remote_fips_npub, remote_onion, "/rpc/v1")
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.build()
|
||||
.context("Failed to build HTTP client")?;
|
||||
|
||||
let _ = client.post(&url).json(&body).send().await;
|
||||
.send_json(&body)
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1,26 +1,24 @@
|
||||
//! Federation state sync and remote deployment.
|
||||
//!
|
||||
//! Requests prefer FIPS (direct ULA dial, ~LAN latency) and fall back to
|
||||
//! Tor on any network failure. See `crate::fips::dial::PeerRequest` for
|
||||
//! the fallback mechanics.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use std::path::Path;
|
||||
|
||||
use super::storage::update_node_state;
|
||||
use super::types::{AppStatus, FederatedNode, NodeStateSnapshot, TrustLevel};
|
||||
use crate::fips::dial::PeerRequest;
|
||||
|
||||
/// Sync state with a single federated peer over Tor.
|
||||
/// Sync state with a single federated peer. Tries FIPS first; falls back
|
||||
/// to Tor on any transport-level failure.
|
||||
pub async fn sync_with_peer(
|
||||
data_dir: &Path,
|
||||
peer: &FederatedNode,
|
||||
local_did: &str,
|
||||
sign_fn: impl FnOnce(&[u8]) -> String,
|
||||
) -> Result<NodeStateSnapshot> {
|
||||
let host = if peer.onion.ends_with(".onion") {
|
||||
peer.onion.clone()
|
||||
} else {
|
||||
format!("{}.onion", peer.onion)
|
||||
};
|
||||
let url = format!("http://{}/rpc/v1", host);
|
||||
|
||||
// Sign current timestamp for authentication
|
||||
let timestamp = chrono::Utc::now().to_rfc3339();
|
||||
let signature = sign_fn(timestamp.as_bytes());
|
||||
|
||||
@@ -29,26 +27,17 @@ pub async fn sync_with_peer(
|
||||
"params": {}
|
||||
});
|
||||
|
||||
let proxy =
|
||||
reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?;
|
||||
let client = reqwest::Client::builder()
|
||||
.proxy(proxy)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.build()
|
||||
.context("Failed to build HTTP client")?;
|
||||
|
||||
let resp = client
|
||||
.post(&url)
|
||||
let (resp, transport) = PeerRequest::new(peer.fips_npub.as_deref(), &peer.onion, "/rpc/v1")
|
||||
.header("X-Federation-DID", local_did)
|
||||
.header("X-Federation-Sig", &signature)
|
||||
.header("X-Federation-Timestamp", ×tamp)
|
||||
.json(&body)
|
||||
.send()
|
||||
.header("X-Federation-Sig", signature)
|
||||
.header("X-Federation-Timestamp", timestamp)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.send_json(&body)
|
||||
.await
|
||||
.context("Failed to reach federated peer")?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("Peer returned {}", resp.status());
|
||||
anyhow::bail!("Peer returned {} (via {})", resp.status(), transport);
|
||||
}
|
||||
|
||||
let result: serde_json::Value = resp.json().await.context("Invalid response from peer")?;
|
||||
@@ -109,13 +98,6 @@ pub async fn deploy_to_peer(
|
||||
);
|
||||
}
|
||||
|
||||
let host = if peer.onion.ends_with(".onion") {
|
||||
peer.onion.clone()
|
||||
} else {
|
||||
format!("{}.onion", peer.onion)
|
||||
};
|
||||
let url = format!("http://{}/rpc/v1", host);
|
||||
|
||||
let timestamp = chrono::Utc::now().to_rfc3339();
|
||||
let signature = sign_fn(timestamp.as_bytes());
|
||||
|
||||
@@ -128,26 +110,17 @@ pub async fn deploy_to_peer(
|
||||
}
|
||||
});
|
||||
|
||||
let proxy =
|
||||
reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY).context("Invalid Tor proxy")?;
|
||||
let client = reqwest::Client::builder()
|
||||
.proxy(proxy)
|
||||
.timeout(std::time::Duration::from_secs(120))
|
||||
.build()
|
||||
.context("Failed to build HTTP client")?;
|
||||
|
||||
let resp = client
|
||||
.post(&url)
|
||||
let (resp, transport) = PeerRequest::new(peer.fips_npub.as_deref(), &peer.onion, "/rpc/v1")
|
||||
.header("X-Federation-DID", local_did)
|
||||
.header("X-Federation-Sig", &signature)
|
||||
.header("X-Federation-Timestamp", ×tamp)
|
||||
.json(&body)
|
||||
.send()
|
||||
.header("X-Federation-Sig", signature)
|
||||
.header("X-Federation-Timestamp", timestamp)
|
||||
.timeout(std::time::Duration::from_secs(120))
|
||||
.send_json(&body)
|
||||
.await
|
||||
.context("Failed to reach federated peer for deploy")?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("Remote node returned HTTP {}", resp.status());
|
||||
anyhow::bail!("Remote node returned HTTP {} (via {})", resp.status(), transport);
|
||||
}
|
||||
|
||||
let result: serde_json::Value = resp.json().await.context("Invalid response from peer")?;
|
||||
|
||||
@@ -209,6 +209,194 @@ pub fn as_ip_addr(v6: Ipv6Addr) -> IpAddr {
|
||||
IpAddr::V6(v6)
|
||||
}
|
||||
|
||||
// ── High-level peer request helpers ────────────────────────────────────
|
||||
|
||||
/// Quick poll: is the FIPS daemon (archipelago-supervised OR upstream)
|
||||
/// currently `systemctl is-active`? Async wrapper intended for the
|
||||
/// migration call sites; unlike `FipsTransport::is_available` this does
|
||||
/// not maintain a cache, so callers that poll frequently should cache
|
||||
/// themselves.
|
||||
pub async fn is_service_active() -> bool {
|
||||
for unit in [
|
||||
crate::fips::SERVICE_UNIT,
|
||||
crate::fips::UPSTREAM_SERVICE_UNIT,
|
||||
] {
|
||||
if crate::fips::service::unit_state(unit).await == "active" {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Builder for a peer request that may be sent over FIPS (preferred) or
|
||||
/// Tor (fallback). The call sites migrating off direct-Tor dialing build
|
||||
/// one of these and call [`send_json`] / [`send_get`]; the helper handles
|
||||
/// dial, timeout, fallback, and cross-transport auth headers.
|
||||
pub struct PeerRequest<'a> {
|
||||
pub fips_npub: Option<&'a str>,
|
||||
pub onion_host: &'a str,
|
||||
pub path: &'a str,
|
||||
pub headers: Vec<(&'a str, String)>,
|
||||
pub timeout: std::time::Duration,
|
||||
}
|
||||
|
||||
impl<'a> PeerRequest<'a> {
|
||||
pub fn new(
|
||||
fips_npub: Option<&'a str>,
|
||||
onion_host: &'a str,
|
||||
path: &'a str,
|
||||
) -> Self {
|
||||
Self {
|
||||
fips_npub,
|
||||
onion_host,
|
||||
path,
|
||||
headers: Vec::new(),
|
||||
timeout: std::time::Duration::from_secs(30),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn header(mut self, name: &'a str, value: impl Into<String>) -> Self {
|
||||
self.headers.push((name, value.into()));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn timeout(mut self, t: std::time::Duration) -> Self {
|
||||
self.timeout = t;
|
||||
self
|
||||
}
|
||||
|
||||
/// POST a JSON body. Returns the `reqwest::Response` — caller decides
|
||||
/// how to interpret the status code.
|
||||
pub async fn send_json<B: serde::Serialize>(
|
||||
&self,
|
||||
body: &B,
|
||||
) -> Result<(reqwest::Response, crate::transport::TransportKind)> {
|
||||
if let Some(resp) = self.try_fips_post_json(body).await? {
|
||||
return Ok((resp, crate::transport::TransportKind::Fips));
|
||||
}
|
||||
let resp = self.send_tor_post_json(body).await?;
|
||||
Ok((resp, crate::transport::TransportKind::Tor))
|
||||
}
|
||||
|
||||
/// GET with optional header-based auth.
|
||||
pub async fn send_get(
|
||||
&self,
|
||||
) -> Result<(reqwest::Response, crate::transport::TransportKind)> {
|
||||
if let Some(resp) = self.try_fips_get().await? {
|
||||
return Ok((resp, crate::transport::TransportKind::Fips));
|
||||
}
|
||||
let resp = self.send_tor_get().await?;
|
||||
Ok((resp, crate::transport::TransportKind::Tor))
|
||||
}
|
||||
|
||||
async fn try_fips_post_json<B: serde::Serialize>(
|
||||
&self,
|
||||
body: &B,
|
||||
) -> Result<Option<reqwest::Response>> {
|
||||
let Some(npub) = self.fips_npub else {
|
||||
return Ok(None);
|
||||
};
|
||||
if !is_service_active().await {
|
||||
return Ok(None);
|
||||
}
|
||||
let base = match peer_base_url(npub).await {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
tracing::debug!("FIPS resolve for {} failed: {}", npub, e);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
let url = format!("{}{}", base, self.path);
|
||||
let c = client();
|
||||
let mut rb = c.post(&url).json(body);
|
||||
for (k, v) in &self.headers {
|
||||
rb = rb.header(*k, v);
|
||||
}
|
||||
match rb.send().await {
|
||||
Ok(r) => Ok(Some(r)),
|
||||
Err(e) => {
|
||||
tracing::debug!("FIPS POST {} failed: {}, falling back to Tor", url, e);
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_fips_get(&self) -> Result<Option<reqwest::Response>> {
|
||||
let Some(npub) = self.fips_npub else {
|
||||
return Ok(None);
|
||||
};
|
||||
if !is_service_active().await {
|
||||
return Ok(None);
|
||||
}
|
||||
let base = match peer_base_url(npub).await {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
tracing::debug!("FIPS resolve for {} failed: {}", npub, e);
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
let url = format!("{}{}", base, self.path);
|
||||
let c = client();
|
||||
let mut rb = c.get(&url);
|
||||
for (k, v) in &self.headers {
|
||||
rb = rb.header(*k, v);
|
||||
}
|
||||
match rb.send().await {
|
||||
Ok(r) => Ok(Some(r)),
|
||||
Err(e) => {
|
||||
tracing::debug!("FIPS GET {} failed: {}, falling back to Tor", url, e);
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_tor_post_json<B: serde::Serialize>(
|
||||
&self,
|
||||
body: &B,
|
||||
) -> Result<reqwest::Response> {
|
||||
let url = self.tor_url();
|
||||
let client = self.tor_client()?;
|
||||
let mut rb = client.post(&url).json(body);
|
||||
for (k, v) in &self.headers {
|
||||
rb = rb.header(*k, v);
|
||||
}
|
||||
rb.send()
|
||||
.await
|
||||
.with_context(|| format!("Tor POST {}", url))
|
||||
}
|
||||
|
||||
async fn send_tor_get(&self) -> Result<reqwest::Response> {
|
||||
let url = self.tor_url();
|
||||
let client = self.tor_client()?;
|
||||
let mut rb = client.get(&url);
|
||||
for (k, v) in &self.headers {
|
||||
rb = rb.header(*k, v);
|
||||
}
|
||||
rb.send()
|
||||
.await
|
||||
.with_context(|| format!("Tor GET {}", url))
|
||||
}
|
||||
|
||||
fn tor_url(&self) -> String {
|
||||
let host = if self.onion_host.ends_with(".onion") {
|
||||
self.onion_host.to_string()
|
||||
} else {
|
||||
format!("{}.onion", self.onion_host)
|
||||
};
|
||||
format!("http://{}{}", host, self.path)
|
||||
}
|
||||
|
||||
fn tor_client(&self) -> Result<reqwest::Client> {
|
||||
let proxy = reqwest::Proxy::all(crate::constants::TOR_SOCKS_PROXY)
|
||||
.context("Invalid Tor SOCKS proxy URL")?;
|
||||
reqwest::Client::builder()
|
||||
.proxy(proxy)
|
||||
.timeout(self.timeout)
|
||||
.build()
|
||||
.context("Build Tor HTTP client")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
Reference in New Issue
Block a user