refactor: split package.rs, mod.rs, listener.rs, and lnd.rs into focused submodules

- R35: Split package.rs (1794 lines) into package/{mod,config,validation,lifecycle}.rs
- R36: Split mesh/listener.rs (1799 lines) into listener/{mod,session,frames,decode,dispatch,bitcoin}.rs
- R37: Split rpc/mod.rs into mod.rs + dispatcher.rs, middleware.rs, response.rs (54% reduction)
- R38: Split lnd.rs (1064 lines) into lnd/{mod,info,channels,wallet,payments}.rs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-03-21 02:26:28 +00:00
parent 8e4d352393
commit 77f550fb5e
21 changed files with 4780 additions and 4374 deletions

View File

@@ -0,0 +1,395 @@
use super::RpcHandler;
use anyhow::Result;
impl RpcHandler {
/// Route an RPC method name to its handler, returning the result value.
pub(super) async fn dispatch(
&self,
method: &str,
params: Option<serde_json::Value>,
session_token: &Option<String>,
) -> Result<serde_json::Value> {
match method {
"echo" => self.handle_echo(params).await,
"server.echo" => self.handle_echo(params).await,
"health" => self.handle_health().await,
"auth.login" => self.handle_auth_login(params).await,
"auth.logout" => self.handle_auth_logout().await,
"auth.changePassword" => self.handle_auth_change_password(params, session_token).await,
"auth.onboardingComplete" => self.handle_auth_onboarding_complete().await,
"auth.isOnboardingComplete" => self.handle_auth_is_onboarding_complete().await,
"auth.resetOnboarding" => self.handle_auth_reset_onboarding(params).await,
// Container orchestration (for Archipelago-managed containers)
"container-install" => self.handle_container_install(params).await,
"container-start" => self.handle_container_start(params).await,
"container-stop" => self.handle_container_stop(params).await,
"container-remove" => self.handle_container_remove(params).await,
"container-list" => self.handle_container_list().await,
"container-status" => self.handle_container_status(params).await,
"container-logs" => self.handle_container_logs(params).await,
"container-health" => self.handle_container_health(params).await,
// Package management (for docker-compose apps)
"package.install" => self.handle_package_install(params).await,
"package.start" => self.handle_package_start(params).await,
"package.stop" => self.handle_package_stop(params).await,
"package.restart" => self.handle_package_restart(params).await,
"package.uninstall" => self.handle_package_uninstall(params).await,
// Bundled app management (for pre-loaded container images)
"bundled-app-start" => self.handle_bundled_app_start(params).await,
"bundled-app-stop" => self.handle_bundled_app_stop(params).await,
// Node identity and P2P peers
"node-add-peer" => self.handle_node_add_peer(params).await,
"node-list-peers" => self.handle_node_list_peers().await,
"node-remove-peer" => self.handle_node_remove_peer(params).await,
"node-send-message" => self.handle_node_send_message(params).await,
"node-check-peer" => self.handle_node_check_peer(params).await,
"node-messages-received" => self.handle_node_messages_received().await,
"node-store-sent" => self.handle_node_store_sent(params).await,
"node-nostr-discover" => self.handle_node_nostr_discover().await,
"node.did" => self.handle_node_did().await,
"node.signChallenge" => self.handle_node_sign_challenge(params).await,
"node.createBackup" => self.handle_node_create_backup(params).await,
"node.tor-address" => self.handle_node_tor_address().await,
"node.nostr-publish" => self.handle_node_nostr_publish().await,
"node.nostr-pubkey" => self.handle_node_nostr_pubkey().await,
"node.nostr-sign" => self.handle_node_nostr_sign(params).await,
"node-nostr-verify-revoked" => self.handle_node_nostr_verify_revoked().await,
"node.rotate-did" => self.handle_node_rotate_did(params).await,
// Encrypted peer handshake (NIP-44)
"handshake.discover" => self.handle_handshake_discover().await,
"handshake.connect" => self.handle_handshake_connect(params).await,
"handshake.poll" => self.handle_handshake_poll().await,
// TOTP 2FA
"auth.totp.setup.begin" => self.handle_totp_setup_begin(params).await,
"auth.totp.setup.confirm" => self.handle_totp_setup_confirm(params).await,
"auth.totp.disable" => self.handle_totp_disable(params).await,
"auth.totp.status" => self.handle_totp_status().await,
"auth.login.totp" => self.handle_login_totp(params, session_token).await,
"auth.login.backup" => self.handle_login_backup(params, session_token).await,
// Bitcoin & Lightning deep data
"bitcoin.getinfo" => self.handle_bitcoin_getinfo().await,
"lnd.getinfo" => self.handle_lnd_getinfo().await,
"lnd.listchannels" => self.handle_lnd_listchannels().await,
"lnd.openchannel" => self.handle_lnd_openchannel(params).await,
"lnd.closechannel" => self.handle_lnd_closechannel(params).await,
"lnd.newaddress" => self.handle_lnd_newaddress().await,
"lnd.sendcoins" => self.handle_lnd_sendcoins(params).await,
"lnd.createinvoice" => self.handle_lnd_createinvoice(params).await,
"lnd.payinvoice" => self.handle_lnd_payinvoice(params).await,
"lnd.create-psbt" => self.handle_lnd_create_psbt(params).await,
"lnd.finalize-psbt" => self.handle_lnd_finalize_psbt(params).await,
"lnd.create-raw-tx" => self.handle_lnd_create_raw_tx(params).await,
"lnd.gettransactions" => self.handle_lnd_gettransactions().await,
"lnd.connect-info" => self.handle_lnd_connect_info().await,
"lnd.export-channel-backup" => self.handle_lnd_export_channel_backup().await,
// Multi-identity management
"identity.list" => self.handle_identity_list(params).await,
"identity.create" => self.handle_identity_create(params).await,
"identity.get" => self.handle_identity_get(params).await,
"identity.delete" => self.handle_identity_delete(params).await,
"identity.set-default" => self.handle_identity_set_default(params).await,
"identity.sign" => self.handle_identity_sign(params).await,
"identity.verify" => self.handle_identity_verify(params).await,
"identity.resolve-did" => self.handle_identity_resolve_did(params).await,
"identity.resolve-remote-did" => self.handle_identity_resolve_remote_did(params).await,
"identity.verify-did-document" => self.handle_identity_verify_did_document(params).await,
"identity.create-dht-did" => self.handle_identity_create_dht_did(params).await,
"identity.resolve-dht-did" => self.handle_identity_resolve_dht_did(params).await,
"identity.refresh-dht-did" => self.handle_identity_refresh_dht_did(params).await,
"identity.dht-status" => self.handle_identity_dht_status(params).await,
"identity.update-profile" => self.handle_identity_update_profile(params).await,
"identity.publish-profile" => self.handle_identity_publish_profile(params).await,
"identity.export-keys" => self.handle_identity_export_keys(params).await,
"identity.create-nostr-key" => self.handle_identity_create_nostr_key(params).await,
"identity.nostr-sign" => self.handle_identity_nostr_sign(params).await,
"identity.nostr-encrypt-nip04" => self.handle_identity_nostr_encrypt_nip04(params).await,
"identity.nostr-decrypt-nip04" => self.handle_identity_nostr_decrypt_nip04(params).await,
"identity.nostr-encrypt-nip44" => self.handle_identity_nostr_encrypt_nip44(params).await,
"identity.nostr-decrypt-nip44" => self.handle_identity_nostr_decrypt_nip44(params).await,
// Bitcoin domain names (NIP-05)
"identity.register-name" => self.handle_identity_register_name(params).await,
"identity.remove-name" => self.handle_identity_remove_name(params).await,
"identity.resolve-name" => self.handle_identity_resolve_name(params).await,
"identity.list-names" => self.handle_identity_list_names(params).await,
"identity.link-name" => self.handle_identity_link_name(params).await,
// Verifiable Credentials
"identity.issue-credential" => self.handle_identity_issue_credential(params).await,
"identity.verify-credential" => self.handle_identity_verify_credential(params).await,
"identity.list-credentials" => self.handle_identity_list_credentials(params).await,
"identity.revoke-credential" => self.handle_identity_revoke_credential(params).await,
"identity.create-presentation" => self.handle_identity_create_presentation(params).await,
"identity.verify-presentation" => self.handle_identity_verify_presentation(params).await,
// Network overlay
"network.get-visibility" => self.handle_network_get_visibility().await,
"network.set-visibility" => self.handle_network_set_visibility(params).await,
"network.request-connection" => self.handle_network_request_connection(params).await,
"network.list-requests" => self.handle_network_list_requests().await,
"network.accept-request" => self.handle_network_accept_request(params).await,
"network.reject-request" => self.handle_network_reject_request(params).await,
// Tor hidden services
"tor.list-services" => self.handle_tor_list_services().await,
"tor.create-service" => self.handle_tor_create_service(params).await,
"tor.delete-service" => self.handle_tor_delete_service(params).await,
"tor.get-onion-address" => self.handle_tor_get_onion_address(params).await,
"tor.rotate-service" => self.handle_tor_rotate_service(params).await,
"tor.cleanup-rotated" => self.handle_tor_cleanup_rotated().await,
"tor.toggle-app" => self.handle_tor_toggle_app(params).await,
"tor.restart" => self.handle_tor_restart().await,
// Nostr relay management
"nostr.list-relays" => self.handle_nostr_list_relays().await,
"nostr.add-relay" => self.handle_nostr_add_relay(params).await,
"nostr.remove-relay" => self.handle_nostr_remove_relay(params).await,
"nostr.toggle-relay" => self.handle_nostr_toggle_relay(params).await,
"nostr.get-stats" => self.handle_nostr_get_stats().await,
// Router / UPnP
"router.discover" => self.handle_router_discover().await,
"router.list-forwards" => self.handle_router_list_forwards().await,
"router.add-forward" => self.handle_router_add_forward(params).await,
"router.remove-forward" => self.handle_router_remove_forward(params).await,
"network.diagnostics" => self.handle_network_diagnostics().await,
"network.list-interfaces" => self.handle_network_list_interfaces().await,
"network.scan-wifi" => self.handle_network_scan_wifi().await,
"network.configure-wifi" => self.handle_network_configure_wifi(params).await,
"network.configure-ethernet" => self.handle_network_configure_ethernet(params).await,
"network.dns-status" => self.handle_network_dns_status().await,
"network.configure-dns" => self.handle_network_configure_dns(params).await,
"router.detect" => self.handle_router_detect(params).await,
"router.info" => self.handle_router_info().await,
"router.configure" => self.handle_router_configure(params).await,
// Ecash wallet
"wallet.ecash-balance" => self.handle_wallet_ecash_balance().await,
"wallet.ecash-mint" => self.handle_wallet_ecash_mint(params).await,
"wallet.ecash-melt" => self.handle_wallet_ecash_melt(params).await,
"wallet.ecash-send" => self.handle_wallet_ecash_send(params).await,
"wallet.ecash-receive" => self.handle_wallet_ecash_receive(params).await,
"wallet.ecash-history" => self.handle_wallet_ecash_history().await,
"wallet.networking-profits" => self.handle_wallet_networking_profits().await,
// Content catalog management
"content.list-mine" => self.handle_content_list_mine().await,
"content.add" => self.handle_content_add(params).await,
"content.remove" => self.handle_content_remove(params).await,
"content.set-pricing" => self.handle_content_set_pricing(params).await,
"content.set-availability" => self.handle_content_set_availability(params).await,
"content.browse-peer" => self.handle_content_browse_peer(params).await,
"content.download-peer" => self.handle_content_download_peer(params).await,
// DWN (Decentralized Web Node)
"dwn.status" => self.handle_dwn_status().await,
"dwn.sync" => self.handle_dwn_sync().await,
"dwn.register-protocol" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_register_protocol(&p).await
}
"dwn.list-protocols" => self.handle_dwn_list_protocols().await,
"dwn.remove-protocol" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_remove_protocol(&p).await
}
"dwn.query-messages" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_query_messages(&p).await
}
"dwn.write-message" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_dwn_write_message(&p).await
}
// Federation
"federation.invite" => self.handle_federation_invite().await,
"federation.join" => self.handle_federation_join(params).await,
"federation.list-nodes" => self.handle_federation_list_nodes().await,
"federation.remove-node" => self.handle_federation_remove_node(params).await,
"federation.set-trust" => self.handle_federation_set_trust(params).await,
"federation.sync-state" => self.handle_federation_sync_state().await,
"federation.get-state" => self.handle_federation_get_state().await,
"federation.peer-joined" => self.handle_federation_peer_joined(params).await,
"federation.deploy-app" => self.handle_federation_deploy_app(params).await,
"federation.peer-address-changed" => self.handle_federation_peer_address_changed(params).await,
"federation.notify-did-change" => self.handle_federation_notify_did_change(params).await,
"federation.peer-did-changed" => self.handle_federation_peer_did_changed(params).await,
// VPN & Remote Access
"vpn.status" => self.handle_vpn_status().await,
"vpn.configure" => self.handle_vpn_configure(params).await,
"vpn.disconnect" => self.handle_vpn_disconnect().await,
"remote.setup" => self.handle_remote_setup(params).await,
// Marketplace
"marketplace.discover" => self.handle_marketplace_discover().await,
"marketplace.publish" => self.handle_marketplace_publish(params).await,
"marketplace.get-manifest" => self.handle_marketplace_get_manifest(params).await,
"marketplace.list-published" => self.handle_marketplace_list_published().await,
"marketplace.verify" => self.handle_marketplace_verify(params).await,
"marketplace.create-invoice" => self.handle_marketplace_create_invoice(params).await,
"marketplace.check-payment" => self.handle_marketplace_check_payment(params).await,
// Mesh networking (Meshcore LoRa)
"mesh.status" => self.handle_mesh_status().await,
"mesh.peers" => self.handle_mesh_peers().await,
"mesh.messages" => self.handle_mesh_messages(params).await,
"mesh.send" => self.handle_mesh_send(params).await,
"mesh.broadcast" => self.handle_mesh_broadcast().await,
"mesh.configure" => self.handle_mesh_configure(params).await,
"mesh.send-invoice" => self.handle_mesh_send_invoice(params).await,
"mesh.send-coordinate" => self.handle_mesh_send_coordinate(params).await,
"mesh.send-alert" => self.handle_mesh_send_alert(params).await,
"mesh.outbox" => self.handle_mesh_outbox(params).await,
"mesh.session-status" => self.handle_mesh_session_status(params).await,
"mesh.rotate-prekeys" => self.handle_mesh_rotate_prekeys().await,
// Phase 4: Off-grid Bitcoin operations
"mesh.relay-tx" => self.handle_mesh_relay_tx(params).await,
"mesh.relay-status" => self.handle_mesh_relay_status(params).await,
"mesh.block-headers" => self.handle_mesh_block_headers(params).await,
"mesh.relay-lightning" => self.handle_mesh_relay_lightning(params).await,
"mesh.deadman-status" => self.handle_mesh_deadman_status().await,
"mesh.deadman-configure" => self.handle_mesh_deadman_configure(params).await,
"mesh.deadman-checkin" => self.handle_mesh_deadman_checkin().await,
"mesh.test-send" => self.handle_mesh_test_send(params).await,
// Transport layer (unified routing)
"transport.status" => self.handle_transport_status().await,
"transport.peers" => self.handle_transport_peers().await,
"transport.send" => self.handle_transport_send(params).await,
"transport.set-mode" => self.handle_transport_set_mode(params).await,
// Server settings
"server.set-name" => self.handle_server_set_name(params).await,
// System monitoring
"system.stats" => self.handle_system_stats().await,
"system.processes" => self.handle_system_processes().await,
"system.temperature" => self.handle_system_temperature().await,
"system.detect-usb-devices" => self.handle_system_detect_usb_devices().await,
"system.disk-status" => self.handle_system_disk_status().await,
"system.disk-cleanup" => self.handle_system_disk_cleanup().await,
"system.reboot" => self.handle_system_reboot(params).await,
"system.factory-reset" => self.handle_system_factory_reset(params).await,
// Opt-in anonymous analytics
"analytics.get-status" => self.handle_analytics_get_status().await,
"analytics.enable" => self.handle_analytics_enable().await,
"analytics.disable" => self.handle_analytics_disable().await,
"analytics.get-snapshot" => self.handle_analytics_get_snapshot().await,
"telemetry.report" => self.handle_telemetry_report().await,
"telemetry.ingest" => self.handle_telemetry_ingest(params).await,
"telemetry.fleet-status" => self.handle_telemetry_fleet_status().await,
"telemetry.fleet-node-history" => self.handle_telemetry_fleet_node_history(params).await,
"telemetry.fleet-alerts" => self.handle_telemetry_fleet_alerts().await,
// Real-time metrics monitoring
"monitoring.current" => self.handle_monitoring_current().await,
"monitoring.history" => self.handle_monitoring_history(params).await,
"monitoring.containers" => self.handle_monitoring_containers().await,
"monitoring.alerts" => self.handle_monitoring_alerts(params).await,
"monitoring.alert-rules" => self.handle_monitoring_alert_rules().await,
"monitoring.configure-alert" => self.handle_monitoring_configure_alert(params).await,
"monitoring.acknowledge-alert" => self.handle_monitoring_acknowledge_alert(params).await,
"monitoring.export" => self.handle_monitoring_export(params).await,
// System updates
"update.check" => self.handle_update_check().await,
"update.status" => self.handle_update_status().await,
"update.dismiss" => self.handle_update_dismiss().await,
"update.download" => self.handle_update_download().await,
"update.apply" => self.handle_update_apply().await,
"update.rollback" => self.handle_update_rollback().await,
"update.get-schedule" => self.handle_update_get_schedule().await,
"update.set-schedule" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_update_set_schedule(&p).await
}
// Backup & Restore
"backup.create" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_create(&p).await
}
"backup.list" => self.handle_backup_list().await,
"backup.verify" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_verify(&p).await
}
"backup.restore" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_restore(&p).await
}
"backup.restore-identity" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_restore_identity(&p).await
}
"backup.delete" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_delete(&p).await
}
"backup.list-drives" => self.handle_backup_list_drives().await,
"backup.to-usb" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_to_usb(&p).await
}
"backup.upload-s3" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_upload_s3(&p).await
}
"backup.download-s3" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_backup_download_s3(&p).await
}
// Security / secrets
"security.rotate-secrets" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_security_rotate_secrets(&p).await
}
"security.list-expiring" => {
let p = params.unwrap_or(serde_json::json!({}));
self.handle_security_list_expiring(&p).await
}
// Webhooks
"webhook.get-config" => self.handle_webhook_get_config().await,
"webhook.configure" => self.handle_webhook_configure(params).await,
"webhook.test" => self.handle_webhook_test().await,
_ => {
Err(anyhow::anyhow!("Unknown method: {}", method))
}
}
}
pub(super) async fn handle_echo(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
if let Some(p) = params {
if let Some(msg) = p.get("message").and_then(|v| v.as_str()) {
return Ok(serde_json::json!({ "message": msg }));
}
}
Ok(serde_json::json!({ "message": "Hello from Archipelago!" }))
}
pub(super) async fn handle_health(&self) -> Result<serde_json::Value> {
let recovery_complete = crate::crash_recovery::is_recovery_complete();
let uptime = crate::crash_recovery::uptime_seconds();
let status = if recovery_complete { "ok" } else { "degraded" };
Ok(serde_json::json!({
"status": status,
"crash_recovery_complete": recovery_complete,
"uptime_seconds": uptime,
"version": env!("CARGO_PKG_VERSION"),
}))
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,251 @@
use crate::api::rpc::RpcHandler;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use tracing::info;
#[derive(Debug, Serialize)]
struct ChannelInfo {
chan_id: String,
remote_pubkey: String,
capacity: i64,
local_balance: i64,
remote_balance: i64,
active: bool,
status: String,
channel_point: String,
}
#[derive(Debug, Serialize)]
struct ChannelListResult {
channels: Vec<ChannelInfo>,
total_inbound: i64,
total_outbound: i64,
}
#[derive(Debug, Deserialize)]
struct LndListChannelsResponse {
channels: Option<Vec<LndChannel>>,
}
#[derive(Debug, Deserialize)]
struct LndChannel {
chan_id: Option<String>,
remote_pubkey: Option<String>,
capacity: Option<String>,
local_balance: Option<String>,
remote_balance: Option<String>,
active: Option<bool>,
channel_point: Option<String>,
}
#[derive(Debug, Deserialize, Default)]
struct LndPendingChannelsResponse {
pending_open_channels: Option<Vec<LndPendingOpenChannel>>,
}
#[derive(Debug, Deserialize)]
struct LndPendingOpenChannel {
channel: Option<LndPendingChannel>,
}
#[derive(Debug, Deserialize)]
struct LndPendingChannel {
remote_node_pub: Option<String>,
capacity: Option<String>,
local_balance: Option<String>,
remote_balance: Option<String>,
channel_point: Option<String>,
}
impl RpcHandler {
pub(in crate::api::rpc) async fn handle_lnd_listchannels(&self) -> Result<serde_json::Value> {
let (client, macaroon_hex) = self.lnd_client().await?;
let channels_resp: LndListChannelsResponse = client
.get("https://127.0.0.1:8080/v1/channels")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.send()
.await
.context("LND REST connection failed")?
.json()
.await
.context("Failed to parse LND channels response")?;
let pending_resp: LndPendingChannelsResponse = match client
.get("https://127.0.0.1:8080/v1/channels/pending")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.send()
.await
{
Ok(resp) => resp.json().await.unwrap_or_default(),
Err(_) => LndPendingChannelsResponse::default(),
};
let channels: Vec<ChannelInfo> = channels_resp
.channels
.unwrap_or_default()
.into_iter()
.map(|ch| {
let capacity: i64 = ch.capacity.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
let local: i64 = ch.local_balance.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
let remote: i64 = ch.remote_balance.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
ChannelInfo {
chan_id: ch.chan_id.unwrap_or_default(),
remote_pubkey: ch.remote_pubkey.unwrap_or_default(),
capacity,
local_balance: local,
remote_balance: remote,
active: ch.active.unwrap_or(false),
status: if ch.active.unwrap_or(false) { "active".into() } else { "inactive".into() },
channel_point: ch.channel_point.unwrap_or_default(),
}
})
.collect();
let mut pending_channels: Vec<ChannelInfo> = Vec::new();
for pch in pending_resp.pending_open_channels.unwrap_or_default() {
if let Some(ch) = pch.channel {
let capacity: i64 = ch.capacity.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
let local: i64 = ch.local_balance.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
let remote: i64 = ch.remote_balance.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0);
pending_channels.push(ChannelInfo {
chan_id: String::new(),
remote_pubkey: ch.remote_node_pub.unwrap_or_default(),
capacity,
local_balance: local,
remote_balance: remote,
active: false,
status: "pending_open".into(),
channel_point: ch.channel_point.unwrap_or_default(),
});
}
}
let total_local: i64 = channels.iter().map(|c| c.local_balance).sum();
let total_remote: i64 = channels.iter().map(|c| c.remote_balance).sum();
let mut all_channels = channels;
all_channels.extend(pending_channels);
let result = ChannelListResult {
channels: all_channels,
total_inbound: total_remote,
total_outbound: total_local,
};
Ok(serde_json::to_value(result)?)
}
pub(in crate::api::rpc) async fn handle_lnd_openchannel(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
let params = params.unwrap_or_default();
let pubkey = params.get("pubkey")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'pubkey' parameter"))?;
let amount = params.get("amount")
.and_then(|v| v.as_i64())
.ok_or_else(|| anyhow::anyhow!("Missing 'amount' parameter (sats)"))?;
// Validate pubkey: must be 66-char hex (compressed secp256k1)
if pubkey.len() != 66 || !pubkey.chars().all(|c| c.is_ascii_hexdigit()) {
return Err(anyhow::anyhow!("Invalid pubkey: must be 66-character hex string"));
}
if amount < 20000 {
return Err(anyhow::anyhow!("Channel amount must be at least 20,000 sats"));
}
if amount > 16_777_215 {
return Err(anyhow::anyhow!("Channel amount exceeds maximum (16,777,215 sats)"));
}
info!(peer = pubkey, amount = amount, "Opening Lightning channel");
let (client, macaroon_hex) = self.lnd_client().await?;
// First connect to the peer if an address is provided
if let Some(addr) = params.get("address").and_then(|v| v.as_str()) {
// Validate peer address format (host:port)
if addr.len() > 256 || addr.contains('\0') || addr.contains(' ') {
return Err(anyhow::anyhow!("Invalid peer address format"));
}
let connect_body = serde_json::json!({
"addr": { "pubkey": pubkey, "host": addr },
"perm": true
});
let _ = client
.post("https://127.0.0.1:8080/v1/peers")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&connect_body)
.send()
.await;
}
let open_body = serde_json::json!({
"node_pubkey_string": pubkey,
"local_funding_amount": amount.to_string(),
});
let resp = client
.post("https://127.0.0.1:8080/v1/channels")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&open_body)
.send()
.await
.context("Failed to open channel")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await.context("Failed to parse open channel response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to open channel: {}", msg));
}
Ok(body)
}
pub(in crate::api::rpc) async fn handle_lnd_closechannel(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
let params = params.unwrap_or_default();
let channel_point = params.get("channel_point")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'channel_point' parameter (txid:output_index)"))?;
let parts: Vec<&str> = channel_point.split(':').collect();
if parts.len() != 2 {
return Err(anyhow::anyhow!("Invalid channel_point format. Expected 'txid:output_index'"));
}
// Validate txid is 64-char hex and output_index is numeric
if parts[0].len() != 64 || !parts[0].chars().all(|c| c.is_ascii_hexdigit()) {
return Err(anyhow::anyhow!("Invalid txid in channel_point: must be 64-character hex"));
}
if parts[1].parse::<u32>().is_err() {
return Err(anyhow::anyhow!("Invalid output_index in channel_point: must be a number"));
}
let force = params.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
info!(channel_point = channel_point, force = force, "Closing Lightning channel");
let (client, macaroon_hex) = self.lnd_client().await?;
let url = format!(
"https://127.0.0.1:8080/v1/channels/{}/{}?force={}",
parts[0], parts[1], force
);
let resp = client
.delete(&url)
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.send()
.await
.context("Failed to close channel")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await.context("Failed to parse close channel response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to close channel: {}", msg));
}
Ok(serde_json::json!({ "success": true }))
}
}

