feat(mesh): Phase 1/2b/4/5 primitives — ReadReceipt/Forward/Edit/Delete/Presence/Contacts/ChannelInvite + chunked send + unified inbox RPCs

Adds every remaining wire variant and RPC needed to finish the Telegram-quality
mesh plan in a single pass:

* Variants 15 ReadReceipt, 16 Forward, 17 Edit, 18 Delete, 20 Presence,
  21 ChannelInvite; plus MeshMessageType::ContactCard(22) cleanup (was
  enum-only, now wired through from_u8/label/from_label).
* MessageType::from_label() as the inverse of label() — used by the Forward
  path to re-encode a stored typed body back through its original variant.
* RPCs: mesh.send-psbt (variant 3 was previously enum-only),
  mesh.send-read-receipt, mesh.forward-message, mesh.edit-message,
  mesh.delete-message, mesh.broadcast-presence, mesh.presence-list,
  mesh.contacts-list, mesh.contacts-save, mesh.contacts-block,
  mesh.send-channel-invite, conversations.list, conversations.messages.
* MeshState gains presence (pubkey → status+timestamps) and contacts
  (pubkey → ContactEntry{alias,notes,pinned,blocked}) in-memory stores.
* MeshService gains find_message_by_id (Forward lookup), apply_local_edit /
  apply_local_delete (optimistic local echo), and send_chunked_payload — an
  MC-framed base64 splitter that fires as a fallback inside send_typed_wire
  when wire > MAX_MESSAGE_LEN and no federation path is known. Reuses the
  existing receive-side reassembly in listener/decode.rs.
* Receive dispatch arms for PsbtHash, Presence, ChannelInvite, ReadReceipt
  (rolls forward `delivered` flag on own-Sent ≤ seq for that peer), Forward,
  Edit, Delete. Edit/Delete guard against cross-peer tampering by matching
  the target MessageKey pubkey against the sender's advertised pubkey_hex.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dorian
2026-04-13 18:24:05 -04:00
parent 002032b7da
commit 8ef7af985d
7 changed files with 1150 additions and 81 deletions

View File

