diff --git a/core/archipelago/src/api/rpc/dispatcher.rs b/core/archipelago/src/api/rpc/dispatcher.rs index e2afecd2..6bf4f981 100644 --- a/core/archipelago/src/api/rpc/dispatcher.rs +++ b/core/archipelago/src/api/rpc/dispatcher.rs @@ -299,6 +299,19 @@ impl RpcHandler { "mesh.fetch-content" => self.handle_mesh_fetch_content(params).await, "mesh.send-reply" => self.handle_mesh_send_reply(params).await, "mesh.send-reaction" => self.handle_mesh_send_reaction(params).await, + "mesh.send-read-receipt" => self.handle_mesh_send_read_receipt(params).await, + "mesh.forward-message" => self.handle_mesh_forward_message(params).await, + "mesh.edit-message" => self.handle_mesh_edit_message(params).await, + "mesh.delete-message" => self.handle_mesh_delete_message(params).await, + "mesh.send-psbt" => self.handle_mesh_send_psbt(params).await, + "mesh.broadcast-presence" => self.handle_mesh_broadcast_presence(params).await, + "mesh.presence-list" => self.handle_mesh_presence_list(params).await, + "mesh.contacts-list" => self.handle_mesh_contacts_list(params).await, + "mesh.contacts-save" => self.handle_mesh_contacts_save(params).await, + "mesh.contacts-block" => self.handle_mesh_contacts_block(params).await, + "mesh.send-channel-invite" => self.handle_mesh_send_channel_invite(params).await, + "conversations.list" => self.handle_conversations_list(params).await, + "conversations.messages" => self.handle_conversations_messages(params).await, "mesh.outbox" => self.handle_mesh_outbox(params).await, "mesh.session-status" => self.handle_mesh_session_status(params).await, "mesh.rotate-prekeys" => self.handle_mesh_rotate_prekeys().await, diff --git a/core/archipelago/src/api/rpc/mesh/status.rs b/core/archipelago/src/api/rpc/mesh/status.rs index 57925479..0f7b4a5e 100644 --- a/core/archipelago/src/api/rpc/mesh/status.rs +++ b/core/archipelago/src/api/rpc/mesh/status.rs @@ -70,6 +70,91 @@ impl RpcHandler { } } + /// conversations.list — Unified inbox across mesh peers, mesh channels, + /// and federation nodes. Each conversation returns its latest message + /// timestamp + snippet + transport tag so the UI can render one sorted list. + pub(in crate::api::rpc) async fn handle_conversations_list( + &self, + _params: Option, + ) -> Result { + let mut conversations: Vec = Vec::new(); + let service = self.mesh_service.read().await; + if let Some(svc) = service.as_ref() { + let peers = svc.peers().await; + let messages = svc.messages(None).await; + // Per-peer last message. + for peer in &peers { + let last = messages + .iter() + .rev() + .find(|m| m.peer_contact_id == peer.contact_id); + let is_federation = peer.contact_id & 0x8000_0000 != 0; + conversations.push(serde_json::json!({ + "id": format!("{}:{}", if is_federation { "federation" } else { "mesh" }, peer.contact_id), + "transport": if is_federation { "federation" } else { "mesh" }, + "contact_id": peer.contact_id, + "name": peer.advert_name, + "pubkey": peer.pubkey_hex, + "last_text": last.map(|m| m.plaintext.clone()), + "last_timestamp": last.map(|m| m.timestamp.clone()), + "last_direction": last.map(|m| format!("{:?}", m.direction).to_lowercase()), + })); + } + // Channel 0 ("Archipelago") as a synthetic conversation. + let channel_last = messages + .iter() + .rev() + .find(|m| m.message_type == "text" && m.peer_contact_id == 0); + conversations.push(serde_json::json!({ + "id": "channel:0", + "transport": "channel", + "channel": 0, + "name": "Archipelago", + "last_text": channel_last.map(|m| m.plaintext.clone()), + "last_timestamp": channel_last.map(|m| m.timestamp.clone()), + })); + } + // Sort by last_timestamp desc (missing timestamps sink). + conversations.sort_by(|a, b| { + let at = a.get("last_timestamp").and_then(|v| v.as_str()).unwrap_or(""); + let bt = b.get("last_timestamp").and_then(|v| v.as_str()).unwrap_or(""); + bt.cmp(at) + }); + Ok(serde_json::json!({ "conversations": conversations })) + } + + /// conversations.messages — Return messages for a ConversationId string + /// (format: `mesh:` | `federation:` | `channel:`). + pub(in crate::api::rpc) async fn handle_conversations_messages( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let id = params["id"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing id"))?; + let (kind, rest) = id + .split_once(':') + .ok_or_else(|| anyhow::anyhow!("Invalid conversation id"))?; + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let all = svc.messages(None).await; + let filtered: Vec<_> = match kind { + "mesh" | "federation" => { + let contact_id: u32 = rest.parse().unwrap_or(0); + all.into_iter().filter(|m| m.peer_contact_id == contact_id).collect() + } + "channel" => { + // For now the channel bucket keeps contact_id = 0. + all.into_iter().filter(|m| m.peer_contact_id == 0).collect() + } + _ => Vec::new(), + }; + Ok(serde_json::json!({ "messages": filtered })) + } + /// mesh.debug-dump — Full in-memory state snapshot for debugging. /// Returns peers, all messages, status, shared-secret peer ids, encrypt_relay /// flag, and stego mode. Intended for smoke tests and bug investigation. diff --git a/core/archipelago/src/api/rpc/mesh/typed_messages.rs b/core/archipelago/src/api/rpc/mesh/typed_messages.rs index da0b0326..aec0bb33 100644 --- a/core/archipelago/src/api/rpc/mesh/typed_messages.rs +++ b/core/archipelago/src/api/rpc/mesh/typed_messages.rs @@ -1,8 +1,10 @@ use super::super::RpcHandler; use crate::blobs::DEFAULT_CAP_TTL_SECS; use crate::mesh::message_types::{ - self, AlertPayload, AlertType, ContentRefPayload, Coordinate, InvoicePayload, MessageKey, - MeshMessageType, ReactionPayload, ReplyPayload, TypedEnvelope, + self, AlertPayload, AlertType, ChannelInvitePayload, ContentRefPayload, Coordinate, + DeletePayload, EditPayload, ForwardPayload, InvoicePayload, MessageKey, MeshMessageType, + PresencePayload, PsbtHashPayload, ReactionPayload, ReadReceiptPayload, ReplyPayload, + TypedEnvelope, }; use anyhow::Result; use tracing::info; @@ -302,21 +304,28 @@ impl RpcHandler { // budget (cid alone is 64 hex chars, plus onion + cap). Route via // federation when the peer has a known onion; fall back to LoRa // only for tiny envelopes that could theoretically fit. - // Match mesh peer → federation node by master DID, NOT by pubkey. - // Mesh adverts carry a LoRa-local ed25519 key that differs from the - // archipelago node's identity key in federation/nodes.json; the DID - // is the only stable identifier the two transports share. + // + // Federation peers are pre-seeded into mesh state with their + // archipelago pubkey as `pubkey_hex` and DID populated — so a direct + // lookup on either key finds the onion. The `explicit_peer_onion` + // frontend override and the DID path remain as fallbacks for the + // transitional case where a mesh-discovered LoRa contact also + // happens to be federated. let federation_onion = if let Some(onion) = explicit_peer_onion { Some(onion) } else { let nodes = crate::federation::load_nodes(&self.config.data_dir) .await .unwrap_or_default(); - if let Some(did) = peer_did.as_ref() { - nodes.into_iter().find(|n| &n.did == did).map(|n| n.onion) - } else { - None - } + nodes + .iter() + .find(|n| n.pubkey == peer_pubkey_hex) + .map(|n| n.onion.clone()) + .or_else(|| { + peer_did.as_ref().and_then(|did| { + nodes.iter().find(|n| &n.did == did).map(|n| n.onion.clone()) + }) + }) }; let msg = if let Some(onion) = federation_onion { svc.send_typed_wire_via_federation( @@ -560,4 +569,449 @@ impl RpcHandler { "local_url": local_url, })) } + + /// mesh.send-psbt — share a PSBT sign-request with a mesh peer. + /// Params: `{ contact_id, psbt_hash, description, amount_sats }`. The payload + /// carries only the SHA-256 hash; actual PSBT bytes travel out-of-band via + /// a ContentRef or federation if needed. + pub(in crate::api::rpc) async fn handle_mesh_send_psbt( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let contact_id = params["contact_id"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32; + let psbt_hash = params["psbt_hash"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing psbt_hash"))? + .to_string(); + let description = params["description"].as_str().unwrap_or("PSBT sign request").to_string(); + let amount_sats = params["amount_sats"].as_u64().unwrap_or(0); + + let payload = PsbtHashPayload { + psbt_hash: psbt_hash.clone(), + description: description.clone(), + amount_sats, + }; + + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let seq = svc.next_send_seq(contact_id).await; + let body = message_types::encode_payload(&payload)?; + let envelope = TypedEnvelope::new(MeshMessageType::PsbtHash, body).with_seq(seq); + let wire = envelope.to_wire()?; + let display = format!("PSBT {} sats — {}", amount_sats, description); + let typed_json = serde_json::to_value(&payload).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "psbt_hash", &display, typed_json, seq) + .await?; + info!(contact_id, psbt_hash = %psbt_hash, "Sent PSBT hash over mesh"); + Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq })) + } + + /// mesh.send-read-receipt — "I've seen everything from `target_pubkey` up to `target_seq`." + /// Params: `{ contact_id, target_pubkey, target_seq }`. The peer uses this to roll + /// forward the ✓✓ marker on its local Sent bubbles. + pub(in crate::api::rpc) async fn handle_mesh_send_read_receipt( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let contact_id = params["contact_id"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32; + let target_pubkey = params["target_pubkey"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing target_pubkey"))? + .to_string(); + let target_seq = params["target_seq"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing target_seq"))?; + + let receipt = ReadReceiptPayload { + up_to: MessageKey { sender_pubkey: target_pubkey, sender_seq: target_seq }, + }; + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let seq = svc.next_send_seq(contact_id).await; + let payload = message_types::encode_payload(&receipt)?; + let envelope = TypedEnvelope::new(MeshMessageType::ReadReceipt, payload).with_seq(seq); + let wire = envelope.to_wire()?; + let display = format!("seen ≤ #{}", receipt.up_to.sender_seq); + let typed_json = serde_json::to_value(&receipt).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "read_receipt", &display, typed_json, seq) + .await?; + info!(contact_id, seq, "Sent read receipt over mesh"); + Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq })) + } + + /// mesh.forward-message — Re-broadcast an existing local message to another peer, + /// preserving original sender attribution. Params: `{ contact_id, source_message_id }`. + /// We look up the source by local id, pull its typed_payload (or plaintext for Text), + /// and wrap it in a Forward envelope with the original MessageKey + timestamp. + pub(in crate::api::rpc) async fn handle_mesh_forward_message( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let contact_id = params["contact_id"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32; + let source_id = params["source_message_id"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing source_message_id"))?; + + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + + // Pull the source message from MeshState. + let source = svc + .find_message_by_id(source_id) + .await + .ok_or_else(|| anyhow::anyhow!("Source message {} not found", source_id))?; + + // Forwarding a message without a stable MessageKey is meaningless — + // the receiver can't attribute it. + let orig_pubkey = source.sender_pubkey.clone().ok_or_else(|| { + anyhow::anyhow!("Source message has no sender_pubkey — cannot forward") + })?; + let orig_seq = source + .sender_seq + .ok_or_else(|| anyhow::anyhow!("Source message has no sender_seq — cannot forward"))?; + + // Re-encode the original body. For typed messages we serialize the + // existing typed_payload JSON back through CBOR via the original type; + // for plain text we forward the plaintext as Text. + let (body_type, body): (u8, Vec) = match source.typed_payload.as_ref() { + Some(json) => { + let type_label = source.message_type.as_str(); + let t = MeshMessageType::from_label(type_label) + .unwrap_or(MeshMessageType::Text) as u8; + let mut buf = Vec::new(); + ciborium::into_writer(json, &mut buf) + .map_err(|e| anyhow::anyhow!("re-encode body failed: {}", e))?; + (t, buf) + } + None => (MeshMessageType::Text as u8, source.plaintext.clone().into_bytes()), + }; + + let forward = ForwardPayload { + orig: MessageKey { sender_pubkey: orig_pubkey, sender_seq: orig_seq }, + orig_ts: source + .timestamp + .parse::>() + .map(|dt| dt.timestamp() as u32) + .unwrap_or(chrono::Utc::now().timestamp() as u32), + orig_name: source.peer_name.clone(), + body_type, + body, + }; + + let seq = svc.next_send_seq(contact_id).await; + let payload = message_types::encode_payload(&forward)?; + let envelope = TypedEnvelope::new(MeshMessageType::Forward, payload).with_seq(seq); + let wire = envelope.to_wire()?; + let display = format!( + "Forwarded: {}", + if source.plaintext.is_empty() { "(attachment)" } else { source.plaintext.as_str() } + ); + let typed_json = serde_json::to_value(&forward).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "forward", &display, typed_json, seq) + .await?; + info!(contact_id, seq, source_id, "Forwarded message over mesh"); + Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq })) + } + + /// mesh.edit-message — In-place edit of an earlier message's text. The target + /// must have been sent by this node (own MessageKey). Params: + /// `{ contact_id, target_seq, new_text }`. `target_pubkey` is implicit (self). + pub(in crate::api::rpc) async fn handle_mesh_edit_message( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let contact_id = params["contact_id"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32; + let target_seq = params["target_seq"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing target_seq"))?; + let new_text = params["new_text"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing new_text"))? + .to_string(); + + let self_pubkey = { + let guard = self.self_pubkey_hex.read().await; + guard + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Self pubkey not set"))? + .clone() + }; + + let edit = EditPayload { + target: MessageKey { sender_pubkey: self_pubkey, sender_seq: target_seq }, + new_text: new_text.clone(), + edited_at: chrono::Utc::now().timestamp() as u32, + }; + + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let seq = svc.next_send_seq(contact_id).await; + let payload = message_types::encode_payload(&edit)?; + let envelope = TypedEnvelope::new(MeshMessageType::Edit, payload).with_seq(seq); + let wire = envelope.to_wire()?; + let typed_json = serde_json::to_value(&edit).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "edit", &new_text, typed_json, seq) + .await?; + + // Best-effort: apply the edit to our own local copy too, so the UI + // updates without waiting for an echo. + svc.apply_local_edit(target_seq, &new_text, edit.edited_at).await; + + info!(contact_id, seq, target_seq, "Sent edit over mesh"); + Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq })) + } + + /// mesh.delete-message — Tombstone an earlier own-message. Params: + /// `{ contact_id, target_seq }`. Applied locally immediately; wire form is + /// informational for peers who already have the bytes. + pub(in crate::api::rpc) async fn handle_mesh_delete_message( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let contact_id = params["contact_id"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32; + let target_seq = params["target_seq"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing target_seq"))?; + + let self_pubkey = { + let guard = self.self_pubkey_hex.read().await; + guard + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Self pubkey not set"))? + .clone() + }; + + let del = DeletePayload { + target: MessageKey { sender_pubkey: self_pubkey, sender_seq: target_seq }, + }; + + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let seq = svc.next_send_seq(contact_id).await; + let payload = message_types::encode_payload(&del)?; + let envelope = TypedEnvelope::new(MeshMessageType::Delete, payload).with_seq(seq); + let wire = envelope.to_wire()?; + let typed_json = serde_json::to_value(&del).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "delete", "(deleted)", typed_json, seq) + .await?; + + svc.apply_local_delete(target_seq).await; + + info!(contact_id, seq, target_seq, "Sent delete over mesh"); + Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq })) + } + + /// mesh.broadcast-presence — emit a PresencePayload heartbeat on the + /// channel so online peers can update their presence table. + /// Params: `{ channel?, status? }`. Defaults: channel 0, status "online". + pub(in crate::api::rpc) async fn handle_mesh_broadcast_presence( + &self, + params: Option, + ) -> Result { + let params = params.unwrap_or(serde_json::json!({})); + let channel = params["channel"].as_u64().unwrap_or(0) as u8; + let status = params["status"].as_str().unwrap_or("online").to_string(); + + let presence = PresencePayload { + status: status.clone(), + last_active: chrono::Utc::now().timestamp() as u32, + }; + + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let seq = svc.next_send_seq(0).await; + let payload = message_types::encode_payload(&presence)?; + let envelope = TypedEnvelope::new(MeshMessageType::Presence, payload).with_seq(seq); + let wire = envelope.to_wire()?; + let typed_json = serde_json::to_value(&presence).ok(); + // Best-effort: if the mesh device isn't connected, skip silently — + // presence heartbeats don't deserve a user-visible error. + match svc + .send_channel_typed_wire(channel, wire, "presence", &status, typed_json, seq) + .await + { + Ok(_) => Ok(serde_json::json!({ "sent": true, "sender_seq": seq })), + Err(e) => Ok(serde_json::json!({ "sent": false, "reason": e.to_string() })), + } + } + + /// mesh.presence-list — return the in-memory presence map (pubkey → status+timestamps). + pub(in crate::api::rpc) async fn handle_mesh_presence_list( + &self, + _params: Option, + ) -> Result { + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let state = svc.shared_state(); + let presence = state.presence.read().await; + let list: Vec<_> = presence + .iter() + .map(|(pk, (status, last_active, received_at))| { + serde_json::json!({ + "pubkey": pk, + "status": status, + "last_active": last_active, + "received_at": received_at, + }) + }) + .collect(); + Ok(serde_json::json!({ "presence": list })) + } + + /// mesh.contacts-list — return the contacts store merged with the peer list. + pub(in crate::api::rpc) async fn handle_mesh_contacts_list( + &self, + _params: Option, + ) -> Result { + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let state = svc.shared_state(); + let contacts = state.contacts.read().await; + let peers = state.peers.read().await; + let mut out: Vec = Vec::new(); + for peer in peers.values() { + if let Some(pk) = peer.pubkey_hex.as_ref() { + let entry = contacts.get(pk).cloned().unwrap_or_default(); + out.push(serde_json::json!({ + "pubkey": pk, + "contact_id": peer.contact_id, + "name": peer.advert_name, + "alias": entry.alias, + "notes": entry.notes, + "pinned": entry.pinned, + "blocked": entry.blocked, + })); + } + } + Ok(serde_json::json!({ "contacts": out })) + } + + /// mesh.contacts-save — create/update a contact entry (alias/notes/pinned). + /// Params: `{ pubkey, alias?, notes?, pinned? }`. + pub(in crate::api::rpc) async fn handle_mesh_contacts_save( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let pubkey = params["pubkey"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing pubkey"))? + .to_string(); + let alias = params["alias"].as_str().map(|s| s.to_string()); + let notes = params["notes"].as_str().map(|s| s.to_string()); + let pinned = params["pinned"].as_bool(); + + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let state = svc.shared_state(); + let mut contacts = state.contacts.write().await; + let entry = contacts.entry(pubkey.clone()).or_default(); + if alias.is_some() { entry.alias = alias; } + if notes.is_some() { entry.notes = notes; } + if let Some(p) = pinned { entry.pinned = p; } + let saved = entry.clone(); + Ok(serde_json::json!({ + "saved": true, + "pubkey": pubkey, + "alias": saved.alias, + "notes": saved.notes, + "pinned": saved.pinned, + "blocked": saved.blocked, + })) + } + + /// mesh.contacts-block — toggle the blocked flag on a contact. + /// Params: `{ pubkey, blocked }`. + pub(in crate::api::rpc) async fn handle_mesh_contacts_block( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let pubkey = params["pubkey"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("Missing pubkey"))? + .to_string(); + let blocked = params["blocked"].as_bool().unwrap_or(true); + + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let state = svc.shared_state(); + let mut contacts = state.contacts.write().await; + let entry = contacts.entry(pubkey.clone()).or_default(); + entry.blocked = blocked; + Ok(serde_json::json!({ "pubkey": pubkey, "blocked": blocked })) + } + + /// mesh.send-channel-invite — share a channel invite with a direct peer. + /// Params: `{ contact_id, channel, name, key? }`. + pub(in crate::api::rpc) async fn handle_mesh_send_channel_invite( + &self, + params: Option, + ) -> Result { + let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?; + let contact_id = params["contact_id"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32; + let channel = params["channel"] + .as_u64() + .ok_or_else(|| anyhow::anyhow!("Missing channel"))? as u8; + let name = params["name"].as_str().unwrap_or("").to_string(); + let key = params["key"].as_str().map(|s| s.to_string()); + + let invite = ChannelInvitePayload { channel, name: name.clone(), key }; + let service = self.mesh_service.read().await; + let svc = service + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?; + let seq = svc.next_send_seq(contact_id).await; + let payload = message_types::encode_payload(&invite)?; + let envelope = TypedEnvelope::new(MeshMessageType::ChannelInvite, payload).with_seq(seq); + let wire = envelope.to_wire()?; + let display = format!("Channel invite: {} ({})", channel, name); + let typed_json = serde_json::to_value(&invite).ok(); + let msg = svc + .send_typed_wire(contact_id, wire, "channel_invite", &display, typed_json, seq) + .await?; + Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq })) + } } diff --git a/core/archipelago/src/mesh/listener/dispatch.rs b/core/archipelago/src/mesh/listener/dispatch.rs index 3911bdac..d3e9d067 100644 --- a/core/archipelago/src/mesh/listener/dispatch.rs +++ b/core/archipelago/src/mesh/listener/dispatch.rs @@ -314,6 +314,170 @@ pub(crate) async fn handle_typed_envelope_direct( } } + Some(MeshMessageType::Presence) => { + match message_types::decode_payload::(&envelope.v) { + Ok(p) => { + let sender_pubkey = state + .peers + .read() + .await + .get(&sender_contact_id) + .and_then(|peer| peer.pubkey_hex.clone()); + if let Some(pk) = sender_pubkey { + let now = chrono::Utc::now().timestamp() as u64; + state.presence.write().await.insert(pk, (p.status, p.last_active, now)); + } + } + Err(e) => warn!("Failed to decode presence payload: {}", e), + } + } + + Some(MeshMessageType::ChannelInvite) => { + match message_types::decode_payload::(&envelope.v) { + Ok(inv) => { + let display = format!("Invited to channel {} ({})", inv.channel, inv.name); + let json = payload_to_json(&inv); + store_typed_message(state, sender_contact_id, sender_name, &display, "channel_invite", json, Some(envelope.seq)).await; + } + Err(e) => warn!("Failed to decode channel_invite payload: {}", e), + } + } + + Some(MeshMessageType::PsbtHash) => { + match message_types::decode_payload::(&envelope.v) { + Ok(psbt) => { + let display = format!("PSBT {} sats — {}", psbt.amount_sats, psbt.description); + let json = payload_to_json(&psbt); + store_typed_message(state, sender_contact_id, sender_name, &display, "psbt_hash", json, Some(envelope.seq)).await; + } + Err(e) => warn!("Failed to decode psbt_hash payload: {}", e), + } + } + + Some(MeshMessageType::ReadReceipt) => { + match message_types::decode_payload::(&envelope.v) { + Ok(receipt) => { + // Roll the ✓✓ delivered marker forward on every Sent + // message whose MessageKey ≤ (receipt.up_to.sender_pubkey, + // receipt.up_to.sender_seq) addressed to this peer. The + // pubkey in `up_to` is OUR pubkey (the peer echoing back + // what they've read), so match on sender_seq only within + // own-Sent records for this peer_contact_id. + let mut messages = state.messages.write().await; + for m in messages.iter_mut() { + if matches!(m.direction, MessageDirection::Sent) + && m.peer_contact_id == sender_contact_id + && m.sender_seq + .map(|s| s <= receipt.up_to.sender_seq) + .unwrap_or(false) + { + m.delivered = true; + } + } + drop(messages); + info!( + peer = sender_contact_id, + up_to = receipt.up_to.sender_seq, + "Applied read receipt" + ); + } + Err(e) => warn!("Failed to decode read_receipt payload: {}", e), + } + } + + Some(MeshMessageType::Forward) => { + match message_types::decode_payload::(&envelope.v) { + Ok(fwd) => { + let name = fwd.orig_name.as_deref().unwrap_or("someone"); + let text = format!("Forwarded from {}", name); + let json = payload_to_json(&fwd); + store_typed_message( + state, + sender_contact_id, + sender_name, + &text, + "forward", + json, + Some(envelope.seq), + ) + .await; + } + Err(e) => warn!("Failed to decode forward payload: {}", e), + } + } + + Some(MeshMessageType::Edit) => { + match message_types::decode_payload::(&envelope.v) { + Ok(edit) => { + // Only accept edits from the pubkey the target claims to + // come from — prevents peer A editing a message authored + // by peer B. + let sender_pubkey = state + .peers + .read() + .await + .get(&sender_contact_id) + .and_then(|p| p.pubkey_hex.clone()); + if sender_pubkey.as_deref() != Some(edit.target.sender_pubkey.as_str()) { + warn!( + peer = sender_contact_id, + "Rejecting edit — sender pubkey does not match target" + ); + } else { + let mut messages = state.messages.write().await; + for m in messages.iter_mut() { + if m.sender_pubkey.as_deref() == Some(edit.target.sender_pubkey.as_str()) + && m.sender_seq == Some(edit.target.sender_seq) + { + m.plaintext = edit.new_text.clone(); + let mut obj = match m.typed_payload.take() { + Some(serde_json::Value::Object(o)) => o, + _ => serde_json::Map::new(), + }; + obj.insert("edited_at".to_string(), serde_json::json!(edit.edited_at)); + obj.insert("text".to_string(), serde_json::json!(edit.new_text)); + m.typed_payload = Some(serde_json::Value::Object(obj)); + break; + } + } + } + } + Err(e) => warn!("Failed to decode edit payload: {}", e), + } + } + + Some(MeshMessageType::Delete) => { + match message_types::decode_payload::(&envelope.v) { + Ok(del) => { + let sender_pubkey = state + .peers + .read() + .await + .get(&sender_contact_id) + .and_then(|p| p.pubkey_hex.clone()); + if sender_pubkey.as_deref() != Some(del.target.sender_pubkey.as_str()) { + warn!( + peer = sender_contact_id, + "Rejecting delete — sender pubkey does not match target" + ); + } else { + let mut messages = state.messages.write().await; + for m in messages.iter_mut() { + if m.sender_pubkey.as_deref() == Some(del.target.sender_pubkey.as_str()) + && m.sender_seq == Some(del.target.sender_seq) + { + m.plaintext = "🗑 message deleted".to_string(); + m.typed_payload = Some(serde_json::json!({ "deleted": true })); + m.message_type = "delete".to_string(); + break; + } + } + } + } + Err(e) => warn!("Failed to decode delete payload: {}", e), + } + } + Some(MeshMessageType::ContentRef) => { match message_types::decode_payload::(&envelope.v) { Ok(content) => { @@ -331,9 +495,21 @@ pub(crate) async fn handle_typed_envelope_direct( } Some(MeshMessageType::Text) => { - // Typed text message — extract and store as plain text + // Typed text arrives with a `sender_seq` in the envelope, so we + // can store it with a full MessageKey instead of the bare + // plain-text path — that's what makes replies and reactions + // addressable against text bubbles. let text = String::from_utf8_lossy(&envelope.v).to_string(); - store_plain_message(state, sender_contact_id, sender_name, &text).await; + store_typed_message( + state, + sender_contact_id, + sender_name, + &text, + "text", + None, + Some(envelope.seq), + ) + .await; } _ => { diff --git a/core/archipelago/src/mesh/listener/mod.rs b/core/archipelago/src/mesh/listener/mod.rs index 945a4d72..9d345365 100644 --- a/core/archipelago/src/mesh/listener/mod.rs +++ b/core/archipelago/src/mesh/listener/mod.rs @@ -14,6 +14,7 @@ mod frames; mod session; use super::types::*; +use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::time::Duration; @@ -86,6 +87,24 @@ pub struct MeshState { pub session_manager: Arc, /// Whether to encrypt directed relay messages (config toggle for rollback). pub encrypt_relay: bool, + /// Last-seen presence heartbeats per peer pubkey hex: (status, last_active_epoch, received_at). + pub presence: RwLock>, + /// Contacts store — alias/notes/pinned/blocked per peer pubkey hex. + pub contacts: RwLock>, +} + +/// Contact metadata kept alongside MeshState.peers. Pinned contacts sort to +/// the top of the chat list, blocked ones are filtered out of notifications. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct ContactEntry { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub alias: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub notes: Option, + #[serde(default)] + pub pinned: bool, + #[serde(default)] + pub blocked: bool, } /// In-progress chunk reassembly for a multi-frame message. @@ -133,6 +152,8 @@ impl MeshState { chunk_buffer: RwLock::new(HashMap::new()), session_manager, encrypt_relay, + presence: RwLock::new(HashMap::new()), + contacts: RwLock::new(HashMap::new()), }); (state, rx, cmd_rx) } @@ -172,7 +193,7 @@ impl MeshState { } } - async fn update_peer_count(&self) { + pub(crate) async fn update_peer_count(&self) { let count = self.peers.read().await.len(); self.status.write().await.peer_count = count; } diff --git a/core/archipelago/src/mesh/message_types.rs b/core/archipelago/src/mesh/message_types.rs index 16839b37..fd356ded 100644 --- a/core/archipelago/src/mesh/message_types.rs +++ b/core/archipelago/src/mesh/message_types.rs @@ -46,8 +46,25 @@ pub enum MeshMessageType { Reply = 13, /// Emoji reaction on an earlier message, targeted by MessageKey. Reaction = 14, + /// Read receipt — "I've seen everything up to and including this MessageKey." + ReadReceipt = 15, + /// Forwarded message — wraps the original sender+timestamp+body so the + /// receiver can render "Forwarded from " above the original content. + Forward = 16, + /// In-place edit of an earlier message (text replacement). UI shows "edited". + Edit = 17, + /// Tombstone for a prior message — peer already has the bytes, we just + /// mark it deleted locally. "🗑 message deleted" in the UI. + Delete = 18, /// Attachment/file reference: CID of a blob held by the sender, fetched out-of-band. ContentRef = 19, + /// Periodic heartbeat advertising presence + last-activity epoch. + Presence = 20, + /// Channel membership announcement — used by the Phase 5 channels/groups feature. + ChannelInvite = 21, + /// Shareable contact card — advertises a federation node (did, onion, pubkey). + /// Lets the receiver one-click-federate with that node. + ContactCard = 22, } impl MeshMessageType { @@ -68,7 +85,46 @@ impl MeshMessageType { 12 => Some(Self::TxConfirmation), 13 => Some(Self::Reply), 14 => Some(Self::Reaction), + 15 => Some(Self::ReadReceipt), + 16 => Some(Self::Forward), + 17 => Some(Self::Edit), + 18 => Some(Self::Delete), 19 => Some(Self::ContentRef), + 20 => Some(Self::Presence), + 21 => Some(Self::ChannelInvite), + 22 => Some(Self::ContactCard), + _ => None, + } + } + + /// Inverse of `label()` — used by the Forward path when it needs to + /// re-encode a stored message whose original `MeshMessageType` we only + /// kept as a string label in `MeshMessage.message_type`. + pub fn from_label(label: &str) -> Option { + match label { + "text" => Some(Self::Text), + "alert" => Some(Self::Alert), + "invoice" => Some(Self::Invoice), + "psbt_hash" => Some(Self::PsbtHash), + "coordinate" => Some(Self::Coordinate), + "prekey_bundle" => Some(Self::PrekeyBundle), + "session_init" => Some(Self::SessionInit), + "block_header" => Some(Self::BlockHeader), + "tx_relay" => Some(Self::TxRelay), + "tx_relay_response" => Some(Self::TxRelayResponse), + "lightning_relay" => Some(Self::LightningRelay), + "lightning_relay_response" => Some(Self::LightningRelayResponse), + "tx_confirmation" => Some(Self::TxConfirmation), + "reply" => Some(Self::Reply), + "reaction" => Some(Self::Reaction), + "read_receipt" => Some(Self::ReadReceipt), + "forward" => Some(Self::Forward), + "edit" => Some(Self::Edit), + "delete" => Some(Self::Delete), + "content_ref" => Some(Self::ContentRef), + "presence" => Some(Self::Presence), + "channel_invite" => Some(Self::ChannelInvite), + "contact_card" => Some(Self::ContactCard), _ => None, } } @@ -90,7 +146,14 @@ impl MeshMessageType { Self::TxConfirmation => "tx_confirmation", Self::Reply => "reply", Self::Reaction => "reaction", + Self::ReadReceipt => "read_receipt", + Self::Forward => "forward", + Self::Edit => "edit", + Self::Delete => "delete", Self::ContentRef => "content_ref", + Self::Presence => "presence", + Self::ChannelInvite => "channel_invite", + Self::ContactCard => "contact_card", } } } @@ -387,6 +450,66 @@ pub struct ContentRefPayload { pub cap_exp: u64, } +/// Read receipt — "I have seen every message from this sender up to and +/// including `up_to`." Receivers apply this to fold the ✓✓ "seen" marker +/// onto all local messages whose MessageKey ≤ `up_to` for that pubkey. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReadReceiptPayload { + pub up_to: MessageKey, +} + +/// Forwarded message. Carries the original sender's pubkey/seq/timestamp and +/// an optional human display name alongside the re-broadcast body so the UI +/// can render "Forwarded from " above the original content. `body_type` +/// + `body` hold the re-serialized original payload bytes. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ForwardPayload { + pub orig: MessageKey, + pub orig_ts: u32, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub orig_name: Option, + /// Original MeshMessageType (u8) — lets the receiver render the inner + /// content using the same renderer as the original. + pub body_type: u8, + pub body: Vec, +} + +/// In-place edit of an earlier message. `target` must have originated from +/// the sender (enforced on receive). `new_text` replaces plaintext; UI +/// appends an "edited" marker using `edited_at`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EditPayload { + pub target: MessageKey, + pub new_text: String, + pub edited_at: u32, +} + +/// Tombstone for a prior message by the same sender. Receivers replace the +/// target's displayed content with a "🗑 message deleted" marker locally. +/// True removal is impossible (peers already have the bytes). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeletePayload { + pub target: MessageKey, +} + +/// Presence heartbeat — sent periodically so peers know you're online. +/// `status` is a short tag ("online", "away", "dnd"); `last_active` is epoch secs. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PresencePayload { + pub status: String, + pub last_active: u32, +} + +/// ChannelInvite — advertise/invite a peer to join a channel. `key` is an +/// optional base64 pre-shared secret; absent `key` means public. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChannelInvitePayload { + pub channel: u8, + pub name: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub key: Option, +} + /// Transaction confirmation update (relay node → originator). /// Sent after each new confirmation (1, 2, 3) until fully confirmed. #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/core/archipelago/src/mesh/mod.rs b/core/archipelago/src/mesh/mod.rs index 3c5a4bd4..5c2b89ff 100644 --- a/core/archipelago/src/mesh/mod.rs +++ b/core/archipelago/src/mesh/mod.rs @@ -35,6 +35,74 @@ use tracing::{error, info, warn}; const MESH_CONFIG_FILE: &str = "mesh-config.json"; +/// Derive a stable synthetic `contact_id` for a federation peer from its +/// archipelago ed25519 pubkey. Mesh LoRa contacts use meshcore firmware's +/// own ID space (small ints 0..N), so federation peers are mapped into the +/// high half of u32 space to avoid collision. Both the receive path +/// (`inject_typed_from_federation`) and the startup pre-seed use this +/// formula so they always produce the same id for the same peer. +pub(crate) fn federation_peer_contact_id(archipelago_pubkey_hex: &str) -> u32 { + let bytes = hex::decode(archipelago_pubkey_hex).unwrap_or_default(); + if bytes.len() < 4 { + return 0x8000_0001; + } + let low = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]); + 0x8000_0000 | (low & 0x7FFF_FFFF) +} + +/// Upsert a mesh peer record representing a federation node so the UI can +/// address it as a chat and `mesh.send-content` can route ContentRef to it. +/// Existing entries (same contact_id) are updated in place, preserving any +/// previously observed radio state (rssi/snr/hops). +pub(crate) async fn upsert_federation_peer( + state: &Arc, + archipelago_pubkey_hex: &str, + did: &str, + name: Option<&str>, +) -> u32 { + let contact_id = federation_peer_contact_id(archipelago_pubkey_hex); + let display_name = name + .map(|s| s.to_string()) + .unwrap_or_else(|| { + let short = &archipelago_pubkey_hex[..archipelago_pubkey_hex.len().min(8)]; + format!("Archipelago {}", short) + }); + let mut peers = state.peers.write().await; + let existing = peers.get(&contact_id).cloned(); + let peer = MeshPeer { + contact_id, + advert_name: display_name, + did: Some(did.to_string()), + pubkey_hex: Some(archipelago_pubkey_hex.to_string()), + x25519_pubkey: existing.as_ref().and_then(|p| p.x25519_pubkey), + rssi: existing.as_ref().and_then(|p| p.rssi), + snr: existing.as_ref().and_then(|p| p.snr), + last_heard: chrono::Utc::now().to_rfc3339(), + hops: existing.as_ref().map(|p| p.hops).unwrap_or(0), + }; + peers.insert(contact_id, peer); + drop(peers); + state.update_peer_count().await; + contact_id +} + +/// Load federation nodes from disk and upsert each as a synthetic mesh peer. +/// Called at MeshService startup so the chat list already contains every +/// known federation node — users can share files to them without first +/// receiving a message. +pub(crate) async fn seed_federation_peers_into_mesh( + state: &Arc, + data_dir: &Path, +) { + let nodes = match crate::federation::load_nodes(data_dir).await { + Ok(n) => n, + Err(_) => return, + }; + for node in nodes { + upsert_federation_peer(state, &node.pubkey, &node.did, node.name.as_deref()).await; + } +} + /// Mesh configuration (persisted to disk). #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MeshConfig { @@ -185,6 +253,13 @@ impl MeshService { }), ); + // Pre-seed mesh state with a synthetic peer record for every known + // federation node. Mesh LoRa discovery and federation have disjoint + // identity namespaces, so without this step a user can't address a + // federated peer from the mesh UI until one side receives over the + // radio — which never happens for nodes that only share Tor. + seed_federation_peers_into_mesh(&state, data_dir).await; + Ok(Self { state, config, @@ -474,6 +549,59 @@ impl MeshService { Ok(dest_prefix) } + /// Split an oversized wire payload into MC-framed base64 chunks and send + /// each via the mesh device. Matches the receive-side reassembly in + /// `mesh/listener/decode.rs::handle_chunked_frame` (header `MCIIXXTT`, + /// 20-chunk cap, 152 base64 chars per chunk). The caller must ensure + /// the peer exists and the device is connected. + async fn send_chunked_payload(&self, contact_id: u32, payload: Vec) -> Result<()> { + use base64::Engine; + const HEADER_LEN: usize = 8; // MC + msg_id(2) + chunk_idx(2) + total(2) + const MAX_CHUNK_B64: usize = protocol::MAX_MESSAGE_LEN - HEADER_LEN; + const MAX_CHUNKS: u8 = 20; + + let b64 = base64::engine::general_purpose::STANDARD.encode(&payload); + let total_chunks = ((b64.len() + MAX_CHUNK_B64 - 1) / MAX_CHUNK_B64) as u8; + if total_chunks == 0 || total_chunks > MAX_CHUNKS { + anyhow::bail!( + "Payload too large to chunk: {} bytes → {} chunks (max {})", + payload.len(), + total_chunks, + MAX_CHUNKS + ); + } + + // Pick a 1-byte msg_id. Use the low 8 bits of the unix nanos; not + // cryptographically unique but collisions within a 120s reassembly + // window are astronomically unlikely for normal send rates. + let msg_id: u8 = + (chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64 & 0xFF) as u8; + + let dest_prefix = self.peer_dest_prefix(contact_id).await?; + for chunk_idx in 0..total_chunks { + let start = chunk_idx as usize * MAX_CHUNK_B64; + let end = (start + MAX_CHUNK_B64).min(b64.len()); + let chunk = &b64[start..end]; + let frame = format!("MC{:02X}{:02X}{:02X}{}", msg_id, chunk_idx, total_chunks, chunk); + self.state + .cmd_tx + .send(listener::MeshCommand::SendText { + dest_pubkey_prefix: dest_prefix, + payload: frame.into_bytes(), + }) + .await + .map_err(|_| anyhow::anyhow!("Mesh listener not running"))?; + } + tracing::info!( + contact_id, + msg_id, + chunks = total_chunks, + bytes = payload.len(), + "Sent chunked payload over mesh" + ); + Ok(()) + } + /// Send raw wire payload bytes to a peer (no Sent-record bookkeeping). /// Callers are responsible for storing the MeshMessage record afterwards. async fn send_raw_payload(&self, contact_id: u32, payload: Vec) -> Result<()> { @@ -521,6 +649,61 @@ impl MeshService { typed_payload: Option, sender_seq: u64, ) -> Result { + // Federation-synthetic contacts (high bit set) don't exist in the + // meshcore firmware contact table, so LoRa send would fail at + // `peer_dest_prefix`. Any envelope larger than the LoRa frame budget + // also needs the federation path. In both cases we look up the + // peer's onion (by archipelago pubkey first, then by DID) and POST + // over Tor; otherwise the send falls through to LoRa. + let is_federation_synthetic = contact_id & 0x8000_0000 != 0; + let exceeds_lora = wire.len() > protocol::MAX_MESSAGE_LEN; + if is_federation_synthetic || exceeds_lora { + let (peer_pubkey, peer_did) = { + let peers = self.state.peers.read().await; + match peers.get(&contact_id) { + Some(p) => (p.pubkey_hex.clone(), p.did.clone()), + None if is_federation_synthetic => { + anyhow::bail!("Unknown federation peer {}", contact_id); + } + None => (None, None), + } + }; + let nodes = crate::federation::load_nodes(&self.data_dir) + .await + .unwrap_or_default(); + let onion = peer_pubkey + .as_ref() + .and_then(|pk| nodes.iter().find(|n| &n.pubkey == pk).map(|n| n.onion.clone())) + .or_else(|| { + peer_did.as_ref().and_then(|d| { + nodes.iter().find(|n| &n.did == d).map(|n| n.onion.clone()) + }) + }); + if let Some(onion) = onion { + return self + .send_typed_wire_via_federation( + contact_id, + &onion, + wire, + type_label, + display_text, + typed_payload, + sender_seq, + ) + .await; + } + if exceeds_lora { + // No federation path — fall back to send-side chunking. Receive + // side already handles MC-framed base64 reassembly for up to 20 + // chunks (~3KB) per message, which is plenty for ContentRef or + // long replies when the peer is LoRa-only. + self.send_chunked_payload(contact_id, wire).await?; + return Ok(self + .record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq) + .await); + } + // Fall through: federation-synthetic case handled above, shouldn't reach here. + } self.send_raw_payload(contact_id, wire).await?; Ok(self .record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq) @@ -601,42 +784,39 @@ impl MeshService { wire: Vec, ) -> Result<()> { let envelope = crate::mesh::message_types::TypedEnvelope::from_wire(&wire)?; - // The sender's `from_pubkey_hex` is their archipelago identity key, - // which differs from the mesh peer's LoRa advert pubkey. Resolve - // identity → DID → mesh contact_id via federation/nodes.json (the - // DID is the only stable cross-transport key). - let federation_did = { + // Federation and mesh have disjoint identity namespaces: a LoRa + // mesh contact carries meshcore's firmware-issued pubkey, not the + // archipelago ed25519 key. So we cannot rely on matching pubkeys + // across transports. Instead, every federation peer has a stable + // synthetic mesh contact_id derived from its archipelago pubkey, + // and is upserted into mesh state the first time we hear from them. + let (federation_did, federation_name) = { let nodes = crate::federation::load_nodes(&self.data_dir) .await .unwrap_or_default(); nodes .into_iter() .find(|n| n.pubkey == from_pubkey_hex) - .map(|n| n.did) + .map(|n| (Some(n.did), n.name)) + .unwrap_or((None, None)) }; - let contact_id = { - let peers = self.state.peers.read().await; - peers - .iter() - .find_map(|(cid, p)| { - let did_match = federation_did - .as_ref() - .zip(p.did.as_ref()) - .map(|(a, b)| a == b) - .unwrap_or(false); - let pk_match = p.pubkey_hex.as_deref() == Some(from_pubkey_hex); - if did_match || pk_match { Some(*cid) } else { None } - }) - .unwrap_or_else(|| { - let bytes = hex::decode(from_pubkey_hex).unwrap_or_default(); - if bytes.len() >= 4 { - u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) - } else { - 0 - } - }) - }; - let display_name = from_name.unwrap_or("federation peer").to_string(); + // If the federation list knows this sender, use its DID; otherwise + // fall back to `from_name` (the sender's self-reported DID from the + // envelope body, signature-verified upstream in handle_mesh_typed_relay). + let effective_did = federation_did + .or_else(|| from_name.map(|s| s.to_string())) + .unwrap_or_else(|| format!("did:unknown:{}", &from_pubkey_hex[..from_pubkey_hex.len().min(16)])); + let display_name = federation_name + .clone() + .or_else(|| from_name.map(|s| s.to_string())) + .unwrap_or_else(|| "federation peer".to_string()); + let contact_id = upsert_federation_peer( + &self.state, + from_pubkey_hex, + &effective_did, + Some(&display_name), + ) + .await; listener::dispatch::handle_typed_envelope_direct( &self.state, contact_id, @@ -654,6 +834,51 @@ impl MeshService { self.state.next_send_seq(target).await } + /// Look up a stored MeshMessage by its local `id`. Used by the Forward + /// RPC to pull an existing record's typed payload for re-encoding. + pub async fn find_message_by_id(&self, id: u64) -> Option { + let messages = self.state.messages.read().await; + messages.iter().find(|m| m.id == id).cloned() + } + + /// Apply an Edit locally to any own-Sent message matching `sender_seq` + /// (sender_pubkey is implicit = self). Rewrites `plaintext` and appends + /// an `edited_at` marker on `typed_payload` so the UI can show "(edited)". + /// Best-effort: missing target is silently ignored. + pub async fn apply_local_edit(&self, target_seq: u64, new_text: &str, edited_at: u32) { + let mut messages = self.state.messages.write().await; + for m in messages.iter_mut() { + if m.sender_seq == Some(target_seq) + && matches!(m.direction, crate::mesh::types::MessageDirection::Sent) + { + m.plaintext = new_text.to_string(); + let mut obj = match m.typed_payload.take() { + Some(serde_json::Value::Object(o)) => o, + _ => serde_json::Map::new(), + }; + obj.insert("edited_at".to_string(), serde_json::json!(edited_at)); + obj.insert("text".to_string(), serde_json::json!(new_text)); + m.typed_payload = Some(serde_json::Value::Object(obj)); + break; + } + } + } + + /// Apply a Delete tombstone locally to an own-Sent message. + pub async fn apply_local_delete(&self, target_seq: u64) { + let mut messages = self.state.messages.write().await; + for m in messages.iter_mut() { + if m.sender_seq == Some(target_seq) + && matches!(m.direction, crate::mesh::types::MessageDirection::Sent) + { + m.plaintext = "🗑 message deleted".to_string(); + m.typed_payload = Some(serde_json::json!({ "deleted": true })); + m.message_type = "delete".to_string(); + break; + } + } + } + /// Broadcast a typed envelope wire payload on a mesh channel and record a /// rich Sent MeshMessage. Bytes are sent directly — do NOT utf8_lossy-encode /// binary envelope bytes before handing them here. @@ -714,45 +939,17 @@ impl MeshService { Ok(msg) } - /// Send a message to a peer by contact_id. - /// Routes through the background listener which owns the serial port. + /// Send a text message to a peer. Wraps the text in a typed `Text` + /// envelope (variant 0) with an allocated `sender_seq`, so the resulting + /// MeshMessage carries a stable MessageKey — this is what makes replies + /// and reactions addressable against plain text bubbles. pub async fn send_message(&self, contact_id: u32, text: &str) -> Result { - let payload = text.as_bytes().to_vec(); - let encrypted = false; - - self.send_raw_payload(contact_id, payload).await?; - - let msg_id = self.state.next_id().await; - let peer_name = self - .state - .peers - .read() - .await - .get(&contact_id) - .map(|p| p.advert_name.clone()); - - let msg = MeshMessage { - id: msg_id, - direction: MessageDirection::Sent, - peer_contact_id: contact_id, - peer_name, - plaintext: text.to_string(), - timestamp: chrono::Utc::now().to_rfc3339(), - delivered: false, - encrypted, - message_type: "text".to_string(), - typed_payload: None, - sender_pubkey: None, - sender_seq: None, - }; - - self.state.store_message(msg.clone()).await; - { - let mut status = self.state.status.write().await; - status.messages_sent += 1; - } - - Ok(msg) + use crate::mesh::message_types::{MeshMessageType, TypedEnvelope}; + let seq = self.state.next_send_seq(contact_id).await; + let envelope = TypedEnvelope::new(MeshMessageType::Text, text.as_bytes().to_vec()) + .with_seq(seq); + let wire = envelope.to_wire()?; + self.send_typed_wire(contact_id, wire, "text", text, None, seq).await } /// Record a Sent MeshMessage for a typed envelope that has already been