View File

@@ -0,0 +1,228 @@
use crate::api::rpc::RpcHandler;
use anyhow::{Context, Result};
use base64::Engine;
use serde::{Deserialize, Serialize};
use super::{LndAmount, LndBalanceResponse};
#[derive(Debug, Serialize)]
struct LndInfo {
alias: String,
num_active_channels: u32,
num_peers: u32,
synced_to_chain: bool,
block_height: u64,
balance_sats: i64,
channel_balance_sats: i64,
pending_open_balance: i64,
}
#[derive(Debug, Deserialize)]
struct LndGetInfoResponse {
alias: Option<String>,
num_active_channels: Option<u32>,
num_peers: Option<u32>,
synced_to_chain: Option<bool>,
block_height: Option<u64>,
}
#[derive(Debug, Deserialize)]
struct LndChannelBalanceResponse {
local_balance: Option<LndAmount>,
pending_open_local_balance: Option<LndAmount>,
}
impl RpcHandler {
pub(in crate::api::rpc) async fn handle_lnd_getinfo(&self) -> Result<serde_json::Value> {
let macaroon_path =
"/var/lib/archipelago/lnd/data/chain/bitcoin/mainnet/admin.macaroon";
let macaroon_bytes = tokio::fs::read(macaroon_path)
.await
.context("Failed to read LND admin macaroon — is LND installed?")?;
let macaroon_hex = hex::encode(&macaroon_bytes);
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.danger_accept_invalid_certs(true)
.build()
.context("Failed to create HTTP client")?;
let get_info: LndGetInfoResponse = client
.get("https://127.0.0.1:8080/v1/getinfo")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.send()
.await
.context("LND REST connection failed")?
.json()
.await
.context("Failed to parse LND getinfo response")?;
let channel_balance: LndChannelBalanceResponse = match client
.get("https://127.0.0.1:8080/v1/balance/channels")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.send()
.await
{
Ok(resp) => resp.json().await.unwrap_or(LndChannelBalanceResponse {
local_balance: None,
pending_open_local_balance: None,
}),
Err(_) => LndChannelBalanceResponse {
local_balance: None,
pending_open_local_balance: None,
},
};
let wallet_balance: LndBalanceResponse = match client
.get("https://127.0.0.1:8080/v1/balance/blockchain")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.send()
.await
{
Ok(resp) => resp.json().await.unwrap_or(LndBalanceResponse {
total_balance: None,
}),
Err(_) => LndBalanceResponse {
total_balance: None,
},
};
let info = LndInfo {
alias: get_info.alias.unwrap_or_default(),
num_active_channels: get_info.num_active_channels.unwrap_or(0),
num_peers: get_info.num_peers.unwrap_or(0),
synced_to_chain: get_info.synced_to_chain.unwrap_or(false),
block_height: get_info.block_height.unwrap_or(0),
balance_sats: wallet_balance
.total_balance
.and_then(|s| s.parse().ok())
.unwrap_or(0),
channel_balance_sats: channel_balance
.local_balance
.and_then(|a| a.sat.and_then(|s| s.parse().ok()))
.unwrap_or(0),
pending_open_balance: channel_balance
.pending_open_local_balance
.and_then(|a| a.sat.and_then(|s| s.parse().ok()))
.unwrap_or(0),
};
Ok(serde_json::to_value(info)?)
}
/// Return LND connection info: base64url-encoded TLS cert and admin macaroon
/// for building lndconnect:// URIs in the frontend.
pub(crate) async fn handle_lnd_connect_info(&self) -> Result<serde_json::Value> {
let cert_path = "/var/lib/archipelago/lnd/tls.cert";
let macaroon_path =
"/var/lib/archipelago/lnd/data/chain/bitcoin/mainnet/admin.macaroon";
// Read and encode TLS cert (PEM -> DER -> base64url)
let cert_pem = tokio::fs::read_to_string(cert_path)
.await
.context("Failed to read LND TLS certificate")?;
let cert_der_b64: String = cert_pem
.lines()
.filter(|l| !l.starts_with("-----"))
.collect();
let cert_der = base64::engine::general_purpose::STANDARD
.decode(&cert_der_b64)
.context("Failed to decode PEM base64")?;
let cert_b64url = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&cert_der);
// Read and encode macaroon (binary -> base64url)
let macaroon_bytes = tokio::fs::read(macaroon_path)
.await
.context("Failed to read LND admin macaroon")?;
let macaroon_b64url =
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&macaroon_bytes);
// Read Tor onion address -- check system Tor path first, then legacy
let tor_onion = {
let mut onion = None;
for path in &[
"/var/lib/archipelago/tor-hostnames/lnd",
"/var/lib/tor/hidden_service_lnd/hostname",
"/var/lib/archipelago/tor/hidden_service_lnd/hostname",
] {
if let Ok(addr) = tokio::fs::read_to_string(path).await {
let addr = addr.trim().to_string();
if addr.ends_with(".onion") {
onion = Some(addr);
break;
}
}
// Try sudo for system Tor dirs (owned by debian-tor, 0700)
if let Ok(output) = tokio::process::Command::new("sudo")
.args(["cat", path])
.output()
.await
{
if output.status.success() {
let addr = String::from_utf8_lossy(&output.stdout).trim().to_string();
if addr.ends_with(".onion") {
onion = Some(addr);
break;
}
}
}
}
onion
};
Ok(serde_json::json!({
"cert_base64url": cert_b64url,
"macaroon_base64url": macaroon_b64url,
"tor_onion": tor_onion,
"rest_port": 8080,
"grpc_port": 10009,
}))
}
/// lnd.export-channel-backup -- Export all channel static backups (SCB).
/// Returns base64-encoded multi-channel backup that can restore channels on a new node.
pub(in crate::api::rpc) async fn handle_lnd_export_channel_backup(&self) -> Result<serde_json::Value> {
let macaroon_path = "/var/lib/archipelago/lnd/data/chain/bitcoin/mainnet/admin.macaroon";
let macaroon_bytes = tokio::fs::read(macaroon_path)
.await
.context("Failed to read LND admin macaroon")?;
let macaroon_hex = hex::encode(&macaroon_bytes);
let client = reqwest::Client::builder()
.danger_accept_invalid_certs(true)
.timeout(std::time::Duration::from_secs(10))
.build()
.context("Failed to build HTTP client")?;
let resp = client
.get("https://127.0.0.1:8080/v1/channels/backup")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.send()
.await
.context("Failed to reach LND REST API")?;
if !resp.status().is_success() {
anyhow::bail!("LND returned {}", resp.status());
}
let data: serde_json::Value = resp.json().await.context("Invalid JSON from LND")?;
// Extract the multi_chan_backup bytes
let backup_b64 = data
.get("multi_chan_backup")
.and_then(|m| m.get("multi_chan_backup"))
.and_then(|b| b.as_str())
.unwrap_or("");
Ok(serde_json::json!({
"backup": backup_b64,
"channel_count": data.get("multi_chan_backup")
.and_then(|m| m.get("chan_points"))
.and_then(|c| c.as_array())
.map(|a| a.len())
.unwrap_or(0),
"timestamp": chrono::Utc::now().to_rfc3339(),
}))
}
}

View File

@@ -0,0 +1,38 @@
mod channels;
mod info;
mod payments;
mod wallet;
use crate::api::rpc::RpcHandler;
use anyhow::{Context, Result};
// Shared LND response types used by multiple submodules
#[derive(Debug, serde::Deserialize)]
pub(super) struct LndBalanceResponse {
pub total_balance: Option<String>,
}
#[derive(Debug, serde::Deserialize)]
pub(super) struct LndAmount {
pub sat: Option<String>,
}
impl RpcHandler {
/// Helper: create an authenticated LND REST client.
/// Returns an HTTP client configured for LND's self-signed TLS and the
/// hex-encoded admin macaroon for request headers.
pub(crate) async fn lnd_client(&self) -> Result<(reqwest::Client, String)> {
let macaroon_path =
"/var/lib/archipelago/lnd/data/chain/bitcoin/mainnet/admin.macaroon";
let macaroon_bytes = tokio::fs::read(macaroon_path)
.await
.context("Failed to read LND admin macaroon — is LND installed?")?;
let macaroon_hex = hex::encode(&macaroon_bytes);
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(15))
.danger_accept_invalid_certs(true)
.build()
.context("Failed to create HTTP client")?;
Ok((client, macaroon_hex))
}
}

View File