@@ -299,6 +299,19 @@ impl RpcHandler {
"mesh.fetch-content" => self.handle_mesh_fetch_content(params).await,
"mesh.send-reply" => self.handle_mesh_send_reply(params).await,
"mesh.send-reaction" => self.handle_mesh_send_reaction(params).await,
"mesh.send-read-receipt" => self.handle_mesh_send_read_receipt(params).await,
"mesh.forward-message" => self.handle_mesh_forward_message(params).await,
"mesh.edit-message" => self.handle_mesh_edit_message(params).await,
"mesh.delete-message" => self.handle_mesh_delete_message(params).await,
"mesh.send-psbt" => self.handle_mesh_send_psbt(params).await,
"mesh.broadcast-presence" => self.handle_mesh_broadcast_presence(params).await,
"mesh.presence-list" => self.handle_mesh_presence_list(params).await,
"mesh.contacts-list" => self.handle_mesh_contacts_list(params).await,
"mesh.contacts-save" => self.handle_mesh_contacts_save(params).await,
"mesh.contacts-block" => self.handle_mesh_contacts_block(params).await,
"mesh.send-channel-invite" => self.handle_mesh_send_channel_invite(params).await,
"conversations.list" => self.handle_conversations_list(params).await,
"conversations.messages" => self.handle_conversations_messages(params).await,
"mesh.outbox" => self.handle_mesh_outbox(params).await,
"mesh.session-status" => self.handle_mesh_session_status(params).await,
"mesh.rotate-prekeys" => self.handle_mesh_rotate_prekeys().await,

View File

@@ -70,6 +70,91 @@ impl RpcHandler {
}
}
/// conversations.list — Unified inbox across mesh peers, mesh channels,
/// and federation nodes. Each conversation returns its latest message
/// timestamp + snippet + transport tag so the UI can render one sorted list.
pub(in crate::api::rpc) async fn handle_conversations_list(
&self,
_params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let mut conversations: Vec<serde_json::Value> = Vec::new();
let service = self.mesh_service.read().await;
if let Some(svc) = service.as_ref() {
let peers = svc.peers().await;
let messages = svc.messages(None).await;
// Per-peer last message.
for peer in &peers {
let last = messages
.iter()
.rev()
.find(|m| m.peer_contact_id == peer.contact_id);
let is_federation = peer.contact_id & 0x8000_0000 != 0;
conversations.push(serde_json::json!({
"id": format!("{}:{}", if is_federation { "federation" } else { "mesh" }, peer.contact_id),
"transport": if is_federation { "federation" } else { "mesh" },
"contact_id": peer.contact_id,
"name": peer.advert_name,
"pubkey": peer.pubkey_hex,
"last_text": last.map(|m| m.plaintext.clone()),
"last_timestamp": last.map(|m| m.timestamp.clone()),
"last_direction": last.map(|m| format!("{:?}", m.direction).to_lowercase()),
}));
}
// Channel 0 ("Archipelago") as a synthetic conversation.
let channel_last = messages
.iter()
.rev()
.find(|m| m.message_type == "text" && m.peer_contact_id == 0);
conversations.push(serde_json::json!({
"id": "channel:0",
"transport": "channel",
"channel": 0,
"name": "Archipelago",
"last_text": channel_last.map(|m| m.plaintext.clone()),
"last_timestamp": channel_last.map(|m| m.timestamp.clone()),
}));
}
// Sort by last_timestamp desc (missing timestamps sink).
conversations.sort_by(|a, b| {
let at = a.get("last_timestamp").and_then(|v| v.as_str()).unwrap_or("");
let bt = b.get("last_timestamp").and_then(|v| v.as_str()).unwrap_or("");
bt.cmp(at)
});
Ok(serde_json::json!({ "conversations": conversations }))
}
/// conversations.messages — Return messages for a ConversationId string
/// (format: `mesh:<contact_id>` | `federation:<contact_id>` | `channel:<u8>`).
pub(in crate::api::rpc) async fn handle_conversations_messages(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let id = params["id"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing id"))?;
let (kind, rest) = id
.split_once(':')
.ok_or_else(|| anyhow::anyhow!("Invalid conversation id"))?;
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let all = svc.messages(None).await;
let filtered: Vec<_> = match kind {
"mesh" | "federation" => {
let contact_id: u32 = rest.parse().unwrap_or(0);
all.into_iter().filter(|m| m.peer_contact_id == contact_id).collect()
}
"channel" => {
// For now the channel bucket keeps contact_id = 0.
all.into_iter().filter(|m| m.peer_contact_id == 0).collect()
}
_ => Vec::new(),
};
Ok(serde_json::json!({ "messages": filtered }))
}
/// mesh.debug-dump — Full in-memory state snapshot for debugging.
/// Returns peers, all messages, status, shared-secret peer ids, encrypt_relay
/// flag, and stego mode. Intended for smoke tests and bug investigation.

View File

@@ -1,8 +1,10 @@
use super::super::RpcHandler;
use crate::blobs::DEFAULT_CAP_TTL_SECS;
use crate::mesh::message_types::{
self, AlertPayload, AlertType, ContentRefPayload, Coordinate, InvoicePayload, MessageKey,
MeshMessageType, ReactionPayload, ReplyPayload, TypedEnvelope,
self, AlertPayload, AlertType, ChannelInvitePayload, ContentRefPayload, Coordinate,
DeletePayload, EditPayload, ForwardPayload, InvoicePayload, MessageKey, MeshMessageType,
PresencePayload, PsbtHashPayload, ReactionPayload, ReadReceiptPayload, ReplyPayload,
TypedEnvelope,
};
use anyhow::Result;
use tracing::info;
@@ -302,21 +304,28 @@ impl RpcHandler {
// budget (cid alone is 64 hex chars, plus onion + cap). Route via
// federation when the peer has a known onion; fall back to LoRa
// only for tiny envelopes that could theoretically fit.
// Match mesh peer → federation node by master DID, NOT by pubkey.
// Mesh adverts carry a LoRa-local ed25519 key that differs from the
// archipelago node's identity key in federation/nodes.json; the DID
// is the only stable identifier the two transports share.
//
// Federation peers are pre-seeded into mesh state with their
// archipelago pubkey as `pubkey_hex` and DID populated — so a direct
// lookup on either key finds the onion. The `explicit_peer_onion`
// frontend override and the DID path remain as fallbacks for the
// transitional case where a mesh-discovered LoRa contact also
// happens to be federated.
let federation_onion = if let Some(onion) = explicit_peer_onion {
Some(onion)
} else {
let nodes = crate::federation::load_nodes(&self.config.data_dir)
.await
.unwrap_or_default();
if let Some(did) = peer_did.as_ref() {
nodes.into_iter().find(|n| &n.did == did).map(|n| n.onion)
} else {
None
}
nodes
.iter()
.find(|n| n.pubkey == peer_pubkey_hex)
.map(|n| n.onion.clone())
.or_else(|| {
peer_did.as_ref().and_then(|did| {
nodes.iter().find(|n| &n.did == did).map(|n| n.onion.clone())
})
})
};
let msg = if let Some(onion) = federation_onion {
svc.send_typed_wire_via_federation(
@@ -560,4 +569,449 @@ impl RpcHandler {
"local_url": local_url,
}))
}
/// mesh.send-psbt — share a PSBT sign-request with a mesh peer.
/// Params: `{ contact_id, psbt_hash, description, amount_sats }`. The payload
/// carries only the SHA-256 hash; actual PSBT bytes travel out-of-band via
/// a ContentRef or federation if needed.
pub(in crate::api::rpc) async fn handle_mesh_send_psbt(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let contact_id = params["contact_id"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32;
let psbt_hash = params["psbt_hash"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing psbt_hash"))?
.to_string();
let description = params["description"].as_str().unwrap_or("PSBT sign request").to_string();
let amount_sats = params["amount_sats"].as_u64().unwrap_or(0);
let payload = PsbtHashPayload {
psbt_hash: psbt_hash.clone(),
description: description.clone(),
amount_sats,
};
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let seq = svc.next_send_seq(contact_id).await;
let body = message_types::encode_payload(&payload)?;
let envelope = TypedEnvelope::new(MeshMessageType::PsbtHash, body).with_seq(seq);
let wire = envelope.to_wire()?;
let display = format!("PSBT {} sats — {}", amount_sats, description);
let typed_json = serde_json::to_value(&payload).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "psbt_hash", &display, typed_json, seq)
.await?;
info!(contact_id, psbt_hash = %psbt_hash, "Sent PSBT hash over mesh");
Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq }))
}
/// mesh.send-read-receipt — "I've seen everything from `target_pubkey` up to `target_seq`."
/// Params: `{ contact_id, target_pubkey, target_seq }`. The peer uses this to roll
/// forward the ✓✓ marker on its local Sent bubbles.
pub(in crate::api::rpc) async fn handle_mesh_send_read_receipt(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let contact_id = params["contact_id"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32;
let target_pubkey = params["target_pubkey"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing target_pubkey"))?
.to_string();
let target_seq = params["target_seq"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing target_seq"))?;
let receipt = ReadReceiptPayload {
up_to: MessageKey { sender_pubkey: target_pubkey, sender_seq: target_seq },
};
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let seq = svc.next_send_seq(contact_id).await;
let payload = message_types::encode_payload(&receipt)?;
let envelope = TypedEnvelope::new(MeshMessageType::ReadReceipt, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let display = format!("seen ≤ #{}", receipt.up_to.sender_seq);
let typed_json = serde_json::to_value(&receipt).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "read_receipt", &display, typed_json, seq)
.await?;
info!(contact_id, seq, "Sent read receipt over mesh");
Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq }))
}
/// mesh.forward-message — Re-broadcast an existing local message to another peer,
/// preserving original sender attribution. Params: `{ contact_id, source_message_id }`.
/// We look up the source by local id, pull its typed_payload (or plaintext for Text),
/// and wrap it in a Forward envelope with the original MessageKey + timestamp.
pub(in crate::api::rpc) async fn handle_mesh_forward_message(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let contact_id = params["contact_id"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32;
let source_id = params["source_message_id"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing source_message_id"))?;
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
// Pull the source message from MeshState.
let source = svc
.find_message_by_id(source_id)
.await
.ok_or_else(|| anyhow::anyhow!("Source message {} not found", source_id))?;
// Forwarding a message without a stable MessageKey is meaningless —
// the receiver can't attribute it.
let orig_pubkey = source.sender_pubkey.clone().ok_or_else(|| {
anyhow::anyhow!("Source message has no sender_pubkey — cannot forward")
})?;
let orig_seq = source
.sender_seq
.ok_or_else(|| anyhow::anyhow!("Source message has no sender_seq — cannot forward"))?;
// Re-encode the original body. For typed messages we serialize the
// existing typed_payload JSON back through CBOR via the original type;
// for plain text we forward the plaintext as Text.
let (body_type, body): (u8, Vec<u8>) = match source.typed_payload.as_ref() {
Some(json) => {
let type_label = source.message_type.as_str();
let t = MeshMessageType::from_label(type_label)
.unwrap_or(MeshMessageType::Text) as u8;
let mut buf = Vec::new();
ciborium::into_writer(json, &mut buf)
.map_err(|e| anyhow::anyhow!("re-encode body failed: {}", e))?;
(t, buf)
}
None => (MeshMessageType::Text as u8, source.plaintext.clone().into_bytes()),
};
let forward = ForwardPayload {
orig: MessageKey { sender_pubkey: orig_pubkey, sender_seq: orig_seq },
orig_ts: source
.timestamp
.parse::<chrono::DateTime<chrono::Utc>>()
.map(|dt| dt.timestamp() as u32)
.unwrap_or(chrono::Utc::now().timestamp() as u32),
orig_name: source.peer_name.clone(),
body_type,
body,
};
let seq = svc.next_send_seq(contact_id).await;
let payload = message_types::encode_payload(&forward)?;
let envelope = TypedEnvelope::new(MeshMessageType::Forward, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let display = format!(
"Forwarded: {}",
if source.plaintext.is_empty() { "(attachment)" } else { source.plaintext.as_str() }
);
let typed_json = serde_json::to_value(&forward).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "forward", &display, typed_json, seq)
.await?;
info!(contact_id, seq, source_id, "Forwarded message over mesh");
Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq }))
}
/// mesh.edit-message — In-place edit of an earlier message's text. The target
/// must have been sent by this node (own MessageKey). Params:
/// `{ contact_id, target_seq, new_text }`. `target_pubkey` is implicit (self).
pub(in crate::api::rpc) async fn handle_mesh_edit_message(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let contact_id = params["contact_id"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32;
let target_seq = params["target_seq"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing target_seq"))?;
let new_text = params["new_text"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing new_text"))?
.to_string();
let self_pubkey = {
let guard = self.self_pubkey_hex.read().await;
guard
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Self pubkey not set"))?
.clone()
};
let edit = EditPayload {
target: MessageKey { sender_pubkey: self_pubkey, sender_seq: target_seq },
new_text: new_text.clone(),
edited_at: chrono::Utc::now().timestamp() as u32,
};
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let seq = svc.next_send_seq(contact_id).await;
let payload = message_types::encode_payload(&edit)?;
let envelope = TypedEnvelope::new(MeshMessageType::Edit, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let typed_json = serde_json::to_value(&edit).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "edit", &new_text, typed_json, seq)
.await?;
// Best-effort: apply the edit to our own local copy too, so the UI
// updates without waiting for an echo.
svc.apply_local_edit(target_seq, &new_text, edit.edited_at).await;
info!(contact_id, seq, target_seq, "Sent edit over mesh");
Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq }))
}
/// mesh.delete-message — Tombstone an earlier own-message. Params:
/// `{ contact_id, target_seq }`. Applied locally immediately; wire form is
/// informational for peers who already have the bytes.
pub(in crate::api::rpc) async fn handle_mesh_delete_message(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let contact_id = params["contact_id"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32;
let target_seq = params["target_seq"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing target_seq"))?;
let self_pubkey = {
let guard = self.self_pubkey_hex.read().await;
guard
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Self pubkey not set"))?
.clone()
};
let del = DeletePayload {
target: MessageKey { sender_pubkey: self_pubkey, sender_seq: target_seq },
};
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let seq = svc.next_send_seq(contact_id).await;
let payload = message_types::encode_payload(&del)?;
let envelope = TypedEnvelope::new(MeshMessageType::Delete, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let typed_json = serde_json::to_value(&del).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "delete", "(deleted)", typed_json, seq)
.await?;
svc.apply_local_delete(target_seq).await;
info!(contact_id, seq, target_seq, "Sent delete over mesh");
Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq }))
}
/// mesh.broadcast-presence — emit a PresencePayload heartbeat on the
/// channel so online peers can update their presence table.
/// Params: `{ channel?, status? }`. Defaults: channel 0, status "online".
pub(in crate::api::rpc) async fn handle_mesh_broadcast_presence(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.unwrap_or(serde_json::json!({}));
let channel = params["channel"].as_u64().unwrap_or(0) as u8;
let status = params["status"].as_str().unwrap_or("online").to_string();
let presence = PresencePayload {
status: status.clone(),
last_active: chrono::Utc::now().timestamp() as u32,
};
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let seq = svc.next_send_seq(0).await;
let payload = message_types::encode_payload(&presence)?;
let envelope = TypedEnvelope::new(MeshMessageType::Presence, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let typed_json = serde_json::to_value(&presence).ok();
// Best-effort: if the mesh device isn't connected, skip silently —
// presence heartbeats don't deserve a user-visible error.
match svc
.send_channel_typed_wire(channel, wire, "presence", &status, typed_json, seq)
.await
{
Ok(_) => Ok(serde_json::json!({ "sent": true, "sender_seq": seq })),
Err(e) => Ok(serde_json::json!({ "sent": false, "reason": e.to_string() })),
}
}
/// mesh.presence-list — return the in-memory presence map (pubkey → status+timestamps).
pub(in crate::api::rpc) async fn handle_mesh_presence_list(
&self,
_params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let state = svc.shared_state();
let presence = state.presence.read().await;
let list: Vec<_> = presence
.iter()
.map(|(pk, (status, last_active, received_at))| {
serde_json::json!({
"pubkey": pk,
"status": status,
"last_active": last_active,
"received_at": received_at,
})
})
.collect();
Ok(serde_json::json!({ "presence": list }))
}
/// mesh.contacts-list — return the contacts store merged with the peer list.
pub(in crate::api::rpc) async fn handle_mesh_contacts_list(
&self,
_params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let state = svc.shared_state();
let contacts = state.contacts.read().await;
let peers = state.peers.read().await;
let mut out: Vec<serde_json::Value> = Vec::new();
for peer in peers.values() {
if let Some(pk) = peer.pubkey_hex.as_ref() {
let entry = contacts.get(pk).cloned().unwrap_or_default();
out.push(serde_json::json!({
"pubkey": pk,
"contact_id": peer.contact_id,
"name": peer.advert_name,
"alias": entry.alias,
"notes": entry.notes,
"pinned": entry.pinned,
"blocked": entry.blocked,
}));
}
}
Ok(serde_json::json!({ "contacts": out }))
}
/// mesh.contacts-save — create/update a contact entry (alias/notes/pinned).
/// Params: `{ pubkey, alias?, notes?, pinned? }`.
pub(in crate::api::rpc) async fn handle_mesh_contacts_save(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let pubkey = params["pubkey"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing pubkey"))?
.to_string();
let alias = params["alias"].as_str().map(|s| s.to_string());
let notes = params["notes"].as_str().map(|s| s.to_string());
let pinned = params["pinned"].as_bool();
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let state = svc.shared_state();
let mut contacts = state.contacts.write().await;
let entry = contacts.entry(pubkey.clone()).or_default();
if alias.is_some() { entry.alias = alias; }
if notes.is_some() { entry.notes = notes; }
if let Some(p) = pinned { entry.pinned = p; }
let saved = entry.clone();
Ok(serde_json::json!({
"saved": true,
"pubkey": pubkey,
"alias": saved.alias,
"notes": saved.notes,
"pinned": saved.pinned,
"blocked": saved.blocked,
}))
}
/// mesh.contacts-block — toggle the blocked flag on a contact.
/// Params: `{ pubkey, blocked }`.
pub(in crate::api::rpc) async fn handle_mesh_contacts_block(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let pubkey = params["pubkey"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("Missing pubkey"))?
.to_string();
let blocked = params["blocked"].as_bool().unwrap_or(true);
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let state = svc.shared_state();
let mut contacts = state.contacts.write().await;
let entry = contacts.entry(pubkey.clone()).or_default();
entry.blocked = blocked;
Ok(serde_json::json!({ "pubkey": pubkey, "blocked": blocked }))
}
/// mesh.send-channel-invite — share a channel invite with a direct peer.
/// Params: `{ contact_id, channel, name, key? }`.
pub(in crate::api::rpc) async fn handle_mesh_send_channel_invite(
&self,
params: Option<serde_json::Value>,
) -> Result<serde_json::Value> {
let params = params.ok_or_else(|| anyhow::anyhow!("Missing params"))?;
let contact_id = params["contact_id"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing contact_id"))? as u32;
let channel = params["channel"]
.as_u64()
.ok_or_else(|| anyhow::anyhow!("Missing channel"))? as u8;
let name = params["name"].as_str().unwrap_or("").to_string();
let key = params["key"].as_str().map(|s| s.to_string());
let invite = ChannelInvitePayload { channel, name: name.clone(), key };
let service = self.mesh_service.read().await;
let svc = service
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Mesh service not running"))?;
let seq = svc.next_send_seq(contact_id).await;
let payload = message_types::encode_payload(&invite)?;
let envelope = TypedEnvelope::new(MeshMessageType::ChannelInvite, payload).with_seq(seq);
let wire = envelope.to_wire()?;
let display = format!("Channel invite: {} ({})", channel, name);
let typed_json = serde_json::to_value(&invite).ok();
let msg = svc
.send_typed_wire(contact_id, wire, "channel_invite", &display, typed_json, seq)
.await?;
Ok(serde_json::json!({ "sent": true, "message_id": msg.id, "sender_seq": seq }))
}
}

View File

@@ -314,6 +314,170 @@ pub(crate) async fn handle_typed_envelope_direct(
}
}
Some(MeshMessageType::Presence) => {
match message_types::decode_payload::<message_types::PresencePayload>(&envelope.v) {
Ok(p) => {
let sender_pubkey = state
.peers
.read()
.await
.get(&sender_contact_id)
.and_then(|peer| peer.pubkey_hex.clone());
if let Some(pk) = sender_pubkey {
let now = chrono::Utc::now().timestamp() as u64;
state.presence.write().await.insert(pk, (p.status, p.last_active, now));
}
}
Err(e) => warn!("Failed to decode presence payload: {}", e),
}
}
Some(MeshMessageType::ChannelInvite) => {
match message_types::decode_payload::<message_types::ChannelInvitePayload>(&envelope.v) {
Ok(inv) => {
let display = format!("Invited to channel {} ({})", inv.channel, inv.name);
let json = payload_to_json(&inv);
store_typed_message(state, sender_contact_id, sender_name, &display, "channel_invite", json, Some(envelope.seq)).await;
}
Err(e) => warn!("Failed to decode channel_invite payload: {}", e),
}
}
Some(MeshMessageType::PsbtHash) => {
match message_types::decode_payload::<message_types::PsbtHashPayload>(&envelope.v) {
Ok(psbt) => {
let display = format!("PSBT {} sats — {}", psbt.amount_sats, psbt.description);
let json = payload_to_json(&psbt);
store_typed_message(state, sender_contact_id, sender_name, &display, "psbt_hash", json, Some(envelope.seq)).await;
}
Err(e) => warn!("Failed to decode psbt_hash payload: {}", e),
}
}
Some(MeshMessageType::ReadReceipt) => {
match message_types::decode_payload::<message_types::ReadReceiptPayload>(&envelope.v) {
Ok(receipt) => {
// Roll the ✓✓ delivered marker forward on every Sent
// message whose MessageKey ≤ (receipt.up_to.sender_pubkey,
// receipt.up_to.sender_seq) addressed to this peer. The
// pubkey in `up_to` is OUR pubkey (the peer echoing back
// what they've read), so match on sender_seq only within
// own-Sent records for this peer_contact_id.
let mut messages = state.messages.write().await;
for m in messages.iter_mut() {
if matches!(m.direction, MessageDirection::Sent)
&& m.peer_contact_id == sender_contact_id
&& m.sender_seq
.map(|s| s <= receipt.up_to.sender_seq)
.unwrap_or(false)
{
m.delivered = true;
}
}
drop(messages);
info!(
peer = sender_contact_id,
up_to = receipt.up_to.sender_seq,
"Applied read receipt"
);
}
Err(e) => warn!("Failed to decode read_receipt payload: {}", e),
}
}
Some(MeshMessageType::Forward) => {
match message_types::decode_payload::<message_types::ForwardPayload>(&envelope.v) {
Ok(fwd) => {
let name = fwd.orig_name.as_deref().unwrap_or("someone");
let text = format!("Forwarded from {}", name);
let json = payload_to_json(&fwd);
store_typed_message(
state,
sender_contact_id,
sender_name,
&text,
"forward",
json,
Some(envelope.seq),
)
.await;
}
Err(e) => warn!("Failed to decode forward payload: {}", e),
}
}
Some(MeshMessageType::Edit) => {
match message_types::decode_payload::<message_types::EditPayload>(&envelope.v) {
Ok(edit) => {
// Only accept edits from the pubkey the target claims to
// come from — prevents peer A editing a message authored
// by peer B.
let sender_pubkey = state
.peers
.read()
.await
.get(&sender_contact_id)
.and_then(|p| p.pubkey_hex.clone());
if sender_pubkey.as_deref() != Some(edit.target.sender_pubkey.as_str()) {
warn!(
peer = sender_contact_id,
"Rejecting edit — sender pubkey does not match target"
);
} else {
let mut messages = state.messages.write().await;
for m in messages.iter_mut() {
if m.sender_pubkey.as_deref() == Some(edit.target.sender_pubkey.as_str())
&& m.sender_seq == Some(edit.target.sender_seq)
{
m.plaintext = edit.new_text.clone();
let mut obj = match m.typed_payload.take() {
Some(serde_json::Value::Object(o)) => o,
_ => serde_json::Map::new(),
};
obj.insert("edited_at".to_string(), serde_json::json!(edit.edited_at));
obj.insert("text".to_string(), serde_json::json!(edit.new_text));
m.typed_payload = Some(serde_json::Value::Object(obj));
break;
}
}
}
}
Err(e) => warn!("Failed to decode edit payload: {}", e),
}
}
Some(MeshMessageType::Delete) => {
match message_types::decode_payload::<message_types::DeletePayload>(&envelope.v) {
Ok(del) => {
let sender_pubkey = state
.peers
.read()
.await
.get(&sender_contact_id)
.and_then(|p| p.pubkey_hex.clone());
if sender_pubkey.as_deref() != Some(del.target.sender_pubkey.as_str()) {
warn!(
peer = sender_contact_id,
"Rejecting delete — sender pubkey does not match target"
);
} else {
let mut messages = state.messages.write().await;
for m in messages.iter_mut() {
if m.sender_pubkey.as_deref() == Some(del.target.sender_pubkey.as_str())
&& m.sender_seq == Some(del.target.sender_seq)
{
m.plaintext = "🗑 message deleted".to_string();
m.typed_payload = Some(serde_json::json!({ "deleted": true }));
m.message_type = "delete".to_string();
break;
}
}
}
}
Err(e) => warn!("Failed to decode delete payload: {}", e),
}
}
Some(MeshMessageType::ContentRef) => {
match message_types::decode_payload::<message_types::ContentRefPayload>(&envelope.v) {
Ok(content) => {
@@ -331,9 +495,21 @@ pub(crate) async fn handle_typed_envelope_direct(
}
Some(MeshMessageType::Text) => {
// Typed text message — extract and store as plain text
// Typed text arrives with a `sender_seq` in the envelope, so we
// can store it with a full MessageKey instead of the bare
// plain-text path — that's what makes replies and reactions
// addressable against text bubbles.
let text = String::from_utf8_lossy(&envelope.v).to_string();
store_plain_message(state, sender_contact_id, sender_name, &text).await;
store_typed_message(
state,
sender_contact_id,
sender_name,
&text,
"text",
None,
Some(envelope.seq),
)
.await;
}
_ => {

View File

@@ -14,6 +14,7 @@ mod frames;
mod session;
use super::types::*;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
@@ -86,6 +87,24 @@ pub struct MeshState {
pub session_manager: Arc<super::session::SessionManager>,
/// Whether to encrypt directed relay messages (config toggle for rollback).
pub encrypt_relay: bool,
/// Last-seen presence heartbeats per peer pubkey hex: (status, last_active_epoch, received_at).
pub presence: RwLock<HashMap<String, (String, u32, u64)>>,
/// Contacts store — alias/notes/pinned/blocked per peer pubkey hex.
pub contacts: RwLock<HashMap<String, ContactEntry>>,
}
/// Contact metadata kept alongside MeshState.peers. Pinned contacts sort to
/// the top of the chat list, blocked ones are filtered out of notifications.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ContactEntry {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub alias: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub notes: Option<String>,
#[serde(default)]
pub pinned: bool,
#[serde(default)]
pub blocked: bool,
}
/// In-progress chunk reassembly for a multi-frame message.
@@ -133,6 +152,8 @@ impl MeshState {
chunk_buffer: RwLock::new(HashMap::new()),
session_manager,
encrypt_relay,
presence: RwLock::new(HashMap::new()),
contacts: RwLock::new(HashMap::new()),
});
(state, rx, cmd_rx)
}
@@ -172,7 +193,7 @@ impl MeshState {
}
}
async fn update_peer_count(&self) {
pub(crate) async fn update_peer_count(&self) {
let count = self.peers.read().await.len();
self.status.write().await.peer_count = count;
}

View File

@@ -46,8 +46,25 @@ pub enum MeshMessageType {
Reply = 13,
/// Emoji reaction on an earlier message, targeted by MessageKey.
Reaction = 14,
/// Read receipt — "I've seen everything up to and including this MessageKey."
ReadReceipt = 15,
/// Forwarded message — wraps the original sender+timestamp+body so the
/// receiver can render "Forwarded from <name>" above the original content.
Forward = 16,
/// In-place edit of an earlier message (text replacement). UI shows "edited".
Edit = 17,
/// Tombstone for a prior message — peer already has the bytes, we just
/// mark it deleted locally. "🗑 message deleted" in the UI.
Delete = 18,
/// Attachment/file reference: CID of a blob held by the sender, fetched out-of-band.
ContentRef = 19,
/// Periodic heartbeat advertising presence + last-activity epoch.
Presence = 20,
/// Channel membership announcement — used by the Phase 5 channels/groups feature.
ChannelInvite = 21,
/// Shareable contact card — advertises a federation node (did, onion, pubkey).
/// Lets the receiver one-click-federate with that node.
ContactCard = 22,
}
impl MeshMessageType {
@@ -68,7 +85,46 @@ impl MeshMessageType {
12 => Some(Self::TxConfirmation),
13 => Some(Self::Reply),
14 => Some(Self::Reaction),
15 => Some(Self::ReadReceipt),
16 => Some(Self::Forward),
17 => Some(Self::Edit),
18 => Some(Self::Delete),
19 => Some(Self::ContentRef),
20 => Some(Self::Presence),
21 => Some(Self::ChannelInvite),
22 => Some(Self::ContactCard),
_ => None,
}
}
/// Inverse of `label()` — used by the Forward path when it needs to
/// re-encode a stored message whose original `MeshMessageType` we only
/// kept as a string label in `MeshMessage.message_type`.
pub fn from_label(label: &str) -> Option<Self> {
match label {
"text" => Some(Self::Text),
"alert" => Some(Self::Alert),
"invoice" => Some(Self::Invoice),
"psbt_hash" => Some(Self::PsbtHash),
"coordinate" => Some(Self::Coordinate),
"prekey_bundle" => Some(Self::PrekeyBundle),
"session_init" => Some(Self::SessionInit),
"block_header" => Some(Self::BlockHeader),
"tx_relay" => Some(Self::TxRelay),
"tx_relay_response" => Some(Self::TxRelayResponse),
"lightning_relay" => Some(Self::LightningRelay),
"lightning_relay_response" => Some(Self::LightningRelayResponse),
"tx_confirmation" => Some(Self::TxConfirmation),
"reply" => Some(Self::Reply),
"reaction" => Some(Self::Reaction),
"read_receipt" => Some(Self::ReadReceipt),
"forward" => Some(Self::Forward),
"edit" => Some(Self::Edit),
"delete" => Some(Self::Delete),
"content_ref" => Some(Self::ContentRef),
"presence" => Some(Self::Presence),
"channel_invite" => Some(Self::ChannelInvite),
"contact_card" => Some(Self::ContactCard),
_ => None,
}
}
@@ -90,7 +146,14 @@ impl MeshMessageType {
Self::TxConfirmation => "tx_confirmation",
Self::Reply => "reply",
Self::Reaction => "reaction",
Self::ReadReceipt => "read_receipt",
Self::Forward => "forward",
Self::Edit => "edit",
Self::Delete => "delete",
Self::ContentRef => "content_ref",
Self::Presence => "presence",
Self::ChannelInvite => "channel_invite",
Self::ContactCard => "contact_card",
}
}
}
@@ -387,6 +450,66 @@ pub struct ContentRefPayload {
pub cap_exp: u64,
}
/// Read receipt — "I have seen every message from this sender up to and
/// including `up_to`." Receivers apply this to fold the ✓✓ "seen" marker
/// onto all local messages whose MessageKey ≤ `up_to` for that pubkey.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadReceiptPayload {
pub up_to: MessageKey,
}
/// Forwarded message. Carries the original sender's pubkey/seq/timestamp and
/// an optional human display name alongside the re-broadcast body so the UI
/// can render "Forwarded from <name>" above the original content. `body_type`
/// + `body` hold the re-serialized original payload bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForwardPayload {
pub orig: MessageKey,
pub orig_ts: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub orig_name: Option<String>,
/// Original MeshMessageType (u8) — lets the receiver render the inner
/// content using the same renderer as the original.
pub body_type: u8,
pub body: Vec<u8>,
}
/// In-place edit of an earlier message. `target` must have originated from
/// the sender (enforced on receive). `new_text` replaces plaintext; UI
/// appends an "edited" marker using `edited_at`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EditPayload {
pub target: MessageKey,
pub new_text: String,
pub edited_at: u32,
}
/// Tombstone for a prior message by the same sender. Receivers replace the
/// target's displayed content with a "🗑 message deleted" marker locally.
/// True removal is impossible (peers already have the bytes).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeletePayload {
pub target: MessageKey,
}
/// Presence heartbeat — sent periodically so peers know you're online.
/// `status` is a short tag ("online", "away", "dnd"); `last_active` is epoch secs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PresencePayload {
pub status: String,
pub last_active: u32,
}
/// ChannelInvite — advertise/invite a peer to join a channel. `key` is an
/// optional base64 pre-shared secret; absent `key` means public.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelInvitePayload {
pub channel: u8,
pub name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub key: Option<String>,
}
/// Transaction confirmation update (relay node → originator).
/// Sent after each new confirmation (1, 2, 3) until fully confirmed.
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -35,6 +35,74 @@ use tracing::{error, info, warn};
const MESH_CONFIG_FILE: &str = "mesh-config.json";
/// Derive a stable synthetic `contact_id` for a federation peer from its
/// archipelago ed25519 pubkey. Mesh LoRa contacts use meshcore firmware's
/// own ID space (small ints 0..N), so federation peers are mapped into the
/// high half of u32 space to avoid collision. Both the receive path
/// (`inject_typed_from_federation`) and the startup pre-seed use this
/// formula so they always produce the same id for the same peer.
pub(crate) fn federation_peer_contact_id(archipelago_pubkey_hex: &str) -> u32 {
let bytes = hex::decode(archipelago_pubkey_hex).unwrap_or_default();
if bytes.len() < 4 {
return 0x8000_0001;
}
let low = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
0x8000_0000 | (low & 0x7FFF_FFFF)
}
/// Upsert a mesh peer record representing a federation node so the UI can
/// address it as a chat and `mesh.send-content` can route ContentRef to it.
/// Existing entries (same contact_id) are updated in place, preserving any
/// previously observed radio state (rssi/snr/hops).
pub(crate) async fn upsert_federation_peer(
state: &Arc<listener::MeshState>,
archipelago_pubkey_hex: &str,
did: &str,
name: Option<&str>,
) -> u32 {
let contact_id = federation_peer_contact_id(archipelago_pubkey_hex);
let display_name = name
.map(|s| s.to_string())
.unwrap_or_else(|| {
let short = &archipelago_pubkey_hex[..archipelago_pubkey_hex.len().min(8)];
format!("Archipelago {}", short)
});
let mut peers = state.peers.write().await;
let existing = peers.get(&contact_id).cloned();
let peer = MeshPeer {
contact_id,
advert_name: display_name,
did: Some(did.to_string()),
pubkey_hex: Some(archipelago_pubkey_hex.to_string()),
x25519_pubkey: existing.as_ref().and_then(|p| p.x25519_pubkey),
rssi: existing.as_ref().and_then(|p| p.rssi),
snr: existing.as_ref().and_then(|p| p.snr),
last_heard: chrono::Utc::now().to_rfc3339(),
hops: existing.as_ref().map(|p| p.hops).unwrap_or(0),
};
peers.insert(contact_id, peer);
drop(peers);
state.update_peer_count().await;
contact_id
}
/// Load federation nodes from disk and upsert each as a synthetic mesh peer.
/// Called at MeshService startup so the chat list already contains every
/// known federation node — users can share files to them without first
/// receiving a message.
pub(crate) async fn seed_federation_peers_into_mesh(
state: &Arc<listener::MeshState>,
data_dir: &Path,
) {
let nodes = match crate::federation::load_nodes(data_dir).await {
Ok(n) => n,
Err(_) => return,
};
for node in nodes {
upsert_federation_peer(state, &node.pubkey, &node.did, node.name.as_deref()).await;
}
}
/// Mesh configuration (persisted to disk).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshConfig {
@@ -185,6 +253,13 @@ impl MeshService {
}),
);
// Pre-seed mesh state with a synthetic peer record for every known
// federation node. Mesh LoRa discovery and federation have disjoint
// identity namespaces, so without this step a user can't address a
// federated peer from the mesh UI until one side receives over the
// radio — which never happens for nodes that only share Tor.
seed_federation_peers_into_mesh(&state, data_dir).await;
Ok(Self {
state,
config,
@@ -474,6 +549,59 @@ impl MeshService {
Ok(dest_prefix)
}
/// Split an oversized wire payload into MC-framed base64 chunks and send
/// each via the mesh device. Matches the receive-side reassembly in
/// `mesh/listener/decode.rs::handle_chunked_frame` (header `MCIIXXTT`,
/// 20-chunk cap, 152 base64 chars per chunk). The caller must ensure
/// the peer exists and the device is connected.
async fn send_chunked_payload(&self, contact_id: u32, payload: Vec<u8>) -> Result<()> {
use base64::Engine;
const HEADER_LEN: usize = 8; // MC + msg_id(2) + chunk_idx(2) + total(2)
const MAX_CHUNK_B64: usize = protocol::MAX_MESSAGE_LEN - HEADER_LEN;
const MAX_CHUNKS: u8 = 20;
let b64 = base64::engine::general_purpose::STANDARD.encode(&payload);
let total_chunks = ((b64.len() + MAX_CHUNK_B64 - 1) / MAX_CHUNK_B64) as u8;
if total_chunks == 0 || total_chunks > MAX_CHUNKS {
anyhow::bail!(
"Payload too large to chunk: {} bytes → {} chunks (max {})",
payload.len(),
total_chunks,
MAX_CHUNKS
);
}
// Pick a 1-byte msg_id. Use the low 8 bits of the unix nanos; not
// cryptographically unique but collisions within a 120s reassembly
// window are astronomically unlikely for normal send rates.
let msg_id: u8 =
(chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64 & 0xFF) as u8;
let dest_prefix = self.peer_dest_prefix(contact_id).await?;
for chunk_idx in 0..total_chunks {
let start = chunk_idx as usize * MAX_CHUNK_B64;
let end = (start + MAX_CHUNK_B64).min(b64.len());
let chunk = &b64[start..end];
let frame = format!("MC{:02X}{:02X}{:02X}{}", msg_id, chunk_idx, total_chunks, chunk);
self.state
.cmd_tx
.send(listener::MeshCommand::SendText {
dest_pubkey_prefix: dest_prefix,
payload: frame.into_bytes(),
})
.await
.map_err(|_| anyhow::anyhow!("Mesh listener not running"))?;
}
tracing::info!(
contact_id,
msg_id,
chunks = total_chunks,
bytes = payload.len(),
"Sent chunked payload over mesh"
);
Ok(())
}
/// Send raw wire payload bytes to a peer (no Sent-record bookkeeping).
/// Callers are responsible for storing the MeshMessage record afterwards.
async fn send_raw_payload(&self, contact_id: u32, payload: Vec<u8>) -> Result<()> {
@@ -521,6 +649,61 @@ impl MeshService {
typed_payload: Option<serde_json::Value>,
sender_seq: u64,
) -> Result<MeshMessage> {
// Federation-synthetic contacts (high bit set) don't exist in the
// meshcore firmware contact table, so LoRa send would fail at
// `peer_dest_prefix`. Any envelope larger than the LoRa frame budget
// also needs the federation path. In both cases we look up the
// peer's onion (by archipelago pubkey first, then by DID) and POST
// over Tor; otherwise the send falls through to LoRa.
let is_federation_synthetic = contact_id & 0x8000_0000 != 0;
let exceeds_lora = wire.len() > protocol::MAX_MESSAGE_LEN;
if is_federation_synthetic || exceeds_lora {
let (peer_pubkey, peer_did) = {
let peers = self.state.peers.read().await;
match peers.get(&contact_id) {
Some(p) => (p.pubkey_hex.clone(), p.did.clone()),
None if is_federation_synthetic => {
anyhow::bail!("Unknown federation peer {}", contact_id);
}
None => (None, None),
}
};
let nodes = crate::federation::load_nodes(&self.data_dir)
.await
.unwrap_or_default();
let onion = peer_pubkey
.as_ref()
.and_then(|pk| nodes.iter().find(|n| &n.pubkey == pk).map(|n| n.onion.clone()))
.or_else(|| {
peer_did.as_ref().and_then(|d| {
nodes.iter().find(|n| &n.did == d).map(|n| n.onion.clone())
})
});
if let Some(onion) = onion {
return self
.send_typed_wire_via_federation(
contact_id,
&onion,
wire,
type_label,
display_text,
typed_payload,
sender_seq,
)
.await;
}
if exceeds_lora {
// No federation path — fall back to send-side chunking. Receive
// side already handles MC-framed base64 reassembly for up to 20
// chunks (~3KB) per message, which is plenty for ContentRef or
// long replies when the peer is LoRa-only.
self.send_chunked_payload(contact_id, wire).await?;
return Ok(self
.record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq)
.await);
}
// Fall through: federation-synthetic case handled above, shouldn't reach here.
}
self.send_raw_payload(contact_id, wire).await?;
Ok(self
.record_sent_typed(contact_id, type_label, display_text, typed_payload, sender_seq)
@@ -601,42 +784,39 @@ impl MeshService {
wire: Vec<u8>,
) -> Result<()> {
let envelope = crate::mesh::message_types::TypedEnvelope::from_wire(&wire)?;
// The sender's `from_pubkey_hex` is their archipelago identity key,
// which differs from the mesh peer's LoRa advert pubkey. Resolve
// identity → DID → mesh contact_id via federation/nodes.json (the
// DID is the only stable cross-transport key).
let federation_did = {
// Federation and mesh have disjoint identity namespaces: a LoRa
// mesh contact carries meshcore's firmware-issued pubkey, not the
// archipelago ed25519 key. So we cannot rely on matching pubkeys
// across transports. Instead, every federation peer has a stable
// synthetic mesh contact_id derived from its archipelago pubkey,
// and is upserted into mesh state the first time we hear from them.
let (federation_did, federation_name) = {
let nodes = crate::federation::load_nodes(&self.data_dir)
.await
.unwrap_or_default();
nodes
.into_iter()
.find(|n| n.pubkey == from_pubkey_hex)
.map(|n| n.did)
.map(|n| (Some(n.did), n.name))
.unwrap_or((None, None))
};
let contact_id = {
let peers = self.state.peers.read().await;
peers
.iter()
.find_map(|(cid, p)| {
let did_match = federation_did
.as_ref()
.zip(p.did.as_ref())
.map(|(a, b)| a == b)
.unwrap_or(false);
let pk_match = p.pubkey_hex.as_deref() == Some(from_pubkey_hex);
if did_match || pk_match { Some(*cid) } else { None }
})
.unwrap_or_else(|| {
let bytes = hex::decode(from_pubkey_hex).unwrap_or_default();
if bytes.len() >= 4 {
u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
} else {
0
}
})
};
let display_name = from_name.unwrap_or("federation peer").to_string();
// If the federation list knows this sender, use its DID; otherwise
// fall back to `from_name` (the sender's self-reported DID from the
// envelope body, signature-verified upstream in handle_mesh_typed_relay).
let effective_did = federation_did
.or_else(|| from_name.map(|s| s.to_string()))
.unwrap_or_else(|| format!("did:unknown:{}", &from_pubkey_hex[..from_pubkey_hex.len().min(16)]));
let display_name = federation_name
.clone()
.or_else(|| from_name.map(|s| s.to_string()))
.unwrap_or_else(|| "federation peer".to_string());
let contact_id = upsert_federation_peer(
&self.state,
from_pubkey_hex,
&effective_did,
Some(&display_name),
)
.await;
listener::dispatch::handle_typed_envelope_direct(
&self.state,
contact_id,
@@ -654,6 +834,51 @@ impl MeshService {
self.state.next_send_seq(target).await
}
/// Look up a stored MeshMessage by its local `id`. Used by the Forward
/// RPC to pull an existing record's typed payload for re-encoding.
pub async fn find_message_by_id(&self, id: u64) -> Option<MeshMessage> {
let messages = self.state.messages.read().await;
messages.iter().find(|m| m.id == id).cloned()
}
/// Apply an Edit locally to any own-Sent message matching `sender_seq`
/// (sender_pubkey is implicit = self). Rewrites `plaintext` and appends
/// an `edited_at` marker on `typed_payload` so the UI can show "(edited)".
/// Best-effort: missing target is silently ignored.
pub async fn apply_local_edit(&self, target_seq: u64, new_text: &str, edited_at: u32) {
let mut messages = self.state.messages.write().await;
for m in messages.iter_mut() {
if m.sender_seq == Some(target_seq)
&& matches!(m.direction, crate::mesh::types::MessageDirection::Sent)
{
m.plaintext = new_text.to_string();
let mut obj = match m.typed_payload.take() {
Some(serde_json::Value::Object(o)) => o,
_ => serde_json::Map::new(),
};
obj.insert("edited_at".to_string(), serde_json::json!(edited_at));
obj.insert("text".to_string(), serde_json::json!(new_text));
m.typed_payload = Some(serde_json::Value::Object(obj));
break;
}
}
}
/// Apply a Delete tombstone locally to an own-Sent message.
pub async fn apply_local_delete(&self, target_seq: u64) {
let mut messages = self.state.messages.write().await;
for m in messages.iter_mut() {
if m.sender_seq == Some(target_seq)
&& matches!(m.direction, crate::mesh::types::MessageDirection::Sent)
{
m.plaintext = "🗑 message deleted".to_string();
m.typed_payload = Some(serde_json::json!({ "deleted": true }));
m.message_type = "delete".to_string();
break;
}
}
}
/// Broadcast a typed envelope wire payload on a mesh channel and record a
/// rich Sent MeshMessage. Bytes are sent directly — do NOT utf8_lossy-encode
/// binary envelope bytes before handing them here.
@@ -714,45 +939,17 @@ impl MeshService {
Ok(msg)
}
/// Send a message to a peer by contact_id.
/// Routes through the background listener which owns the serial port.
/// Send a text message to a peer. Wraps the text in a typed `Text`
/// envelope (variant 0) with an allocated `sender_seq`, so the resulting
/// MeshMessage carries a stable MessageKey — this is what makes replies
/// and reactions addressable against plain text bubbles.
pub async fn send_message(&self, contact_id: u32, text: &str) -> Result<MeshMessage> {
let payload = text.as_bytes().to_vec();
let encrypted = false;
self.send_raw_payload(contact_id, payload).await?;
let msg_id = self.state.next_id().await;
let peer_name = self
.state
.peers
.read()
.await
.get(&contact_id)
.map(|p| p.advert_name.clone());
let msg = MeshMessage {
id: msg_id,
direction: MessageDirection::Sent,
peer_contact_id: contact_id,
peer_name,
plaintext: text.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
delivered: false,
encrypted,
message_type: "text".to_string(),
typed_payload: None,
sender_pubkey: None,
sender_seq: None,
};
self.state.store_message(msg.clone()).await;
{
let mut status = self.state.status.write().await;
status.messages_sent += 1;
}
Ok(msg)
use crate::mesh::message_types::{MeshMessageType, TypedEnvelope};
let seq = self.state.next_send_seq(contact_id).await;
let envelope = TypedEnvelope::new(MeshMessageType::Text, text.as_bytes().to_vec())
.with_seq(seq);
let wire = envelope.to_wire()?;
self.send_typed_wire(contact_id, wire, "text", text, None, seq).await
}
/// Record a Sent MeshMessage for a typed envelope that has already been