@@ -0,0 +1,191 @@
use crate::api::rpc::RpcHandler;
use anyhow::{Context, Result};
use tracing::info;
impl RpcHandler {
/// Pay a Lightning invoice.
pub(in crate::api::rpc) async fn handle_lnd_payinvoice(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
let params = params.unwrap_or_default();
let payment_request = params.get("payment_request")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'payment_request' parameter"))?;
// Basic validation: Lightning invoices start with lnbc/lntb/lnbcrt
if payment_request.len() < 10 || payment_request.len() > 2048 {
return Err(anyhow::anyhow!("Invalid payment request length"));
}
let lower = payment_request.to_lowercase();
if !lower.starts_with("lnbc") && !lower.starts_with("lntb") && !lower.starts_with("lnbcrt") {
return Err(anyhow::anyhow!("Invalid payment request: must be a Lightning invoice (lnbc...)"));
}
info!("Paying Lightning invoice");
let (client, macaroon_hex) = self.lnd_client().await?;
let pay_body = serde_json::json!({
"payment_request": payment_request,
});
let resp = client
.post("https://127.0.0.1:8080/v1/channels/transactions")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&pay_body)
.send()
.await
.context("Failed to pay invoice")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await
.context("Failed to parse payment response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Payment failed: {}", msg));
}
let payment_error = body.get("payment_error").and_then(|v| v.as_str()).unwrap_or("");
if !payment_error.is_empty() {
return Err(anyhow::anyhow!("Payment failed: {}", payment_error));
}
let amount_sat = body.get("payment_route")
.and_then(|r| r.get("total_amt"))
.and_then(|v| v.as_str())
.and_then(|s| s.parse::<i64>().ok())
.unwrap_or(0);
let payment_hash = body.get("payment_hash")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
Ok(serde_json::json!({
"payment_hash": payment_hash,
"amount_sats": amount_sat,
}))
}
/// List on-chain transactions from LND.
/// Returns all transactions, with incoming (amount > 0) flagged.
pub(in crate::api::rpc) async fn handle_lnd_gettransactions(&self) -> Result<serde_json::Value> {
let (client, macaroon_hex) = self.lnd_client().await?;
let resp = client
.get("https://127.0.0.1:8080/v1/transactions")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.send()
.await
.context("LND REST connection failed")?;
let status = resp.status();
let body: serde_json::Value = resp
.json()
.await
.context("Failed to parse transactions response")?;
if !status.is_success() {
let msg = body
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to list transactions: {}", msg));
}
let empty_vec = vec![];
let raw_txs = body
.get("transactions")
.and_then(|v| v.as_array())
.unwrap_or(&empty_vec);
let mut transactions: Vec<serde_json::Value> = Vec::new();
for tx in raw_txs {
let amount: i64 = tx
.get("amount")
.and_then(|v| v.as_str())
.and_then(|s| s.parse().ok())
.or_else(|| tx.get("amount").and_then(|v| v.as_i64()))
.unwrap_or(0);
let num_confirmations: i64 = tx
.get("num_confirmations")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let tx_hash = tx
.get("tx_hash")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let time_stamp: i64 = tx
.get("time_stamp")
.and_then(|v| v.as_str())
.and_then(|s| s.parse().ok())
.or_else(|| tx.get("time_stamp").and_then(|v| v.as_i64()))
.unwrap_or(0);
let total_fees: i64 = tx
.get("total_fees")
.and_then(|v| v.as_str())
.and_then(|s| s.parse().ok())
.or_else(|| tx.get("total_fees").and_then(|v| v.as_i64()))
.unwrap_or(0);
let dest_addresses: Vec<String> = tx
.get("dest_addresses")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|a| a.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let label = tx
.get("label")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let block_height: i64 = tx
.get("block_height")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let direction = if amount > 0 { "incoming" } else { "outgoing" };
transactions.push(serde_json::json!({
"tx_hash": tx_hash,
"amount_sats": amount.abs(),
"direction": direction,
"num_confirmations": num_confirmations,
"time_stamp": time_stamp,
"total_fees": total_fees,
"dest_addresses": dest_addresses,
"label": label,
"block_height": block_height,
}));
}
// Sort by timestamp descending (most recent first)
transactions.sort_by(|a, b| {
let ta = a.get("time_stamp").and_then(|v| v.as_i64()).unwrap_or(0);
let tb = b.get("time_stamp").and_then(|v| v.as_i64()).unwrap_or(0);
tb.cmp(&ta)
});
let incoming_pending: usize = transactions
.iter()
.filter(|t| {
t.get("direction").and_then(|v| v.as_str()) == Some("incoming")
&& t.get("num_confirmations").and_then(|v| v.as_i64()) == Some(0)
})
.count();
Ok(serde_json::json!({
"transactions": transactions,
"incoming_pending_count": incoming_pending,
}))
}
}

View File

@@ -0,0 +1,384 @@
use crate::api::rpc::RpcHandler;
use anyhow::{Context, Result};
use base64::Engine;
use tracing::info;
impl RpcHandler {
/// Generate a new on-chain Bitcoin address.
pub(in crate::api::rpc) async fn handle_lnd_newaddress(&self) -> Result<serde_json::Value> {
let (client, macaroon_hex) = self.lnd_client().await?;
let resp = client
.get("https://127.0.0.1:8080/v1/newaddress")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.send()
.await
.context("LND REST connection failed")?;
let body: serde_json::Value = resp.json().await
.context("Failed to parse newaddress response")?;
let address = body.get("address")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
Ok(serde_json::json!({ "address": address }))
}
/// Send on-chain Bitcoin to an address.
pub(in crate::api::rpc) async fn handle_lnd_sendcoins(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
let params = params.unwrap_or_default();
let addr = params.get("addr")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'addr' parameter"))?;
let amount = params.get("amount")
.and_then(|v| v.as_i64())
.ok_or_else(|| anyhow::anyhow!("Missing 'amount' parameter (sats)"))?;
if amount < 546 {
return Err(anyhow::anyhow!("Amount must be at least 546 sats (dust limit)"));
}
if amount > 21_000_000 * 100_000_000 {
return Err(anyhow::anyhow!("Amount exceeds maximum Bitcoin supply"));
}
// Validate Bitcoin address format (basic: length and allowed chars)
if addr.len() < 14 || addr.len() > 90 || !addr.chars().all(|c| c.is_ascii_alphanumeric()) {
return Err(anyhow::anyhow!("Invalid Bitcoin address format"));
}
info!(addr = addr, amount = amount, "Sending on-chain Bitcoin");
let (client, macaroon_hex) = self.lnd_client().await?;
let send_body = serde_json::json!({
"addr": addr,
"amount": amount.to_string(),
});
let resp = client
.post("https://127.0.0.1:8080/v1/transactions")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&send_body)
.send()
.await
.context("Failed to send on-chain transaction")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await
.context("Failed to parse send response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to send: {}", msg));
}
let txid = body.get("txid").and_then(|v| v.as_str()).unwrap_or("").to_string();
Ok(serde_json::json!({ "txid": txid }))
}
/// Create a Lightning invoice.
pub(in crate::api::rpc) async fn handle_lnd_createinvoice(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
let params = params.unwrap_or_default();
let amount_sats = params.get("amount_sats")
.and_then(|v| v.as_i64())
.ok_or_else(|| anyhow::anyhow!("Missing 'amount_sats' parameter"))?;
let memo = params.get("memo")
.and_then(|v| v.as_str())
.unwrap_or("");
if amount_sats < 0 {
return Err(anyhow::anyhow!("Amount must be non-negative"));
}
if amount_sats > 21_000_000 * 100_000_000 {
return Err(anyhow::anyhow!("Amount exceeds maximum Bitcoin supply"));
}
// Limit memo length to prevent abuse
if memo.len() > 639 {
return Err(anyhow::anyhow!("Memo too long (max 639 bytes)"));
}
info!(amount_sats = amount_sats, "Creating Lightning invoice");
let (client, macaroon_hex) = self.lnd_client().await?;
let invoice_body = serde_json::json!({
"value": amount_sats.to_string(),
"memo": memo,
});
let resp = client
.post("https://127.0.0.1:8080/v1/invoices")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&invoice_body)
.send()
.await
.context("Failed to create invoice")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await
.context("Failed to parse invoice response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to create invoice: {}", msg));
}
let payment_request = body.get("payment_request")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
Ok(serde_json::json!({
"payment_request": payment_request,
"amount_sats": amount_sats,
}))
}
/// Create an unsigned PSBT for hardware wallet signing.
/// Uses LND's WalletKit.FundPsbt to select UTXOs and create a PSBT template.
pub(in crate::api::rpc) async fn handle_lnd_create_psbt(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let outputs = params.get("outputs")
.and_then(|v| v.as_array())
.ok_or_else(|| anyhow::anyhow!("Missing 'outputs' array (each: address + amount_sats)"))?;
if outputs.is_empty() {
return Err(anyhow::anyhow!("outputs must not be empty"));
}
// Build the outputs map for LND: { "address": "amount_sats_as_string" }
let mut lnd_outputs: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
let mut total_amount: i64 = 0;
for output in outputs {
let addr = output.get("address")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Each output must have an 'address'"))?;
// Validate Bitcoin address format
if addr.len() < 14 || addr.len() > 90 || !addr.chars().all(|c| c.is_ascii_alphanumeric()) {
return Err(anyhow::anyhow!("Invalid Bitcoin address format in output"));
}
let amount = output.get("amount_sats")
.and_then(|v| v.as_i64())
.ok_or_else(|| anyhow::anyhow!("Each output must have 'amount_sats'"))?;
if amount < 546 {
return Err(anyhow::anyhow!("Amount must be at least 546 sats (dust limit)"));
}
lnd_outputs.insert(addr.to_string(), serde_json::json!(amount));
total_amount += amount;
}
let sat_per_vbyte = params.get("fee_rate_sat_per_vbyte")
.and_then(|v| v.as_u64())
.unwrap_or(10);
info!(total_amount = total_amount, fee_rate = sat_per_vbyte, "Creating PSBT for hardware wallet signing");
let (client, macaroon_hex) = self.lnd_client().await?;
let fund_body = serde_json::json!({
"raw": {
"outputs": lnd_outputs,
},
"sat_per_vbyte": sat_per_vbyte,
"spend_unconfirmed": false,
});
let resp = client
.post("https://127.0.0.1:8080/v2/wallet/psbt/fund")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&fund_body)
.send()
.await
.context("Failed to create PSBT via LND")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await
.context("Failed to parse PSBT response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to create PSBT: {}", msg));
}
let funded_psbt = body.get("funded_psbt")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let change_output_index = body.get("change_output_index")
.and_then(|v| v.as_i64())
.unwrap_or(-1);
Ok(serde_json::json!({
"psbt_base64": funded_psbt,
"change_output_index": change_output_index,
"total_amount_sats": total_amount,
"fee_rate_sat_per_vbyte": sat_per_vbyte,
}))
}
/// Finalize a signed PSBT and broadcast the transaction.
/// Takes a PSBT that has been signed by a hardware wallet.
pub(in crate::api::rpc) async fn handle_lnd_finalize_psbt(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let signed_psbt = params.get("signed_psbt_base64")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'signed_psbt_base64'"))?;
info!("Finalizing signed PSBT from hardware wallet");
let (client, macaroon_hex) = self.lnd_client().await?;
let finalize_body = serde_json::json!({
"funded_psbt": signed_psbt,
});
let resp = client
.post("https://127.0.0.1:8080/v2/wallet/psbt/finalize")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&finalize_body)
.send()
.await
.context("Failed to finalize PSBT via LND")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await
.context("Failed to parse finalize response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to finalize PSBT: {}", msg));
}
let raw_final_tx = body.get("raw_final_tx")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
// Broadcast the finalized transaction
let publish_body = serde_json::json!({
"tx_hex": raw_final_tx,
});
let pub_resp = client
.post("https://127.0.0.1:8080/v2/wallet/tx")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&publish_body)
.send()
.await
.context("Failed to broadcast transaction")?;
let pub_status = pub_resp.status();
let pub_body: serde_json::Value = pub_resp.json().await
.context("Failed to parse broadcast response")?;
if !pub_status.is_success() {
let msg = pub_body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Transaction broadcast failed: {}", msg));
}
Ok(serde_json::json!({
"raw_final_tx": raw_final_tx,
"broadcast": true,
}))
}
/// Create a signed raw transaction WITHOUT broadcasting.
/// Used for mesh relay: create the TX locally, then relay the hex to an
/// internet-connected peer who broadcasts it.
pub(in crate::api::rpc) async fn handle_lnd_create_raw_tx(&self, params: Option<serde_json::Value>) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let addr = params.get("addr")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing 'addr'"))?;
let amount_sats = params.get("amount_sats")
.and_then(|v| v.as_u64())
.ok_or_else(|| anyhow::anyhow!("Missing 'amount_sats'"))?;
if amount_sats < 546 {
anyhow::bail!("Amount must be at least 546 sats (dust limit)");
}
if amount_sats > 2_100_000_000_000_000 {
anyhow::bail!("Amount exceeds 21M BTC");
}
let (client, macaroon_hex) = self.lnd_client().await?;
// Step 1: Fund a PSBT with the desired output
let fee_rate = params.get("fee_rate").and_then(|v| v.as_u64()).unwrap_or(5);
let fund_body = serde_json::json!({
"raw": {
"outputs": { addr: amount_sats }
},
"sat_per_vbyte": fee_rate,
"spend_unconfirmed": false,
});
let resp = client
.post("https://127.0.0.1:8080/v2/wallet/psbt/fund")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&fund_body)
.send()
.await
.context("Failed to fund PSBT via LND")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await
.context("Failed to parse fund response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to create TX: {}", msg));
}
let funded_psbt = body.get("funded_psbt")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("No funded_psbt in response"))?;
// Step 2: Finalize (LND auto-signs with hot wallet keys)
let finalize_body = serde_json::json!({
"funded_psbt": funded_psbt,
});
let resp = client
.post("https://127.0.0.1:8080/v2/wallet/psbt/finalize")
.header("Grpc-Metadata-macaroon", &macaroon_hex)
.json(&finalize_body)
.send()
.await
.context("Failed to finalize PSBT")?;
let status = resp.status();
let body: serde_json::Value = resp.json().await
.context("Failed to parse finalize response")?;
if !status.is_success() {
let msg = body.get("message").and_then(|v| v.as_str()).unwrap_or("Unknown error");
return Err(anyhow::anyhow!("Failed to sign TX: {}", msg));
}
// raw_final_tx from LND is base64-encoded -- decode to hex for Bitcoin RPC
let raw_final_tx_b64 = body.get("raw_final_tx")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("No raw_final_tx in response"))?;
let tx_bytes = base64::engine::general_purpose::STANDARD
.decode(raw_final_tx_b64)
.context("Failed to decode raw_final_tx base64")?;
let raw_tx_hex = hex::encode(&tx_bytes);
info!(addr, amount_sats, tx_len = raw_tx_hex.len(), "Created raw TX for mesh relay (NOT broadcast)");
Ok(serde_json::json!({
"raw_tx_hex": raw_tx_hex,
"amount_sats": amount_sats,
"addr": addr,
"broadcast": false,
}))
}
}

View File

@@ -0,0 +1,107 @@
use crate::session::SessionStore;
use std::net::IpAddr;
/// Methods that do not require a valid session cookie.
pub(super) const UNAUTHENTICATED_METHODS: &[&str] = &[
"auth.login",
"auth.login.totp",
"auth.login.backup",
"auth.isOnboardingComplete",
"auth.isSetup",
"health",
// Onboarding restore (before user account exists)
"backup.restore-identity",
// Inter-node RPC: called by federated peers over Tor, no session cookies
"federation.peer-joined",
"federation.peer-address-changed",
"federation.peer-did-changed",
"federation.get-state",
// Fleet telemetry ingest: called by remote nodes posting reports
"telemetry.ingest",
];
/// Methods whose responses can be cached for a few seconds.
pub(super) const CACHEABLE_METHODS: &[&str] = &[
"system.stats",
"federation.list-nodes",
];
/// Sanitize error messages before returning to clients.
/// Keeps user-facing validation errors but strips internal system details.
pub(super) fn sanitize_error_message(msg: &str) -> String {
// Allow known validation errors through (these are user-actionable)
let user_facing_prefixes = [
"Invalid",
"Missing",
"Not found",
"Already exists",
"Rate limit",
"Unauthorized",
"Forbidden",
"Not supported",
"requires",
"must be",
"cannot",
"Password",
"Session",
];
for prefix in &user_facing_prefixes {
if msg.starts_with(prefix) {
// Truncate long messages and strip file paths
let sanitized = msg.replace("/var/lib/archipelago/", "[data]/")
.replace("/usr/local/bin/", "[bin]/")
.replace("/etc/", "[config]/");
return if sanitized.len() > 200 {
format!("{}...", &sanitized[..200])
} else {
sanitized
};
}
}
// For all other errors, return a generic message
"Operation failed. Check server logs for details.".to_string()
}
/// Derive a CSRF token from the session token via HMAC.
/// Deterministic: same session token always produces the same CSRF token.
/// Survives backend restarts because it depends only on the session token
/// and the on-disk remember secret (not ephemeral state).
pub(super) async fn derive_csrf_token(session_token: &str) -> String {
use hmac::{Hmac, Mac};
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
let secret = SessionStore::load_or_create_remember_secret().await;
let mut mac = HmacSha256::new_from_slice(&secret).expect("HMAC key");
mac.update(format!("csrf:{}", session_token).as_bytes());
hex::encode(mac.finalize().into_bytes())
}
/// Extract a named cookie value from headers.
pub(super) fn extract_cookie(headers: &hyper::HeaderMap, name: &str) -> Option<String> {
let prefix = format!("{}=", name);
for value in headers.get_all("cookie") {
if let Ok(s) = value.to_str() {
for part in s.split(';') {
let part = part.trim();
if let Some(val) = part.strip_prefix(&prefix) {
let val = val.trim();
if !val.is_empty() {
return Some(val.to_string());
}
}
}
}
}
None
}
/// Extract the client IP from request headers (X-Real-IP or X-Forwarded-For).
pub(super) fn extract_client_ip(headers: &hyper::HeaderMap) -> IpAddr {
headers
.get("x-real-ip")
.or_else(|| headers.get("x-forwarded-for"))
.and_then(|v| v.to_str().ok())
.and_then(|s| s.split(',').next())
.and_then(|s| s.trim().parse::<IpAddr>().ok())
.unwrap_or(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST))
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,692 @@
use super::validation::validate_app_id;
use crate::port_allocator::PortAllocator;
use anyhow::{Context, Result};
/// Trusted Docker registries. Only images from these sources are allowed.
#[allow(dead_code)]
pub(super) const TRUSTED_REGISTRIES: &[&str] = &["docker.io/", "ghcr.io/", "localhost/"];
/// Detect which Bitcoin container is running on archy-net for DNS resolution.
/// Returns the container name to use as the RPC host (e.g., "bitcoin-knots").
pub(super) fn detect_bitcoin_container_name() -> String {
// Synchronous check — called from get_app_config which is sync
let output = std::process::Command::new("podman")
.args(["ps", "--format", "{{.Names}}"])
.output();
if let Ok(out) = output {
let names = String::from_utf8_lossy(&out.stdout);
for candidate in &["bitcoin-knots", "bitcoin-core", "bitcoin"] {
if names.lines().any(|l| l.trim() == *candidate) {
return candidate.to_string();
}
}
}
// Default to bitcoin-knots (most common)
"bitcoin-knots".to_string()
}
/// Validate Docker image against trusted registry allowlist.
pub(super) fn is_valid_docker_image(image: &str) -> bool {
if image.is_empty() || image.len() > 256 {
return false;
}
// Reject shell metacharacters
let dangerous_chars = ['&', '|', ';', '`', '$', '(', ')', '<', '>', '\n', '\r'];
if image.chars().any(|c| dangerous_chars.contains(&c)) {
return false;
}
// Must come from a trusted registry — match the exact domain, not just prefix
let registry = match image.split('/').next() {
Some(r) => r,
None => return false,
};
matches!(registry, "docker.io" | "ghcr.io" | "localhost")
}
/// Per-app Linux capabilities needed beyond the default cap-drop=ALL.
/// Most apps need CHOWN/SETUID/SETGID for internal user switching.
pub(super) fn get_app_capabilities(app_id: &str) -> Vec<String> {
match app_id {
// Apps that need user switching and file ownership changes
"nextcloud" | "homeassistant" | "home-assistant" | "btcpay-server" | "btcpayserver"
| "jellyfin" | "onlyoffice" | "onlyoffice-documentserver" | "portainer" => vec![
"--cap-add=CHOWN".to_string(),
"--cap-add=SETUID".to_string(),
"--cap-add=SETGID".to_string(),
"--cap-add=DAC_OVERRIDE".to_string(),
],
// Nginx Proxy Manager needs to bind low ports
"nginx-proxy-manager" => vec![
"--cap-add=CHOWN".to_string(),
"--cap-add=SETUID".to_string(),
"--cap-add=SETGID".to_string(),
"--cap-add=NET_BIND_SERVICE".to_string(),
],
// Bitcoin and Lightning need file ownership ops + DAC_OVERRIDE for data dir access
"bitcoin" | "bitcoin-core" | "bitcoin-knots" | "lnd" | "fedimint"
| "fedimint-gateway" => vec![
"--cap-add=CHOWN".to_string(),
"--cap-add=FOWNER".to_string(),
"--cap-add=SETUID".to_string(),
"--cap-add=SETGID".to_string(),
"--cap-add=DAC_OVERRIDE".to_string(),
],
// Vaultwarden needs file ownership + NET_BIND_SERVICE (binds port 80 internally)
"vaultwarden" => vec![
"--cap-add=CHOWN".to_string(),
"--cap-add=SETUID".to_string(),
"--cap-add=SETGID".to_string(),
"--cap-add=NET_BIND_SERVICE".to_string(),
],
// PhotoPrism uses s6-overlay which needs privilege ops
"photoprism" => vec![
"--cap-add=CHOWN".to_string(),
"--cap-add=SETUID".to_string(),
"--cap-add=SETGID".to_string(),
],
// Grafana runs as specific UID (472)
"grafana" => vec![
"--cap-add=CHOWN".to_string(),
"--cap-add=SETUID".to_string(),
"--cap-add=SETGID".to_string(),
],
// Uptime-kuma startup script needs chown/fowner for /app/data ownership
"uptime-kuma" => vec![
"--cap-add=CHOWN".to_string(),
"--cap-add=FOWNER".to_string(),
"--cap-add=SETUID".to_string(),
"--cap-add=SETGID".to_string(),
],
// Minimal apps (searxng, filebrowser, etc.) need no extra caps
_ => vec![],
}
}
/// Apps safe to run with --read-only root filesystem.
/// These work correctly with volume mounts + tmpfs for /tmp and /run.
pub(super) fn is_readonly_compatible(app_id: &str) -> bool {
matches!(
app_id,
"searxng"
| "grafana"
| "filebrowser"
| "electrumx"
| "mempool-electrs"
| "electrs"
| "nostr-rs-relay"
| "ollama"
| "indeedhub"
)
}
/// Get container health check arguments for podman run.
/// Returns (health-cmd, interval, retries) args to append to run_args.
pub(super) fn get_health_check_args(app_id: &str, rpc_pass: &str) -> Vec<String> {
let btc_health = format!(
"bitcoin-cli -rpcuser=archipelago -rpcpassword={} getblockchaininfo || exit 1",
rpc_pass
);
let (cmd, interval, retries) = match app_id {
"bitcoin" | "bitcoin-core" | "bitcoin-knots" => (btc_health.as_str(), "30s", "3"),
"lnd" => ("lncli getinfo || exit 1", "30s", "3"),
"btcpay-server" | "btcpayserver" => {
("curl -sf http://localhost:49392/ || exit 1", "30s", "3")
}
"mempool-api" => (
"curl -sf http://localhost:8999/api/v1/backend-info || exit 1",
"30s",
"3",
),
"mempool" | "mempool-web" | "archy-mempool-web" => {
("curl -sf http://localhost:8080/ || exit 1", "30s", "3")
}
"electrumx" | "mempool-electrs" | "electrs" => {
("curl -sf http://localhost:8000/ || exit 1", "60s", "3")
}
"nextcloud" => (
"curl -sf http://localhost:80/status.php || exit 1",
"30s",
"3",
),
"homeassistant" | "home-assistant" => (
"curl -sf http://localhost:8123/api/ || exit 1",
"30s",
"3",
),
"grafana" => (
"curl -sf http://localhost:3000/api/health || exit 1",
"30s",
"3",
),
"jellyfin" => (
"curl -sf http://localhost:8096/health || exit 1",
"30s",
"3",
),
"vaultwarden" => ("curl -sf http://localhost:80/alive || exit 1", "30s", "3"),
"uptime-kuma" => ("curl -sf http://localhost:3001/ || exit 1", "30s", "3"),
"filebrowser" => (
"curl -sf http://localhost:80/health || exit 1",
"30s",
"3",
),
"searxng" => ("curl -sf http://localhost:8080/ || exit 1", "30s", "3"),
"photoprism" => (
"curl -sf http://localhost:2342/api/v1/status || exit 1",
"60s",
"3",
),
"immich_server" | "immich" => (
"curl -sf http://localhost:2283/api/server/ping || exit 1",
"30s",
"3",
),
"dwn" => (
"curl -sf http://localhost:3000/health || exit 1",
"30s",
"3",
),
"portainer" => (
"curl -sf http://localhost:9000/api/status || exit 1",
"30s",
"3",
),
"ollama" => ("curl -sf http://localhost:11434/ || exit 1", "30s", "3"),
"fedimint" => (
"curl -sf http://localhost:8174/health || exit 1",
"60s",
"3",
),
"nostr-rs-relay" | "nostr-relay" => {
("curl -sf http://localhost:8080/ || exit 1", "30s", "3")
}
"nginx-proxy-manager" => (
"curl -sf http://localhost:81/api/ || exit 1",
"30s",
"3",
),
_ => return vec![],
};
vec![
format!("--health-cmd={}", cmd),
format!("--health-interval={}", interval),
format!("--health-retries={}", retries),
"--health-start-period=60s".to_string(),
]
}
/// Get per-app memory limit.
pub(super) fn get_memory_limit(app_id: &str) -> &'static str {
match app_id {
// Heavy apps
"bitcoin" | "bitcoin-core" | "bitcoin-knots" => "2g",
"onlyoffice" | "onlyoffice-documentserver" => "2g",
"ollama" => "4g",
// Medium apps
"lnd" => "512m",
"electrumx" | "mempool-electrs" | "electrs" => "1g",
"nextcloud" => "1g",
"immich_server" | "immich" => "1g",
"btcpay-server" | "btcpayserver" => "1g",
"homeassistant" | "home-assistant" => "512m",
"fedimint" => "512m",
"fedimint-gateway" => "512m",
"photoprism" => "1g",
// Light apps
"mempool-api" => "512m",
"mempool" | "mempool-web" | "archy-mempool-web" => "256m",
"grafana" => "256m",
"jellyfin" => "1g",
"vaultwarden" => "256m",
"uptime-kuma" => "256m",
"filebrowser" => "256m",
"searxng" => "512m",
"dwn" => "256m",
"portainer" => "256m",
"nostr-rs-relay" | "nostr-relay" => "256m",
"nginx-proxy-manager" => "256m",
// Databases
"archy-btcpay-db" | "archy-mempool-db" | "mysql-mempool" => "512m",
"immich_postgres" | "penpot-postgres" => "256m",
"immich_redis" | "penpot-valkey" => "128m",
// Default
_ => "512m",
}
}
/// Get all container names for an app (handles multi-container apps like mempool)
pub(super) async fn get_containers_for_app(package_id: &str) -> Result<Vec<String>> {
validate_app_id(package_id)?;
let output = tokio::process::Command::new("podman")
.args(["ps", "-a", "--format", "{{.Names}}"])
.output()
.await
.context("Failed to list containers")?;
let stdout = String::from_utf8_lossy(&output.stdout);
let all: Vec<&str> = stdout.lines().filter(|s| !s.is_empty()).collect();
let patterns: Vec<String> = match package_id {
"mempool" | "mempool-web" => {
vec![
"electrumx".into(),
"mempool-electrs".into(),
"mempool-api".into(),
"archy-mempool-api".into(),
"archy-mempool-web".into(),
"mempool".into(),
"archy-mempool-db".into(),
"mysql-mempool".into(),
]
}
"fedimint" => vec![
"fedimint".into(),
"fedimint-ui".into(),
"archy-fedimint".into(),
"fedimint-gateway".into(),
],
"fedimint-gateway" => vec!["fedimint-gateway".into()],
"immich" => vec![
"immich_postgres".into(),
"immich_redis".into(),
"immich_server".into(),
],
"penpot" | "penpot-frontend" => vec![
"penpot-postgres".into(),
"penpot-valkey".into(),
"penpot-backend".into(),
"penpot-exporter".into(),
"penpot-frontend".into(),
],
_ => vec![package_id.to_string(), format!("archy-{}", package_id)],
};
let mut result = Vec::new();
for name in all {
for pat in &patterns {
if name == pat {
result.push(name.to_string());
break;
}
}
}
Ok(result)
}
/// Get data directories to clean for an app.
/// Caller must validate package_id before calling.
pub(super) fn get_data_dirs_for_app(package_id: &str) -> Vec<String> {
let base = "/var/lib/archipelago";
match package_id {
"mempool" | "mempool-web" => vec![
format!("{}/mempool", base),
format!("{}/mysql-mempool", base),
format!("{}/electrumx", base),
format!("{}/mempool-electrs", base),
],
"fedimint" => vec![
format!("{}/fedimint", base),
format!("{}/fedimint-gateway", base),
],
"fedimint-gateway" => vec![format!("{}/fedimint-gateway", base)],
"immich" => vec![
format!("{}/immich", base),
format!("{}/immich-db", base),
],
"penpot" | "penpot-frontend" => vec![
format!("{}/penpot-assets", base),
format!("{}/penpot-postgres", base),
],
_ => vec![format!("{}/{}", base, package_id)],
}
}
/// Get app-specific configuration
/// Returns: (ports, volumes, env_vars, custom_command, custom_args)
pub(super) async fn get_app_config(
app_id: &str,
host_ip: &str,
allocator: &mut PortAllocator,
rpc_user: &str,
rpc_pass: &str,
) -> (
Vec<String>,
Vec<String>,
Vec<String>,
Option<String>,
Option<Vec<String>>,
) {
match app_id {
"homeassistant" | "home-assistant" => (
vec!["8123:8123".to_string()],
vec!["/var/lib/archipelago/home-assistant:/config".to_string()],
vec!["TZ=UTC".to_string()],
None,
None,
),
"bitcoin" | "bitcoin-core" | "bitcoin-knots" => (
vec!["8332:8332".to_string(), "8333:8333".to_string()],
vec!["/var/lib/archipelago/bitcoin:/home/bitcoin/.bitcoin".to_string()],
vec![],
None,
None,
),
"lnd" => (
vec![
"9735:9735".to_string(),
"10009:10009".to_string(),
"8080:8080".to_string(),
],
vec!["/var/lib/archipelago/lnd:/root/.lnd".to_string()],
vec!["BITCOIN_ACTIVE=1".to_string()],
None,
None,
),
"btcpay-server" | "btcpayserver" => (
vec!["23000:49392".to_string()],
vec!["/var/lib/archipelago/btcpay:/datadir".to_string()],
vec![
"ASPNETCORE_URLS=http://0.0.0.0:49392".to_string(),
"BTCPAY_PROTOCOL=http".to_string(),
format!("BTCPAY_HOST={}:23000", host_ip),
"BTCPAY_CHAINS=btc".to_string(),
format!("BTCPAY_BTCRPCURL=http://{}:8332", host_ip),
format!("BTCPAY_BTCRPCUSER={}", rpc_user),
format!("BTCPAY_BTCRPCPASSWORD={}", rpc_pass),
"BTCPAY_POSTGRES=User ID=btcpay;Password=btcpaypass;Host=archy-btcpay-db;Port=5432;Database=btcpay;Include Error Detail=true".to_string(),
],
None,
None,
),
"mempool" | "mempool-web" => (
vec!["4080:8080".to_string()],
vec![],
vec![format!("BACKEND_MAINNET_HTTP_HOST={}", host_ip)],
None,
None,
),
"mempool-api" => (
vec!["8999:8999".to_string()],
vec!["/var/lib/archipelago/mempool:/data".to_string()],
vec![
"MEMPOOL_BACKEND=electrum".to_string(),
"ELECTRUM_HOST=electrumx".to_string(),
"ELECTRUM_PORT=50001".to_string(),
"ELECTRUM_TLS_ENABLED=false".to_string(),
format!("CORE_RPC_HOST={}", host_ip),
"CORE_RPC_PORT=8332".to_string(),
format!("CORE_RPC_USERNAME={}", rpc_user),
format!("CORE_RPC_PASSWORD={}", rpc_pass),
"DATABASE_ENABLED=true".to_string(),
"DATABASE_HOST=archy-mempool-db".to_string(),
"DATABASE_DATABASE=mempool".to_string(),
"DATABASE_USERNAME=mempool".to_string(),
"DATABASE_PASSWORD=mempoolpass".to_string(),
],
None,
None,
),
"electrumx" | "mempool-electrs" | "electrs" => {
// Detect which bitcoin container is running for archy-net DNS resolution
let bitcoin_host = detect_bitcoin_container_name();
(
vec!["50001:50001".to_string()],
vec!["/var/lib/archipelago/electrumx:/data".to_string()],
vec![
format!(
"DAEMON_URL=http://{}:{}@{}:8332/",
rpc_user, rpc_pass, bitcoin_host
),
"COIN=Bitcoin".to_string(),
"DB_DIRECTORY=/data".to_string(),
"SERVICES=tcp://:50001,rpc://0.0.0.0:8000".to_string(),
],
None,
None,
)
}
"mysql-mempool" => (
vec![],
vec!["/var/lib/archipelago/mysql-mempool:/var/lib/mysql".to_string()],
vec![
"MYSQL_DATABASE=mempool".to_string(),
"MYSQL_USER=mempool".to_string(),
"MYSQL_PASSWORD=mempoolpass".to_string(),
"MYSQL_ROOT_PASSWORD=rootpass".to_string(),
],
None,
None,
),
"grafana" => (
vec!["3000:3000".to_string()],
vec!["/var/lib/archipelago/grafana:/var/lib/grafana".to_string()],
vec![
"GF_PATHS_DATA=/var/lib/grafana".to_string(),
"GF_USERS_ALLOW_SIGN_UP=false".to_string(),
],
None,
None,
),
"searxng" => (
vec!["8888:8080".to_string()],
vec![],
vec![],
None,
None,
),
"ollama" => (
vec!["11434:11434".to_string()],
vec!["/var/lib/archipelago/ollama:/root/.ollama".to_string()],
vec![],
None,
None,
),
"onlyoffice" | "onlyoffice-documentserver" => (
vec!["9980:80".to_string()],
vec![],
vec![],
None,
None,
),
"penpot" | "penpot-frontend" => (
vec!["9001:80".to_string()],
vec![],
vec![],
None,
None,
),
"nextcloud" => {
let host_port = allocator
.allocate_or_get(app_id, 8085, 80)
.await
.unwrap_or(8085);
(
vec![format!("{}:80", host_port)],
vec!["/var/lib/archipelago/nextcloud:/var/www/html".to_string()],
vec![],
None,
None,
)
}
"vaultwarden" => {
let host_port = allocator
.allocate_or_get(app_id, 8082, 80)
.await
.unwrap_or(8082);
(
vec![format!("{}:80", host_port)],
vec!["/var/lib/archipelago/vaultwarden:/data".to_string()],
vec![],
None,
None,
)
}
"jellyfin" => (
vec!["8096:8096".to_string()],
vec![
"/var/lib/archipelago/jellyfin/config:/config".to_string(),
"/var/lib/archipelago/jellyfin/cache:/cache".to_string(),
],
vec![],
None,
None,
),
"photoprism" => (
vec!["2342:2342".to_string()],
vec!["/var/lib/archipelago/photoprism:/photoprism/storage".to_string()],
vec![
"PHOTOPRISM_ADMIN_PASSWORD=archipelago".to_string(),
"PHOTOPRISM_DEFAULT_LOCALE=en".to_string(),
],
None,
None,
),
"immich" => (
vec!["2283:2283".to_string()],
vec!["/var/lib/archipelago/immich:/usr/src/app/upload".to_string()],
vec![
"DB_HOSTNAME=immich_postgres".to_string(),
"DB_USERNAME=postgres".to_string(),
"DB_PASSWORD=immichpass".to_string(),
"DB_DATABASE_NAME=immich".to_string(),
"REDIS_HOSTNAME=immich_redis".to_string(),
"UPLOAD_LOCATION=/usr/src/app/upload".to_string(),
],
None,
None,
),
"filebrowser" => {
let host_port = allocator
.allocate_or_get(app_id, 8083, 80)
.await
.unwrap_or(8083);
(
vec![format!("{}:80", host_port)],
vec!["/var/lib/archipelago/filebrowser:/srv".to_string()],
vec![],
None,
None,
)
}
"nginx-proxy-manager" => (
vec![
"81:81".to_string(),
"8084:80".to_string(),
"8443:443".to_string(),
],
vec![
"/var/lib/archipelago/nginx-proxy-manager/data:/data".to_string(),
"/var/lib/archipelago/nginx-proxy-manager/letsencrypt:/etc/letsencrypt".to_string(),
],
vec![],
None,
None,
),
"portainer" => (
vec!["9000:9000".to_string()],
vec![
"/var/lib/archipelago/portainer:/data".to_string(),
"/var/run/podman/podman.sock:/var/run/docker.sock".to_string(),
],
vec![],
None,
None,
),
"uptime-kuma" => (
vec!["3001:3001".to_string()],
vec!["/var/lib/archipelago/uptime-kuma:/app/data".to_string()],
vec!["TZ=UTC".to_string()],
None,
None,
),
"tailscale" => (
vec!["8240:8240".to_string()],
vec!["/var/lib/archipelago/tailscale:/var/lib/tailscale".to_string()],
vec!["TS_STATE_DIR=/var/lib/tailscale".to_string()],
Some(
"sh -c 'tailscale web --listen 0.0.0.0:8240 & exec tailscaled'".to_string(),
),
None,
),
"fedimint" => (
vec![
"8173:8173".to_string(),
"8174:8174".to_string(),
"8175:8175".to_string(),
],
vec!["/var/lib/archipelago/fedimint:/data".to_string()],
vec![
"FM_DATA_DIR=/data".to_string(),
format!("FM_BITCOIND_USERNAME={}", rpc_user),
format!("FM_BITCOIND_PASSWORD={}", rpc_pass),
"FM_BITCOIN_NETWORK=bitcoin".to_string(),
"FM_BIND_P2P=0.0.0.0:8173".to_string(),
"FM_BIND_API=0.0.0.0:8174".to_string(),
"FM_BIND_UI=0.0.0.0:8175".to_string(),
format!("FM_P2P_URL=fedimint://{}:8173", host_ip),
format!("FM_API_URL=ws://{}:8174", host_ip),
format!("FM_BITCOIND_URL=http://{}:8332", host_ip),
],
None,
None,
),
"fedimint-gateway" => (
vec!["8176:8176".to_string(), "9737:9737".to_string()],
vec!["/var/lib/archipelago/fedimint-gateway:/data".to_string()],
vec![],
None,
Some(vec![
"gatewayd".to_string(),
"--data-dir".to_string(),
"/data".to_string(),
"--listen".to_string(),
"0.0.0.0:8176".to_string(),
"--bcrypt-password-hash".to_string(),
"$2y$10$t9YjjxkiktrlYvjajB/zgOMDnSNVg4HqrbDqh47u7Jf42whNdxNqC".to_string(),
"--network".to_string(),
"bitcoin".to_string(),
"--bitcoind-url".to_string(),
format!("http://{}:8332", host_ip),
"--bitcoind-username".to_string(),
rpc_user.to_string(),
"--bitcoind-password".to_string(),
rpc_pass.to_string(),
"ldk".to_string(),
"--ldk-lightning-port".to_string(),
"9737".to_string(),
"--ldk-alias".to_string(),
"archipelago-gateway".to_string(),
]),
),
"indeedhub" => (
vec!["8190:3000".to_string()],
vec![],
vec![
"NODE_ENV=production".to_string(),
"NEXT_TELEMETRY_DISABLED=1".to_string(),
],
None,
None,
),
"nostr-rs-relay" => (
vec!["18081:8080".to_string()],
vec!["/var/lib/archipelago/nostr-rs-relay:/usr/src/app/db".to_string()],
vec![],
None,
None,
),
"dwn" => (
vec!["3100:3000".to_string()],
vec!["/var/lib/archipelago/dwn:/dwn/data".to_string()],
vec![
"DS_PORT=3000".to_string(),
"DS_MESSAGES_STORE_URI=level://data/messages".to_string(),
"DS_DATA_STORE_URI=level://data/data".to_string(),
"DS_EVENT_LOG_URI=level://data/events".to_string(),
],
None,
None,
),
_ => (vec![], vec![], vec![], None, None),
}
}

View File

@@ -0,0 +1,6 @@
mod config;
mod lifecycle;
mod validation;
// Re-export items needed by sibling modules (container.rs, security.rs)
pub(super) use validation::validate_app_id;

View File

@@ -0,0 +1,18 @@
use anyhow::Result;
/// Validate that a package/app ID is safe (lowercase alphanumeric + hyphens, 1-64 chars).
pub(in crate::api::rpc) fn validate_app_id(id: &str) -> Result<()> {
if id.is_empty() || id.len() > 64 {
anyhow::bail!("Invalid app id: must be 1-64 characters");
}
if !id
.bytes()
.all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-')
{
anyhow::bail!("Invalid app id: only lowercase letters, digits, and hyphens allowed");
}
if id.starts_with('-') {
anyhow::bail!("Invalid app id: must not start with a hyphen");
}
Ok(())
}

View File

@@ -0,0 +1,50 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize)]
pub(super) struct RpcRequest {
pub method: String,
pub params: Option<serde_json::Value>,
}
#[derive(Debug, Serialize)]
pub(super) struct RpcResponse {
pub result: Option<serde_json::Value>,
pub error: Option<RpcError>,
}
#[derive(Debug, Serialize)]
pub(super) struct RpcError {
pub code: i32,
pub message: String,
pub data: Option<serde_json::Value>,
}
/// Simple TTL cache for read-only RPC responses.
pub(super) struct ResponseCache {
entries: tokio::sync::RwLock<std::collections::HashMap<String, (std::time::Instant, serde_json::Value)>>,
ttl: std::time::Duration,
}
impl ResponseCache {
pub fn new(ttl_secs: u64) -> Self {
Self {
entries: tokio::sync::RwLock::new(std::collections::HashMap::new()),
ttl: std::time::Duration::from_secs(ttl_secs),
}
}
pub async fn get(&self, key: &str) -> Option<serde_json::Value> {
let entries = self.entries.read().await;
if let Some((ts, value)) = entries.get(key) {
if ts.elapsed() < self.ttl {
return Some(value.clone());
}
}
None
}
pub async fn set(&self, key: String, value: serde_json::Value) {
let mut entries = self.entries.write().await;
entries.insert(key, (std::time::Instant::now(), value));
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,374 @@
//! Bitcoin relay operations: TX broadcast, confirmation tracking, peer messaging.
use super::MeshCommand;
use super::MeshState;
use super::super::crypto;
use super::super::message_types;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, warn};
/// Called on an internet-connected node when it receives a TxRelay request.
/// Broadcasts the raw TX to Bitcoin via RPC, sends the txid back, then
/// monitors for 3 confirmations and sends updates back via mesh.
pub(super) async fn handle_tx_relay_broadcast(
relay: message_types::TxRelayPayload,
sender_contact_id: u32,
state: &Arc<MeshState>,
) {
let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
{
Ok(c) => c,
Err(e) => {
warn!("Failed to create HTTP client for TX relay: {}", e);
return;
}
};
let (rpc_user, rpc_pass) = crate::bitcoin_rpc::bitcoin_rpc_credentials().await;
// Pre-flight: check if Bitcoin Core is reachable and synced
if !preflight_check(&client, &rpc_user, &rpc_pass, &relay, sender_contact_id, state).await {
return;
}
// Step 1: Broadcast via Bitcoin Core RPC sendrawtransaction
let txid = match broadcast_transaction(&client, &rpc_user, &rpc_pass, &relay, sender_contact_id, state).await {
Some(id) => id,
None => return,
};
info!(request_id = relay.request_id, txid = %txid, "TX broadcast successful — tracking confirmations");
// Step 2: Send TxRelayResponse with txid back to originator
send_tx_relay_response(state, sender_contact_id, relay.request_id, Some(&txid), None, None).await;
// Step 3: Monitor confirmations (poll every 30s, up to 3 hours)
track_confirmations(&client, &txid, relay.request_id, sender_contact_id, state).await;
}
/// Pre-flight check: verify Bitcoin Core is reachable and synced.
/// Returns `true` if the node is ready, `false` if an error response was sent.
async fn preflight_check(
client: &reqwest::Client,
rpc_user: &str,
rpc_pass: &str,
relay: &message_types::TxRelayPayload,
sender_contact_id: u32,
state: &Arc<MeshState>,
) -> bool {
let preflight_body = serde_json::json!({
"jsonrpc": "1.0",
"id": "preflight",
"method": "getblockchaininfo",
"params": []
});
match client
.post(crate::constants::BITCOIN_RPC_URL)
.basic_auth(rpc_user, Some(rpc_pass))
.json(&preflight_body)
.send()
.await
{
Ok(resp) => {
if let Ok(rpc_resp) = resp.json::<serde_json::Value>().await {
if let Some(result) = rpc_resp.get("result") {
let ibd = result.get("initialblockdownload")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let progress = result.get("verificationprogress")
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
if ibd || progress < 0.999 {
let pct = (progress * 100.0) as u32;
let msg = format!("Bitcoin node is syncing ({}%) — cannot broadcast yet", pct);
warn!(request_id = relay.request_id, "{}", msg);
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(&msg), Some("bitcoin_syncing")).await;
return false;
}
} else if let Some(err) = rpc_resp.get("error").and_then(|e| e.as_object()) {
let msg = err.get("message").and_then(|m| m.as_str()).unwrap_or("RPC error");
warn!(request_id = relay.request_id, "Bitcoin pre-flight failed: {}", msg);
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(&format!("Bitcoin node error: {}", msg)), Some("bitcoin_unreachable")).await;
return false;
}
}
}
Err(e) => {
let msg = format!("Bitcoin node unreachable — {}", if e.is_connect() {
"connection refused (node may be stopped)"
} else if e.is_timeout() {
"connection timed out"
} else {
"network error"
});
warn!(request_id = relay.request_id, "Pre-flight: {}: {}", msg, e);
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(&msg), Some("bitcoin_unreachable")).await;
return false;
}
}
true
}
/// Broadcast a raw transaction via Bitcoin Core RPC.
/// Returns the txid on success, or None if an error response was sent.
async fn broadcast_transaction(
client: &reqwest::Client,
rpc_user: &str,
rpc_pass: &str,
relay: &message_types::TxRelayPayload,
sender_contact_id: u32,
state: &Arc<MeshState>,
) -> Option<String> {
let body = serde_json::json!({
"jsonrpc": "1.0",
"id": "mesh-relay",
"method": "sendrawtransaction",
"params": [relay.tx_hex]
});
let txid = match client
.post(crate::constants::BITCOIN_RPC_URL)
.basic_auth(rpc_user, Some(rpc_pass))
.json(&body)
.send()
.await
{
Ok(resp) => {
match resp.json::<serde_json::Value>().await {
Ok(rpc_resp) => {
if let Some(err) = rpc_resp.get("error").and_then(|e| e.as_object()) {
let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
let msg = err.get("message").and_then(|m| m.as_str()).unwrap_or("unknown");
let user_msg = match code {
-25 => format!("TX already in mempool or confirmed: {}", msg),
-26 => format!("TX rejected by mempool policy: {}", msg),
-27 => format!("TX already confirmed in a block"),
_ => format!("Bitcoin rejected TX (code {}): {}", code, msg),
};
warn!(request_id = relay.request_id, rpc_code = code, "sendrawtransaction: {}", msg);
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(&user_msg), Some(&format!("tx_rejected:{}", code))).await;
return None;
}
rpc_resp.get("result").and_then(|r| r.as_str()).map(|s| s.to_string())
}
Err(e) => {
warn!("Failed to parse Bitcoin RPC response: {}", e);
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some("Failed to parse Bitcoin node response"), Some("rpc_parse_error")).await;
return None;
}
}
}
Err(e) => {
let msg = format!("Bitcoin node unreachable during broadcast — {}", if e.is_connect() {
"connection refused"
} else if e.is_timeout() {
"timed out"
} else {
"network error"
});
warn!("Bitcoin Core RPC unreachable: {}", e);
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some(&msg), Some("bitcoin_unreachable")).await;
return None;
}
};
if txid.is_none() {
send_tx_relay_response(state, sender_contact_id, relay.request_id, None, Some("Bitcoin node returned no transaction ID"), Some("rpc_parse_error")).await;
}
txid
}
/// Monitor a transaction for confirmations (poll every 30s, up to 3 hours).
async fn track_confirmations(
client: &reqwest::Client,
txid: &str,
request_id: u64,
sender_contact_id: u32,
state: &Arc<MeshState>,
) {
let mut last_reported_confs: u32 = 0;
for _ in 0..360 {
tokio::time::sleep(Duration::from_secs(30)).await;
match check_tx_confirmations(client, txid).await {
Ok((confs, block_height)) => {
if confs > last_reported_confs && confs <= 3 {
info!(txid = %txid, confirmations = confs, "Sending confirmation update via mesh");
send_confirmation_update(state, sender_contact_id, request_id, txid, confs, block_height).await;
last_reported_confs = confs;
if confs >= 3 {
info!(txid = %txid, "TX fully confirmed (3/3) — done tracking");
return;
}
}
}
Err(e) => {
debug!(txid = %txid, "Confirmation check: {}", e);
}
}
}
}
/// Send a TxRelayResponse back to the originating peer.
async fn send_tx_relay_response(
state: &Arc<MeshState>,
dest_contact_id: u32,
request_id: u64,
txid: Option<&str>,
error: Option<&str>,
error_code: Option<&str>,
) {
let wire = match super::super::bitcoin_relay::build_tx_relay_response(request_id, txid, error, error_code) {
Ok(w) => w,
Err(e) => {
warn!("Failed to build TX relay response: {}", e);
return;
}
};
send_to_peer(state, dest_contact_id, wire).await;
}
/// Send a TxConfirmation update to the originator.
async fn send_confirmation_update(
state: &Arc<MeshState>,
dest_contact_id: u32,
request_id: u64,
txid: &str,
confirmations: u32,
block_height: u64,
) {
let conf = message_types::TxConfirmationPayload {
request_id,
txid: txid.to_string(),
confirmations,
block_height,
};
if let Ok(payload_bytes) = message_types::encode_payload(&conf) {
let envelope = message_types::TypedEnvelope::new(
message_types::MeshMessageType::TxConfirmation,
payload_bytes,
);
if let Ok(wire) = envelope.to_wire() {
send_to_peer(state, dest_contact_id, wire).await;
}
}
}
/// Encrypt a typed wire payload for a specific peer.
/// Attempts ratchet encryption first (forward secrecy), falls back to static
/// shared secret, falls back to plaintext if neither is available.
/// Respects the encrypt_relay config toggle for rollback.
async fn encrypt_for_peer(
state: &Arc<MeshState>,
contact_id: u32,
typed_wire: &[u8],
) -> Vec<u8> {
if !state.encrypt_relay {
return typed_wire.to_vec();
}
// Look up peer DID for ratchet session
let peer_did = state.peers.read().await
.get(&contact_id)
.and_then(|p| p.did.clone());
// Try ratchet encryption first (forward secrecy)
if let Some(ref did) = peer_did {
if state.session_manager.has_session(did).await {
match state.session_manager.encrypt_for_peer(did, typed_wire).await {
Ok(ratchet_msg) => {
let ratchet_bytes = ratchet_msg.to_bytes();
let mut buf = Vec::with_capacity(1 + ratchet_bytes.len());
buf.push(message_types::RATCHET_TYPED_MARKER);
buf.extend_from_slice(&ratchet_bytes);
debug!(contact_id, did = %did, "Encrypted with Double Ratchet (0xDD)");
return buf;
}
Err(e) => {
warn!(contact_id, did = %did, "Ratchet encrypt failed, trying static: {}", e);
}
}
}
}
// Fall back to static shared secret (0xEE)
let secrets = state.shared_secrets.read().await;
if let Some(secret) = secrets.get(&contact_id) {
match crypto::encrypt(secret, typed_wire) {
Ok(ciphertext) => {
let mut buf = Vec::with_capacity(1 + ciphertext.len());
buf.push(message_types::ENCRYPTED_TYPED_MARKER);
buf.extend_from_slice(&ciphertext);
debug!(contact_id, "Encrypted with static shared secret (0xEE)");
return buf;
}
Err(e) => {
warn!(contact_id, "Static encrypt failed, sending plaintext: {}", e);
}
}
}
// No encryption available — send plaintext
debug!(contact_id, "No encryption available, sending plaintext (0x02)");
typed_wire.to_vec()
}
/// Send raw wire bytes to a specific peer by contact_id.
/// Encrypts directed messages via ratchet or shared secret when available.
/// Falls back to channel 0 broadcast (plaintext) if peer's pubkey is unknown.
async fn send_to_peer(state: &Arc<MeshState>, contact_id: u32, typed_wire: Vec<u8>) {
let peers = state.peers.read().await;
if let Some(peer) = peers.get(&contact_id) {
if let Some(ref pk) = peer.pubkey_hex {
if let Ok(pk_bytes) = hex::decode(pk) {
if pk_bytes.len() >= 6 {
let mut prefix = [0u8; 6];
prefix.copy_from_slice(&pk_bytes[..6]);
drop(peers);
// Encrypt for this specific peer before sending
let payload = encrypt_for_peer(state, contact_id, &typed_wire).await;
let _ = state.cmd_tx.send(MeshCommand::SendRaw {
dest_pubkey_prefix: prefix,
payload,
}).await;
return;
}
}
}
}
drop(peers);
// Broadcast fallback — plaintext (no specific peer to encrypt for)
let _ = state.cmd_tx.send(MeshCommand::BroadcastChannel {
channel: 0,
payload: typed_wire,
}).await;
}
/// Check transaction confirmation count via Bitcoin Core RPC.
async fn check_tx_confirmations(client: &reqwest::Client, txid: &str) -> anyhow::Result<(u32, u64)> {
let body = serde_json::json!({
"jsonrpc": "1.0",
"id": "mesh-conf",
"method": "gettransaction",
"params": [txid]
});
let (rpc_user, rpc_pass) = crate::bitcoin_rpc::bitcoin_rpc_credentials().await;
let resp = client
.post(crate::constants::BITCOIN_RPC_URL)
.basic_auth(&rpc_user, Some(&rpc_pass))
.json(&body)
.send()
.await?;
let rpc_resp: serde_json::Value = resp.json().await?;
if let Some(result) = rpc_resp.get("result") {
let confs = result.get("confirmations").and_then(|c| c.as_u64()).unwrap_or(0) as u32;
let block_height = result.get("blockheight").and_then(|h| h.as_u64()).unwrap_or(0);
Ok((confs, block_height))
} else {
anyhow::bail!("gettransaction returned no result")
}
}

View File

@@ -0,0 +1,457 @@
//! Message decoding: base64, encryption, chunk reassembly, peer resolution.
use super::MeshState;
use super::super::crypto;
use super::super::message_types::{self, TypedEnvelope};
use super::super::types::*;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Try to base64-decode payload and check if the result is a typed envelope.
/// Handles: plain typed (0x02), steganographic (0xAA), and encrypted (0xEE).
/// Returns the decoded bytes if it's a valid base64-encoded TypedEnvelope.
pub(super) fn try_base64_typed(payload: &[u8]) -> Option<Vec<u8>> {
use base64::Engine;
if payload.is_empty() || payload[0] == message_types::TYPED_MESSAGE_MARKER {
return None;
}
let text = std::str::from_utf8(payload).ok()?;
let decoded = base64::engine::general_purpose::STANDARD.decode(text.trim()).ok()?;
unwrap_wire_layers(&decoded)
}
/// Try to base64-decode and decrypt an encrypted typed message.
/// Handles the common case where encrypted messages arrive as base64 text.
pub(super) async fn try_decrypt_base64(
payload: &[u8],
sender_contact_id: u32,
state: &Arc<MeshState>,
) -> Option<Vec<u8>> {
use base64::Engine;
let text = std::str::from_utf8(payload).ok()?;
let decoded = base64::engine::general_purpose::STANDARD.decode(text.trim()).ok()?;
if decoded.first() != Some(&message_types::ENCRYPTED_TYPED_MARKER) {
return None;
}
let secrets = state.shared_secrets.read().await;
try_decrypt_typed(&decoded, sender_contact_id, &secrets)
}
/// Try to decrypt a Double Ratchet encrypted message (0xDD prefix).
/// Format: [0xDD] [RatchetHeader(40) + nonce(12) + ciphertext + tag(16)]
/// Returns the decrypted typed wire bytes ([0x02][CBOR]) if successful.
async fn try_decrypt_ratchet(
decoded: &[u8],
sender_contact_id: u32,
state: &Arc<MeshState>,
) -> Option<Vec<u8>> {
if decoded.first() != Some(&message_types::RATCHET_TYPED_MARKER) {
return None;
}
let ratchet_bytes = &decoded[1..]; // skip 0xDD marker
let ratchet_msg = match super::super::ratchet::RatchetMessage::from_bytes(ratchet_bytes) {
Ok(msg) => msg,
Err(e) => {
warn!(contact_id = sender_contact_id, "Failed to parse ratchet message: {}", e);
return None;
}
};
// Look up peer DID for session manager
let peer_did = state.peers.read().await
.get(&sender_contact_id)
.and_then(|p| p.did.clone())?;
match state.session_manager.decrypt_from_peer(&peer_did, &ratchet_msg).await {
Ok(plaintext) => {
debug!(contact_id = sender_contact_id, did = %peer_did, "Decrypted ratchet message (0xDD)");
// The plaintext should be the original [0x02][CBOR] typed wire
if TypedEnvelope::is_typed(&plaintext) {
Some(plaintext)
} else {
// Could be nested stego -> typed
unwrap_wire_layers(&plaintext)
}
}
Err(e) => {
warn!(contact_id = sender_contact_id, "Ratchet decrypt failed: {}", e);
None
}
}
}
/// Try to base64-decode and decrypt a ratchet-encrypted message.
/// Handles the case where ratchet messages arrive as base64 text.
pub(super) async fn try_decrypt_ratchet_base64(
payload: &[u8],
sender_contact_id: u32,
state: &Arc<MeshState>,
) -> Option<Vec<u8>> {
use base64::Engine;
let text = std::str::from_utf8(payload).ok()?;
let decoded = base64::engine::general_purpose::STANDARD.decode(text.trim()).ok()?;
if decoded.first() != Some(&message_types::RATCHET_TYPED_MARKER) {
return None;
}
try_decrypt_ratchet(&decoded, sender_contact_id, state).await
}
/// Unwrap wire layers: encrypted (0xEE) -> stego (0xAA) -> typed (0x02).
/// Returns None if decoding fails at any layer (caller should use shared_secrets variant).
fn unwrap_wire_layers(decoded: &[u8]) -> Option<Vec<u8>> {
// Check for steganographic frame (0xAA prefix) — unwrap to typed envelope
if decoded.first() == Some(&super::super::steganography::STEGO_MARKER) {
match super::super::steganography::decode_typed_wire(decoded) {
Ok(typed_wire) => return Some(typed_wire),
Err(_) => return None,
}
}
if TypedEnvelope::is_typed(decoded) {
Some(decoded.to_vec())
} else {
None
}
}
/// Try to decrypt an encrypted typed message (0xEE prefix) using known shared secrets.
/// Format: [0xEE] [nonce: 12] [ciphertext + tag: 16]
fn try_decrypt_typed(
decoded: &[u8],
sender_contact_id: u32,
shared_secrets: &HashMap<u32, [u8; 32]>,
) -> Option<Vec<u8>> {
if decoded.first() != Some(&message_types::ENCRYPTED_TYPED_MARKER) {
return None;
}
let ciphertext = &decoded[1..]; // skip 0xEE marker
// Try sender's shared secret first (most likely)
if let Some(secret) = shared_secrets.get(&sender_contact_id) {
if let Ok(plaintext) = crypto::decrypt(secret, ciphertext) {
return unwrap_wire_layers(&plaintext);
}
}
// Fallback: try all known shared secrets (in case contact_id mapping is stale)
for (cid, secret) in shared_secrets {
if *cid == sender_contact_id { continue; } // already tried
if let Ok(plaintext) = crypto::decrypt(secret, ciphertext) {
return unwrap_wire_layers(&plaintext);
}
}
None
}
/// Check if payload is a mesh chunk ("MC" prefix) and try to reassemble.
/// Format: MC{msg_id:2hex}{chunk_idx:2hex}{total:2hex}{base64_data}
/// Returns Some(decoded_bytes) when all chunks have arrived.
pub(super) async fn try_chunk_reassemble(
payload: &[u8],
sender_contact_id: u32,
state: &Arc<MeshState>,
) -> Option<Vec<u8>> {
use base64::Engine;
let text = std::str::from_utf8(payload).ok()?;
if !text.starts_with("MC") || text.len() < 8 {
return None;
}
let msg_id = u8::from_str_radix(&text[2..4], 16).ok()?;
let chunk_idx = u8::from_str_radix(&text[4..6], 16).ok()?;
let total = u8::from_str_radix(&text[6..8], 16).ok()?;
let chunk_data = &text[8..];
if total == 0 || total > 20 {
return None; // sanity check
}
let key = (sender_contact_id, msg_id);
let mut buffer = state.chunk_buffer.write().await;
// Clean up stale entries (>120s old)
buffer.retain(|_, v| v.created.elapsed().as_secs() < 120);
let assembly = buffer.entry(key).or_insert_with(|| super::ChunkAssembly {
chunks: HashMap::new(),
total,
created: std::time::Instant::now(),
});
assembly.chunks.insert(chunk_idx, chunk_data.to_string());
assembly.total = total; // update in case first chunk had it wrong
debug!(msg_id, chunk_idx, total, received = assembly.chunks.len(), "Chunk received");
// Check if we have all chunks
if assembly.chunks.len() < total as usize {
return None;
}
// All chunks received — reassemble in order
let mut combined = String::new();
for i in 0..total {
match assembly.chunks.get(&i) {
Some(data) => combined.push_str(data),
None => {
warn!(msg_id, missing = i, "Chunk missing during reassembly");
return None;
}
}
}
if let Ok(decoded) = base64::engine::general_purpose::STANDARD.decode(&combined) {
// Check for ratchet-encrypted frame (0xDD) — decrypt then unwrap
if decoded.first() == Some(&message_types::RATCHET_TYPED_MARKER) {
// Must drop buffer lock before calling async try_decrypt_ratchet
let decoded_clone = decoded.clone();
drop(buffer);
if let Some(typed_wire) = try_decrypt_ratchet(&decoded_clone, sender_contact_id, state).await {
info!(msg_id, chunks = total, total_len = typed_wire.len(), "Reassembled ratchet-encrypted chunked message");
state.chunk_buffer.write().await.remove(&key);
return Some(typed_wire);
}
buffer = state.chunk_buffer.write().await;
}
// Check for static-encrypted frame (0xEE) — decrypt then unwrap
if decoded.first() == Some(&message_types::ENCRYPTED_TYPED_MARKER) {
let secrets = state.shared_secrets.read().await;
if let Some(typed_wire) = try_decrypt_typed(&decoded, sender_contact_id, &secrets) {
info!(msg_id, chunks = total, total_len = typed_wire.len(), "Reassembled encrypted chunked message");
buffer.remove(&key);
return Some(typed_wire);
}
}
// Check for stego frame — unwrap to typed envelope
if decoded.first() == Some(&super::super::steganography::STEGO_MARKER) {
if let Ok(typed_wire) = super::super::steganography::decode_typed_wire(&decoded) {
info!(msg_id, chunks = total, total_len = typed_wire.len(), "Reassembled stego chunked message");
buffer.remove(&key);
return Some(typed_wire);
}
}
if TypedEnvelope::is_typed(&decoded) {
info!(msg_id, chunks = total, total_len = decoded.len(), "Reassembled chunked message");
buffer.remove(&key);
return Some(decoded);
}
}
warn!(msg_id, "All chunks received but decode failed");
buffer.remove(&key);
None
}
/// Look up a peer by pubkey hex prefix. Returns (contact_id, display_name).
pub(super) async fn resolve_peer(state: &Arc<MeshState>, sender_prefix: &str) -> (u32, String) {
let peers = state.peers.read().await;
peers
.values()
.find(|p| {
p.pubkey_hex
.as_ref()
.map(|k| k.starts_with(sender_prefix))
.unwrap_or(false)
})
.map(|p| (p.contact_id, p.advert_name.clone()))
.unwrap_or((0, sender_prefix.to_string()))
}
/// Store a plain-text (non-typed) message and emit an event.
pub(super) async fn store_plain_message(
state: &Arc<MeshState>,
contact_id: u32,
peer_name: &str,
text: &str,
) {
let msg_id = state.next_id().await;
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Received,
peer_contact_id: contact_id,
peer_name: Some(peer_name.to_string()),
plaintext: text.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted: false,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
}
/// Handle a received identity broadcast from a peer.
#[allow(dead_code)]
pub(super) async fn handle_identity_received(
contact_id: u32,
rssi: i16,
did: &str,
ed_pubkey_hex: &str,
x25519_pubkey_hex: &str,
state: &Arc<MeshState>,
our_x25519_secret: &[u8; 32],
) {
info!(
contact_id,
did = %did,
rssi,
"Archipelago peer discovered over mesh"
);
// Verify Ed25519 public key is valid
let ed_pubkey_bytes = match hex::decode(ed_pubkey_hex) {
Ok(b) if b.len() == 32 => {
let mut arr = [0u8; 32];
arr.copy_from_slice(&b);
arr
}
_ => {
warn!(contact_id, "Rejecting identity: invalid Ed25519 public key");
return;
}
};
if ed25519_dalek::VerifyingKey::from_bytes(&ed_pubkey_bytes).is_err() {
warn!(contact_id, "Rejecting identity: Ed25519 key is not a valid curve point");
return;
}
// Verify X25519 public key is consistent with Ed25519 key
let expected_x25519 = match crypto::ed25519_pubkey_to_x25519(&ed_pubkey_bytes) {
Ok(k) => k,
Err(e) => {
warn!(contact_id, "Rejecting identity: cannot derive X25519 from Ed25519: {}", e);
return;
}
};
// Decode X25519 public key
let x25519_bytes = match hex::decode(x25519_pubkey_hex) {
Ok(b) if b.len() == 32 => {
let mut arr = [0u8; 32];
arr.copy_from_slice(&b);
arr
}
_ => {
warn!(contact_id, "Rejecting identity: invalid X25519 public key");
return;
}
};
if x25519_bytes != expected_x25519 {
warn!(contact_id, did = %did, "Rejecting identity: X25519 key does not match Ed25519 key");
return;
}
// Derive shared secret for encrypted messaging
let shared_secret = crypto::x25519_shared_secret(our_x25519_secret, &x25519_bytes);
state
.shared_secrets
.write()
.await
.insert(contact_id, shared_secret);
// Update peer record
let peer = MeshPeer {
contact_id,
advert_name: format!("Archy-{}", &did[8..16.min(did.len())]),
did: Some(did.to_string()),
pubkey_hex: Some(ed_pubkey_hex.to_string()),
x25519_pubkey: Some(x25519_bytes),
rssi: Some(rssi),
snr: None,
last_heard: chrono::Utc::now().to_rfc3339(),
hops: 0,
};
let is_new = {
let mut peers = state.peers.write().await;
let is_new = !peers.contains_key(&contact_id);
peers.insert(contact_id, peer.clone());
is_new
};
state.update_peer_count().await;
let event = if is_new {
MeshEvent::PeerDiscovered(peer)
} else {
MeshEvent::PeerUpdated(peer)
};
let _ = state.event_tx.send(event);
let _ = state.event_tx.send(MeshEvent::IdentityReceived {
contact_id,
did: did.to_string(),
pubkey_hex: ed_pubkey_hex.to_string(),
x25519_pubkey: x25519_bytes,
});
}
/// Handle a received message (direct or channel).
#[allow(dead_code)]
pub(super) async fn handle_received_message(
contact_id: u32,
payload: &[u8],
rssi: i16,
is_channel: bool,
state: &Arc<MeshState>,
_our_x25519_secret: &[u8; 32],
) {
// Try to decrypt if we have a shared secret for this contact
let shared_secrets = state.shared_secrets.read().await;
let (plaintext, encrypted) = if let Some(secret) = shared_secrets.get(&contact_id) {
match crypto::decrypt(secret, payload) {
Ok(pt) => (String::from_utf8_lossy(&pt).to_string(), true),
Err(_) => {
// Not encrypted or wrong key — treat as plaintext
(String::from_utf8_lossy(payload).to_string(), false)
}
}
} else {
(String::from_utf8_lossy(payload).to_string(), false)
};
drop(shared_secrets);
// Update peer last_heard
{
let mut peers = state.peers.write().await;
if let Some(peer) = peers.get_mut(&contact_id) {
peer.last_heard = chrono::Utc::now().to_rfc3339();
peer.rssi = Some(rssi);
}
}
let peer_name = state
.peers
.read()
.await
.get(&contact_id)
.map(|p| p.advert_name.clone());
let msg_id = state.next_id().await;
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Received,
peer_contact_id: contact_id,
peer_name,
plaintext: plaintext.clone(),
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted,
};
state.store_message(msg.clone()).await;
{
let mut status = state.status.write().await;
status.messages_received += 1;
}
info!(
contact_id,
encrypted,
channel = is_channel,
"Received mesh message"
);
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
}

View File

@@ -0,0 +1,402 @@
//! Typed message dispatch — routes TypedEnvelope messages to type-specific handlers.
use super::bitcoin::handle_tx_relay_broadcast;
use super::decode::store_plain_message;
use super::MeshState;
use super::super::message_types::{self, MeshMessageType, TypedEnvelope};
use super::super::types::*;
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Store a typed message with a type label for UI rendering.
async fn store_typed_message(
state: &Arc<MeshState>,
contact_id: u32,
peer_name: &str,
text: &str,
type_label: &str,
) {
let msg_id = state.next_id().await;
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Received,
peer_contact_id: contact_id,
peer_name: Some(peer_name.to_string()),
plaintext: format!("[{}] {}", type_label, text),
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: true,
encrypted: false,
};
state.store_message(msg.clone()).await;
state.status.write().await.messages_received += 1;
let _ = state.event_tx.send(MeshEvent::MessageReceived(msg));
}
/// Handle a typed message envelope (0x02 prefix).
/// Dispatches to type-specific handlers: BlockHeader, Alert, TxRelay, etc.
pub(super) async fn handle_typed_message(
payload: &[u8],
sender_contact_id: u32,
sender_name: &str,
state: &Arc<MeshState>,
) {
let envelope = match TypedEnvelope::from_wire(payload) {
Ok(e) => e,
Err(e) => {
warn!(
payload_len = payload.len(),
first_bytes = %hex::encode(&payload[..payload.len().min(16)]),
"Failed to decode typed envelope: {}", e
);
return;
}
};
// Verify envelope signature if present, using the sender's known Ed25519 key
if envelope.sig.is_some() {
let peer_pubkey = state.peers.read().await
.get(&sender_contact_id)
.and_then(|p| p.pubkey_hex.as_ref())
.and_then(|hex_str| hex::decode(hex_str).ok())
.and_then(|bytes| {
if bytes.len() == 32 {
let mut arr = [0u8; 32];
arr.copy_from_slice(&bytes);
ed25519_dalek::VerifyingKey::from_bytes(&arr).ok()
} else {
None
}
});
if let Some(vk) = peer_pubkey {
match envelope.verify_signature(&vk) {
Ok(true) => {}
Ok(false) => {
warn!(peer = sender_contact_id, "Dropping message with invalid signature");
return;
}
Err(e) => {
warn!(peer = sender_contact_id, "Signature verification error: {}", e);
return;
}
}
}
}
let msg_type = envelope.message_type();
let type_label = msg_type.map(|t| t.label()).unwrap_or("unknown");
info!(
msg_type = type_label,
from = sender_contact_id,
"Received typed mesh message"
);
match msg_type {
Some(MeshMessageType::BlockHeader) => {
dispatch_block_header(&envelope, sender_contact_id, sender_name, state).await;
}
Some(MeshMessageType::Alert) => {
match message_types::decode_payload::<message_types::AlertPayload>(&envelope.v) {
Ok(alert) => {
let alert_type_str = format!("{:?}", alert.alert_type).to_lowercase();
info!(
alert_type = %alert_type_str,
from = sender_contact_id,
"Alert received via mesh: {}",
alert.message
);
store_typed_message(
state,
sender_contact_id,
sender_name,
&alert.message,
"alert",
)
.await;
let _ = state.event_tx.send(MeshEvent::AlertReceived {
alert_type: alert_type_str,
message: alert.message,
from_contact_id: sender_contact_id,
});
}
Err(e) => warn!("Failed to decode alert payload: {}", e),
}
}
Some(MeshMessageType::TxRelay) => {
match message_types::decode_payload::<message_types::TxRelayPayload>(&envelope.v) {
Ok(relay) => {
// Validate transaction before relaying
if !super::super::bitcoin_relay::validate_raw_transaction(&relay.tx_hex) {
warn!(peer = sender_contact_id, "Rejected invalid TX relay");
return;
}
info!(
request_id = relay.request_id,
tx_len = relay.tx_hex.len(),
"TX relay request received — broadcasting to Bitcoin network"
);
store_typed_message(
state,
sender_contact_id,
sender_name,
&format!("TX relay request #{} ({} hex chars)", relay.request_id, relay.tx_hex.len()),
"tx_relay",
)
.await;
// Spawn async task to broadcast via Bitcoin RPC and track confirmations
let relay_state = Arc::clone(state);
let relay_contact = sender_contact_id;
tokio::spawn(async move {
handle_tx_relay_broadcast(relay, relay_contact, &relay_state).await;
});
}
Err(e) => warn!("Failed to decode TX relay payload: {}", e),
}
}
Some(MeshMessageType::TxRelayResponse) => {
dispatch_tx_relay_response(&envelope, sender_contact_id, sender_name, state).await;
}
Some(MeshMessageType::LightningRelay) => {
match message_types::decode_payload::<message_types::LightningRelayPayload>(
&envelope.v,
) {
Ok(relay) => {
info!(
request_id = relay.request_id,
amount_sats = relay.amount_sats,
"Lightning relay request received"
);
store_typed_message(
state,
sender_contact_id,
sender_name,
&format!("Lightning relay: {} sats", relay.amount_sats),
"lightning_relay",
)
.await;
// Will be wired to LND in Week 9
let _ = state.event_tx.send(MeshEvent::LightningRelayCompleted {
request_id: relay.request_id,
payment_hash: None,
error: Some("Lightning relay processing not yet wired".to_string()),
});
}
Err(e) => warn!("Failed to decode Lightning relay payload: {}", e),
}
}
Some(MeshMessageType::LightningRelayResponse) => {
match message_types::decode_payload::<message_types::LightningRelayResponsePayload>(
&envelope.v,
) {
Ok(resp) => {
let status = if resp.payment_hash.is_some() { "paid" } else { "failed" };
info!(request_id = resp.request_id, status, "Lightning relay response");
let text = if let Some(ref hash) = resp.payment_hash {
format!("Lightning paid! hash: {}...", &hash[..16.min(hash.len())])
} else {
format!("Lightning failed: {}", resp.error.as_deref().unwrap_or("unknown"))
};
store_typed_message(state, sender_contact_id, sender_name, &text, "lightning_relay_response").await;
let _ = state.event_tx.send(MeshEvent::LightningRelayCompleted {
request_id: resp.request_id,
payment_hash: resp.payment_hash,
error: resp.error,
});
}
Err(e) => warn!("Failed to decode Lightning relay response: {}", e),
}
}
Some(MeshMessageType::Invoice) => {
match message_types::decode_payload::<message_types::InvoicePayload>(&envelope.v) {
Ok(invoice) => {
let text = format!(
"Invoice: {} sats{}",
invoice.amount_sats,
invoice.memo.as_ref().map(|m| format!("{}", m)).unwrap_or_default()
);
store_typed_message(state, sender_contact_id, sender_name, &text, "invoice").await;
}
Err(e) => warn!("Failed to decode invoice payload: {}", e),
}
}
Some(MeshMessageType::Coordinate) => {
match message_types::decode_payload::<message_types::Coordinate>(&envelope.v) {
Ok(coord) => {
let text = format!(
"Location: {:.6}, {:.6}{}",
coord.lat_degrees(),
coord.lng_degrees(),
coord.label.as_ref().map(|l| format!(" ({})", l)).unwrap_or_default()
);
store_typed_message(state, sender_contact_id, sender_name, &text, "coordinate").await;
}
Err(e) => warn!("Failed to decode coordinate payload: {}", e),
}
}
Some(MeshMessageType::TxConfirmation) => {
dispatch_tx_confirmation(&envelope, sender_contact_id, sender_name, state).await;
}
Some(MeshMessageType::Text) => {
// Typed text message — extract and store as plain text
let text = String::from_utf8_lossy(&envelope.v).to_string();
store_plain_message(state, sender_contact_id, sender_name, &text).await;
}
_ => {
debug!(
msg_type = ?msg_type,
"Unhandled typed message type"
);
}
}
}
/// Dispatch a BlockHeader typed message.
async fn dispatch_block_header(
envelope: &TypedEnvelope,
sender_contact_id: u32,
sender_name: &str,
state: &Arc<MeshState>,
) {
// Compact binary format: height(8) + hash(32) + timestamp(4)
match super::super::bitcoin_relay::decode_compact_block_header(&envelope.v) {
Ok((height, hash_hex, timestamp)) => {
// Validate header before accepting
let last_known = state.block_header_cache.latest_height().await;
if !super::super::bitcoin_relay::validate_block_header(height, &hash_hex, timestamp, last_known) {
warn!(peer = sender_contact_id, height, "Rejected invalid block header");
return;
}
info!(
height,
hash = %hash_hex,
"Block header received via mesh"
);
// Store in block header cache for the Off-Grid Bitcoin panel
let header_payload = message_types::BlockHeaderPayload {
height,
hash: hash_hex.clone(),
prev_hash: String::new(),
timestamp,
announced_by: sender_name.to_string(),
};
let _ = state.block_header_cache.store_header(header_payload).await;
let text = format!(
"Block #{}{}...{}",
height,
&hash_hex[..8.min(hash_hex.len())],
&hash_hex[hash_hex.len().saturating_sub(8)..]
);
store_typed_message(
state,
sender_contact_id,
sender_name,
&text,
"block_header",
)
.await;
let _ = state.event_tx.send(MeshEvent::BlockHeaderReceived {
height,
hash: hash_hex,
});
}
Err(e) => warn!("Failed to decode block header: {}", e),
}
}
/// Dispatch a TxRelayResponse typed message.
async fn dispatch_tx_relay_response(
envelope: &TypedEnvelope,
sender_contact_id: u32,
sender_name: &str,
state: &Arc<MeshState>,
) {
match message_types::decode_payload::<message_types::TxRelayResponsePayload>(&envelope.v) {
Ok(resp) => {
let status = if resp.txid.is_some() { "confirmed" } else { "failed" };
info!(
request_id = resp.request_id,
status,
error_code = resp.error_code.as_deref().unwrap_or("none"),
"TX relay response received"
);
let text = if let Some(ref txid) = resp.txid {
format!("TX relayed! txid: {}...{}", &txid[..8.min(txid.len())], &txid[txid.len().saturating_sub(8)..])
} else if let Some(ref code) = resp.error_code {
format!("TX relay failed [{}]: {}", code, resp.error.as_deref().unwrap_or("unknown"))
} else {
format!("TX relay failed: {}", resp.error.as_deref().unwrap_or("unknown"))
};
store_typed_message(state, sender_contact_id, sender_name, &text, "tx_relay_response").await;
// Store result for frontend polling
if let Some(ref tracker) = state.relay_tracker {
tracker.store_result(super::super::bitcoin_relay::RelayResult {
request_id: resp.request_id,
txid: resp.txid.clone(),
error: resp.error.clone(),
error_code: resp.error_code.clone(),
completed_at: chrono::Utc::now().to_rfc3339(),
}).await;
}
let _ = state.event_tx.send(MeshEvent::TxRelayCompleted {
request_id: resp.request_id,
txid: resp.txid,
error: resp.error,
});
}
Err(e) => warn!("Failed to decode TX relay response: {}", e),
}
}
/// Dispatch a TxConfirmation typed message.
async fn dispatch_tx_confirmation(
envelope: &TypedEnvelope,
sender_contact_id: u32,
sender_name: &str,
state: &Arc<MeshState>,
) {
match message_types::decode_payload::<message_types::TxConfirmationPayload>(&envelope.v) {
Ok(conf) => {
let status_text = if conf.confirmations >= 3 {
format!("TX {} confirmed ({}/3) at block #{}", &conf.txid[..12.min(conf.txid.len())], conf.confirmations, conf.block_height)
} else {
format!("TX {}{}/3 confirmations (block #{})", &conf.txid[..12.min(conf.txid.len())], conf.confirmations, conf.block_height)
};
info!(
txid = %conf.txid,
confirmations = conf.confirmations,
block_height = conf.block_height,
"TX confirmation update received"
);
store_typed_message(state, sender_contact_id, sender_name, &status_text, "tx_confirmation").await;
// Store confirmation for frontend polling
if let Some(ref tracker) = state.relay_tracker {
tracker.store_result(super::super::bitcoin_relay::RelayResult {
request_id: conf.request_id,
txid: Some(conf.txid.clone()),
error: None,
error_code: None,
completed_at: chrono::Utc::now().to_rfc3339(),
}).await;
}
let _ = state.event_tx.send(MeshEvent::TxRelayCompleted {
request_id: conf.request_id,
txid: Some(conf.txid),
error: None,
});
}
Err(e) => warn!("Failed to decode TX confirmation: {}", e),
}
}

View File

@@ -0,0 +1,143 @@
//! Inbound frame dispatcher — routes device frames to the appropriate handler.
use super::decode::{
resolve_peer, store_plain_message, try_base64_typed, try_chunk_reassemble,
try_decrypt_base64, try_decrypt_ratchet_base64,
};
use super::dispatch::handle_typed_message;
use super::MeshState;
use super::super::message_types::TypedEnvelope;
use super::super::protocol;
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Handle a single inbound frame from the device.
/// Returns `true` if contacts should be refreshed from the device.
pub(super) async fn handle_frame(
frame: &protocol::InboundFrame,
state: &Arc<MeshState>,
our_x25519_secret: &[u8; 32],
) -> bool {
let _ = our_x25519_secret; // reserved for future per-frame decryption
match frame.code {
protocol::PUSH_NEW_CONTACT | protocol::PUSH_CONTACT_ADVERT => {
info!(code = frame.code, "Contact discovery event — refreshing contacts");
return true; // Signal caller to fetch contacts
}
protocol::PUSH_ACK => {
debug!("Message delivery confirmed");
// Could track which message was ACKed from frame.data
}
protocol::PUSH_MESSAGES_WAITING => {
info!("Device has messages waiting — will sync");
return true; // Signal caller to sync immediately
}
protocol::RESP_CONTACT_MSG_V3 => {
// Direct message received (v3 format) — check for typed envelope first
match protocol::parse_contact_msg_v3_raw(&frame.data) {
Ok((sender_prefix, payload, _snr)) => {
if !payload.is_empty() {
let (contact_id, name) = resolve_peer(state, &sender_prefix).await;
if TypedEnvelope::is_typed(&payload) {
handle_typed_message(&payload, contact_id, &name, state).await;
} else if let Some(decoded) = try_base64_typed(&payload) {
handle_typed_message(&decoded, contact_id, &name, state).await;
} else if let Some(decoded) = try_decrypt_ratchet_base64(&payload, contact_id, state).await {
handle_typed_message(&decoded, contact_id, &name, state).await;
} else if let Some(decoded) = try_decrypt_base64(&payload, contact_id, state).await {
handle_typed_message(&decoded, contact_id, &name, state).await;
} else if let Some(decoded) = try_chunk_reassemble(&payload, contact_id, state).await {
handle_typed_message(&decoded, contact_id, &name, state).await;
} else if !payload.starts_with(b"MC") {
let text = String::from_utf8_lossy(&payload).to_string();
store_plain_message(state, contact_id, &name, &text).await;
info!(from = %sender_prefix, "Received mesh DM (v3)");
}
}
}
Err(e) => warn!("Failed to parse v3 message: {}", e),
}
}
protocol::RESP_CONTACT_MSG => {
// Direct message received (v1 format)
match protocol::parse_contact_msg_v1_raw(&frame.data) {
Ok((sender_prefix, payload)) => {
if !payload.is_empty() {
let (contact_id, name) = resolve_peer(state, &sender_prefix).await;
if TypedEnvelope::is_typed(&payload) {
handle_typed_message(&payload, contact_id, &name, state).await;
} else if let Some(decoded) = try_base64_typed(&payload) {
handle_typed_message(&decoded, contact_id, &name, state).await;
} else if let Some(decoded) = try_decrypt_ratchet_base64(&payload, contact_id, state).await {
handle_typed_message(&decoded, contact_id, &name, state).await;
} else if let Some(decoded) = try_decrypt_base64(&payload, contact_id, state).await {
handle_typed_message(&decoded, contact_id, &name, state).await;
} else if let Some(decoded) = try_chunk_reassemble(&payload, contact_id, state).await {
handle_typed_message(&decoded, contact_id, &name, state).await;
} else if !payload.starts_with(b"MC") {
let text = String::from_utf8_lossy(&payload).to_string();
store_plain_message(state, contact_id, &name, &text).await;
info!(from = %sender_prefix, "Received mesh DM (v1)");
}
}
}
Err(e) => warn!("Failed to parse v1 message: {}", e),
}
}
protocol::RESP_CHANNEL_MSG_V3 => {
// Channel broadcast received (v3) — check for typed envelope
match protocol::parse_channel_msg_v3_raw(&frame.data) {
Ok((channel_idx, payload)) => {
if !payload.is_empty() {
let chan_contact_id = u32::MAX - (channel_idx as u32);
let chan_name = format!("Channel {}", channel_idx);
if TypedEnvelope::is_typed(&payload) {
handle_typed_message(&payload, chan_contact_id, &chan_name, state).await;
} else {
let text = String::from_utf8_lossy(&payload).to_string();
store_plain_message(state, chan_contact_id, &chan_name, &text).await;
info!(channel = channel_idx, "Received mesh channel message (v3)");
}
}
}
Err(e) => warn!("Failed to parse v3 channel message: {}", e),
}
}
protocol::RESP_CHANNEL_MSG => {
// Channel broadcast received (v1)
match protocol::parse_channel_msg_v1_raw(&frame.data) {
Ok((channel_idx, payload)) => {
if !payload.is_empty() {
let chan_contact_id = u32::MAX - (channel_idx as u32);
let chan_name = format!("Channel {}", channel_idx);
if TypedEnvelope::is_typed(&payload) {
handle_typed_message(&payload, chan_contact_id, &chan_name, state).await;
} else {
let text = String::from_utf8_lossy(&payload).to_string();
store_plain_message(state, chan_contact_id, &chan_name, &text).await;
info!(channel = channel_idx, "Received mesh channel message");
}
}
}
Err(e) => warn!("Failed to parse channel message: {}", e),
}
}
protocol::PUSH_LOG_DATA | protocol::PUSH_PATH_UPDATE | protocol::PUSH_RAW_DATA => {
// Internal device logging/path data — safe to ignore
}
_ => {
if protocol::is_push_notification(frame.code) {
debug!(code = frame.code, "Unhandled push notification");
}
}
}
false
}

View File

@@ -0,0 +1,218 @@
//! Background mesh listener task.
//!
//! Runs as a long-lived tokio task that:
//! - Maintains the serial connection to the Meshcore device
//! - Reads incoming frames and dispatches events
//! - Periodically broadcasts our identity advertisement
//! - Reconnects on device disconnect
//! - Manages peer cache and message store
mod bitcoin;
mod decode;
mod dispatch;
mod frames;
mod session;
use super::types::*;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, RwLock};
use tracing::{error, info};
/// How often to broadcast our identity advertisement (seconds).
const ADVERT_INTERVAL: Duration = Duration::from_secs(60);
/// How often to poll for queued messages when no push notifications.
const SYNC_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum stored messages (circular buffer).
const MAX_MESSAGES: usize = 100;
/// Initial delay before reconnection attempt after device disconnect.
const RECONNECT_DELAY_INIT: Duration = Duration::from_secs(5);
/// Maximum reconnect delay (cap for exponential backoff).
const RECONNECT_DELAY_MAX: Duration = Duration::from_secs(60);
/// Number of consecutive write failures before we consider the device dead
/// and trigger a reconnection cycle.
const MAX_CONSECUTIVE_WRITE_FAILURES: u32 = 3;
/// Command sent from MeshService to the listener task (which owns the serial port).
pub enum MeshCommand {
SendText { dest_pubkey_prefix: [u8; 6], payload: Vec<u8> },
/// Send pre-encoded binary (TypedEnvelope wire bytes) to a peer.
SendRaw { dest_pubkey_prefix: [u8; 6], payload: Vec<u8> },
/// Broadcast pre-encoded binary on a mesh channel.
BroadcastChannel { channel: u8, payload: Vec<u8> },
SendAdvert,
}
/// Shared state for the mesh listener, accessible from RPC handlers.
pub struct MeshState {
pub peers: RwLock<HashMap<u32, MeshPeer>>,
pub messages: RwLock<VecDeque<MeshMessage>>,
pub shared_secrets: RwLock<HashMap<u32, [u8; 32]>>,
pub status: RwLock<MeshStatus>,
pub event_tx: broadcast::Sender<MeshEvent>,
pub cmd_tx: mpsc::Sender<MeshCommand>,
next_message_id: RwLock<u64>,
/// Block header cache — populated when receiving headers from internet-connected peers.
pub block_header_cache: Arc<super::bitcoin_relay::BlockHeaderCache>,
/// Relay tracker — stores completed relay results for frontend polling.
pub relay_tracker: Option<Arc<super::bitcoin_relay::RelayTracker>>,
/// Steganography mode for outgoing/incoming messages.
pub stego_mode: super::steganography::SteganographyMode,
/// Chunk reassembly buffer for multi-frame messages.
chunk_buffer: RwLock<HashMap<(u32, u8), ChunkAssembly>>,
/// Double Ratchet session manager for forward-secret encryption.
pub session_manager: Arc<super::session::SessionManager>,
/// Whether to encrypt directed relay messages (config toggle for rollback).
pub encrypt_relay: bool,
}
/// In-progress chunk reassembly for a multi-frame message.
struct ChunkAssembly {
chunks: HashMap<u8, String>,
total: u8,
created: std::time::Instant,
}
impl MeshState {
pub fn new(
channel_name: &str,
block_header_cache: Arc<super::bitcoin_relay::BlockHeaderCache>,
relay_tracker: Option<Arc<super::bitcoin_relay::RelayTracker>>,
stego_mode: super::steganography::SteganographyMode,
encrypt_relay: bool,
session_manager: Arc<super::session::SessionManager>,
) -> (Arc<Self>, broadcast::Receiver<MeshEvent>, mpsc::Receiver<MeshCommand>) {
let (tx, rx) = broadcast::channel(64);
let (cmd_tx, cmd_rx) = mpsc::channel(32);
let state = Arc::new(Self {
peers: RwLock::new(HashMap::new()),
messages: RwLock::new(VecDeque::new()),
shared_secrets: RwLock::new(HashMap::new()),
cmd_tx,
status: RwLock::new(MeshStatus {
enabled: true,
device_type: DeviceType::Unknown,
device_path: None,
device_connected: false,
firmware_version: None,
self_node_id: None,
self_advert_name: None,
peer_count: 0,
channel_name: channel_name.to_string(),
messages_sent: 0,
messages_received: 0,
}),
event_tx: tx,
next_message_id: RwLock::new(1),
block_header_cache,
relay_tracker,
stego_mode,
chunk_buffer: RwLock::new(HashMap::new()),
session_manager,
encrypt_relay,
});
(state, rx, cmd_rx)
}
pub async fn next_id(&self) -> u64 {
let mut id = self.next_message_id.write().await;
let current = *id;
*id += 1;
current
}
pub async fn store_message(&self, msg: MeshMessage) {
let mut messages = self.messages.write().await;
messages.push_back(msg);
if messages.len() > MAX_MESSAGES {
messages.pop_front();
}
}
async fn update_peer_count(&self) {
let count = self.peers.read().await.len();
self.status.write().await.peer_count = count;
}
}
/// Spawn the background mesh listener task.
///
/// This task manages the full lifecycle:
/// 1. Detect and connect to Meshcore device
/// 2. Initialize and set advert name
/// 3. Main loop: read frames, dispatch events, periodic adverts
/// 4. Reconnect on disconnect
pub fn spawn_mesh_listener(
state: Arc<MeshState>,
device_path: Option<String>,
our_did: String,
our_ed_pubkey_hex: String,
our_x25519_secret: [u8; 32],
our_x25519_pubkey_hex: String,
shutdown: tokio::sync::watch::Receiver<bool>,
cmd_rx: mpsc::Receiver<MeshCommand>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut shutdown = shutdown;
let mut cmd_rx = cmd_rx;
let mut reconnect_delay = RECONNECT_DELAY_INIT;
loop {
if *shutdown.borrow() {
info!("Mesh listener shutting down");
return;
}
match session::run_mesh_session(
&state,
device_path.as_deref(),
&our_did,
&our_ed_pubkey_hex,
&our_x25519_secret,
&our_x25519_pubkey_hex,
&mut shutdown,
&mut cmd_rx,
)
.await
{
Ok(()) => {
info!("Mesh session ended cleanly");
// Session was established before ending — reset backoff
reconnect_delay = RECONNECT_DELAY_INIT;
}
Err(e) => {
// Check if session was ever connected (vs failed to open)
let was_connected = state.status.read().await.device_connected;
if was_connected {
reconnect_delay = RECONNECT_DELAY_INIT;
}
error!("Mesh session error: {} (retry in {:?})", e, reconnect_delay);
}
}
// Update status to disconnected
{
let mut status = state.status.write().await;
status.device_connected = false;
status.device_path = None;
}
let _ = state.event_tx.send(MeshEvent::DeviceDisconnected);
// Wait before reconnecting (exponential backoff)
tokio::select! {
_ = tokio::time::sleep(reconnect_delay) => {},
_ = shutdown.changed() => {
if *shutdown.borrow() { return; }
},
}
// Increase backoff for next failure, cap at max
reconnect_delay = (reconnect_delay * 2).min(RECONNECT_DELAY_MAX);
}
})
}

View File

@@ -0,0 +1,337 @@
//! Mesh session lifecycle: connect, initialize, main loop.
use super::{
frames, MeshCommand, MeshState,
ADVERT_INTERVAL, MAX_CONSECUTIVE_WRITE_FAILURES, SYNC_INTERVAL,
};
use super::super::serial::MeshcoreDevice;
use super::super::types::*;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
/// Scan all candidate serial ports and open the first Meshcore device found.
async fn auto_detect_and_open() -> Result<(String, MeshcoreDevice, DeviceInfo)> {
let paths = super::super::serial::detect_serial_devices().await;
if paths.is_empty() {
anyhow::bail!("No serial devices found in /dev");
}
for path in &paths {
debug!(path = %path, "Probing for Meshcore device");
match MeshcoreDevice::open(path).await {
Ok(mut dev) => match dev.initialize().await {
Ok(info) => {
info!(path = %path, firmware = %info.firmware_version, "Found Meshcore device via auto-detect");
return Ok((path.clone(), dev, info));
}
Err(e) => debug!(path = %path, error = %e, "Not a Meshcore device"),
},
Err(e) => debug!(path = %path, error = %e, "Could not open serial port"),
}
}
anyhow::bail!("No Meshcore device found on {} candidate ports: {:?}", paths.len(), paths)
}
/// Fetch the contacts list from the device and update the peer cache.
async fn refresh_contacts(
device: &mut MeshcoreDevice,
state: &Arc<MeshState>,
) {
match device.get_contacts().await {
Ok(contacts) => {
let mut peers = state.peers.write().await;
for (idx, contact) in contacts.iter().enumerate() {
let contact_id = idx as u32;
let existing = peers.get(&contact_id);
let peer = super::super::types::MeshPeer {
contact_id,
advert_name: contact.advert_name.clone(),
did: existing.and_then(|p| p.did.clone()),
pubkey_hex: Some(contact.public_key_hex.clone()),
x25519_pubkey: existing.and_then(|p| p.x25519_pubkey),
rssi: None,
snr: None,
last_heard: chrono::Utc::now().to_rfc3339(),
hops: 0,
};
peers.insert(contact_id, peer);
}
drop(peers);
state.update_peer_count().await;
if !contacts.is_empty() {
info!(count = contacts.len(), "Refreshed mesh contacts");
}
}
Err(e) => {
warn!("Failed to fetch contacts: {}", e);
}
}
}
/// Drain any queued messages from the device.
/// Returns `true` if a write/communication error occurred (for failure tracking).
async fn sync_queued_messages(
device: &mut MeshcoreDevice,
state: &Arc<MeshState>,
our_x25519_secret: &[u8; 32],
) -> bool {
match device.sync_messages().await {
Ok(frames) => {
for frame in &frames {
frames::handle_frame(frame, state, our_x25519_secret).await;
}
if !frames.is_empty() {
info!(count = frames.len(), "Synced queued mesh messages");
}
false
}
Err(e) => {
debug!("Message sync: {}", e);
true
}
}
}
/// Run a single mesh session (connect, initialize, main loop).
pub(super) async fn run_mesh_session(
state: &Arc<MeshState>,
preferred_path: Option<&str>,
our_did: &str,
_our_ed_pubkey_hex: &str,
our_x25519_secret: &[u8; 32],
_our_x25519_pubkey_hex: &str,
shutdown: &mut tokio::sync::watch::Receiver<bool>,
cmd_rx: &mut mpsc::Receiver<MeshCommand>,
) -> Result<()> {
// Detect device — try preferred path first, fall back to auto-detect
let (device_path, mut device, device_info) = if let Some(path) = preferred_path {
match MeshcoreDevice::open(path).await {
Ok(mut dev) => match dev.initialize().await {
Ok(info) => (path.to_string(), dev, info),
Err(e) => {
warn!("Preferred path {} handshake failed: {} — trying auto-detect", path, e);
auto_detect_and_open().await?
}
},
Err(e) => {
warn!("Preferred path {} open failed: {} — trying auto-detect", path, e);
auto_detect_and_open().await?
}
}
} else {
auto_detect_and_open().await?
};
// Update status
{
let mut status = state.status.write().await;
status.device_connected = true;
status.device_type = DeviceType::Meshcore;
status.device_path = Some(device_path.clone());
status.firmware_version = Some(device_info.firmware_version.clone());
status.self_node_id = Some(device_info.node_id);
status.self_advert_name = device.advert_name.clone();
}
let _ = state.event_tx.send(MeshEvent::DeviceConnected(device_info));
// Set advert name to something identifiable
let short_did = our_did.chars().skip(8).take(8).collect::<String>();
let advert_name = format!("Archy-{}", short_did);
if let Err(e) = device.set_advert_name(&advert_name).await {
warn!("Failed to set advert name: {}", e);
}
// Broadcast our advertisement so other nodes can discover us
if let Err(e) = device.send_self_advert().await {
warn!("Failed to send initial advert: {}", e);
}
// Fetch existing contacts from the device
refresh_contacts(&mut device, state).await;
// Sync any queued messages from before we connected
let _ = sync_queued_messages(&mut device, state, our_x25519_secret).await;
// Main loop
let mut advert_timer = tokio::time::interval(ADVERT_INTERVAL);
let mut sync_timer = tokio::time::interval(SYNC_INTERVAL);
advert_timer.tick().await; // skip first immediate tick
sync_timer.tick().await;
let mut consecutive_write_failures: u32 = 0;
loop {
// If too many consecutive writes have failed, the serial port is dead —
// bail out so the outer loop can reconnect to a (possibly re-enumerated) device.
if consecutive_write_failures >= MAX_CONSECUTIVE_WRITE_FAILURES {
error!(
failures = consecutive_write_failures,
"Serial port unresponsive — triggering reconnection"
);
anyhow::bail!("Serial port unresponsive after {} consecutive write failures", consecutive_write_failures);
}
tokio::select! {
// Check for incoming frames
frame_result = device.try_recv_frame() => {
match frame_result {
Ok(Some(frame)) => {
// Successful read resets the failure counter
consecutive_write_failures = 0;
let should_action = frames::handle_frame(
&frame,
state,
our_x25519_secret,
).await;
if should_action {
// Contact discovery or messages waiting — sync both
refresh_contacts(&mut device, state).await;
if sync_queued_messages(&mut device, state, our_x25519_secret).await {
consecutive_write_failures += 1;
}
}
}
Ok(None) => {
// No complete frame yet, that's fine
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(e) => {
error!("Serial read error: {}", e);
return Err(e);
}
}
}
// Periodic advertisement broadcast + contact refresh
_ = advert_timer.tick() => {
debug!("Periodic self-advert broadcast");
if let Err(e) = device.send_self_advert().await {
consecutive_write_failures += 1;
warn!(failures = consecutive_write_failures, "Failed to send advert: {}", e);
} else {
consecutive_write_failures = 0;
}
refresh_contacts(&mut device, state).await;
}
// Process send commands from MeshService
Some(cmd) = cmd_rx.recv() => {
handle_send_command(cmd, &mut device, state, &mut consecutive_write_failures).await;
}
// Periodic message sync
_ = sync_timer.tick() => {
if sync_queued_messages(&mut device, state, our_x25519_secret).await {
consecutive_write_failures += 1;
debug!(failures = consecutive_write_failures, "Message sync failed");
} else {
consecutive_write_failures = 0;
}
}
// Shutdown signal
_ = shutdown.changed() => {
if *shutdown.borrow() {
info!("Mesh listener received shutdown signal");
return Ok(());
}
}
}
}
}
/// Process a single outbound command from MeshService.
async fn handle_send_command(
cmd: MeshCommand,
device: &mut MeshcoreDevice,
state: &Arc<MeshState>,
consecutive_write_failures: &mut u32,
) {
match cmd {
MeshCommand::SendText { dest_pubkey_prefix, payload } => {
if let Err(e) = device.send_text(&dest_pubkey_prefix, &payload).await {
*consecutive_write_failures += 1;
warn!(failures = *consecutive_write_failures, "Failed to send text via mesh: {}", e);
} else {
*consecutive_write_failures = 0;
info!(dest = %hex::encode(dest_pubkey_prefix), len = payload.len(), "Sent mesh message");
}
}
MeshCommand::SendRaw { dest_pubkey_prefix, payload } => {
// Apply steganographic encoding if configured
let wire_payload = if state.stego_mode != super::super::steganography::SteganographyMode::Normal
&& payload.first() == Some(&super::super::message_types::TYPED_MESSAGE_MARKER)
{
match super::super::steganography::encode_typed_wire(state.stego_mode, &payload) {
Ok(stego) => stego,
Err(e) => {
warn!("Stego encode failed, sending plain: {}", e);
payload
}
}
} else {
payload
};
// Base64 encode, then chunk if >140 chars (LoRa 160 byte limit)
use base64::Engine;
let encoded = base64::engine::general_purpose::STANDARD.encode(&wire_payload);
if encoded.len() <= 140 {
// Single frame — fits in one LoRa packet
if let Err(e) = device.send_text(&dest_pubkey_prefix, encoded.as_bytes()).await {
*consecutive_write_failures += 1;
warn!(failures = *consecutive_write_failures, "Failed to send raw via mesh: {}", e);
} else {
*consecutive_write_failures = 0;
info!(dest = %hex::encode(dest_pubkey_prefix), len = encoded.len(), "Sent raw mesh message");
}
} else {
// Multi-frame chunking: "MCxxyyzz..." where xx=msg_id, yy=chunk_idx, zz=total_chunks
static CHUNK_MSG_ID: std::sync::atomic::AtomicU8 = std::sync::atomic::AtomicU8::new(0);
let msg_id = CHUNK_MSG_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let chunk_data_size = 132; // 160 - 8 header bytes ("MCxxyyzz") = 152, leave margin
let chunks: Vec<&str> = encoded.as_bytes().chunks(chunk_data_size)
.map(|c| std::str::from_utf8(c).unwrap_or(""))
.collect();
let total = chunks.len() as u8;
info!(
dest = %hex::encode(dest_pubkey_prefix),
raw_len = wire_payload.len(),
b64_len = encoded.len(),
chunks = total,
"Sending chunked mesh message"
);
for (idx, chunk) in chunks.iter().enumerate() {
let frame = format!("MC{:02x}{:02x}{:02x}{}", msg_id, idx as u8, total, chunk);
if let Err(e) = device.send_text(&dest_pubkey_prefix, frame.as_bytes()).await {
*consecutive_write_failures += 1;
warn!(failures = *consecutive_write_failures, chunk = idx, "Chunk send failed: {}", e);
break;
}
// Small delay between chunks to avoid overwhelming the radio
tokio::time::sleep(Duration::from_millis(500)).await;
}
*consecutive_write_failures = 0;
}
}
MeshCommand::BroadcastChannel { channel, payload } => {
if let Err(e) = device.send_channel_text(channel, &payload).await {
*consecutive_write_failures += 1;
warn!(failures = *consecutive_write_failures, "Failed to broadcast on channel {}: {}", channel, e);
} else {
*consecutive_write_failures = 0;
info!(channel, len = payload.len(), "Broadcast on mesh channel");
}
}
MeshCommand::SendAdvert => {
if let Err(e) = device.send_self_advert().await {
*consecutive_write_failures += 1;
warn!(failures = *consecutive_write_failures, "Failed to send advert: {}", e);
} else {
*consecutive_write_failures = 0;
}
}
}
}