Files
archy/core/archipelago/src/api/handler.rs
Dorian 27f205f38a security(TASK-8): fix 8 pentest findings — C1/C3/H1/M1/M2/L2
CRITICAL:
- C1: /lnd-connect-info now requires session auth, CORS wildcard removed
- C3: DEV_MODE removed from production service file (dev override only)

HIGH:
- H1: node-message endpoint now verifies ed25519 signatures when
  provided, logs warning for unsigned messages

MEDIUM:
- M1: content.add rejects filenames containing ".." (path traversal)
- M2: NIP-07 postMessage responses use specific origin instead of '*'

LOW:
- L2: Onion validation now enforces strict v3 format (56 base32 chars
  + ".onion", exactly 62 chars, no colons)

Previously fixed: C2 (RPC creds generated per-install from secrets)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 19:45:10 +00:00

865 lines
36 KiB
Rust

use crate::api::rpc::RpcHandler;
use crate::content_server;
use crate::electrs_status;
use crate::monitoring::MetricsStore;
use crate::network::dwn_store::DwnStore;
use crate::node_message as node_msg;
use crate::config::Config;
use crate::session::{self, SessionStore};
use crate::state::StateManager;
use anyhow::Result;
use futures_util::{SinkExt, StreamExt};
use hyper::{Method, Request, Response, StatusCode};
use hyper_ws_listener::WsStream;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio_tungstenite::tungstenite::Message;
use std::time::Instant;
use tracing::{debug, info};
pub struct ApiHandler {
config: Config,
rpc_handler: Arc<RpcHandler>,
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
session_store: SessionStore,
}
impl ApiHandler {
pub async fn new(
config: Config,
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
) -> Result<Self> {
let session_store = SessionStore::new();
let rpc_handler = Arc::new(
RpcHandler::new(
config.clone(),
state_manager.clone(),
metrics_store.clone(),
session_store.clone(),
)
.await?,
);
Ok(Self {
config,
rpc_handler,
state_manager,
metrics_store,
session_store,
})
}
/// Access the RPC handler (for service initialization after construction).
pub fn rpc_handler(&self) -> &Arc<RpcHandler> {
&self.rpc_handler
}
/// Check if the request has a valid session cookie.
async fn is_authenticated(&self, headers: &hyper::HeaderMap) -> bool {
match session::extract_session_cookie(headers) {
Some(token) => self.session_store.validate(&token).await,
None => false,
}
}
/// Build a 401 Unauthorized JSON response.
fn unauthorized() -> Response<hyper::Body> {
let body = serde_json::json!({ "error": "Unauthorized" });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Response::builder()
.status(StatusCode::UNAUTHORIZED)
.header("Content-Type", "application/json")
.body(hyper::Body::from(body_bytes))
.unwrap()
}
/// Allowed CORS origins derived from the config host IP.
fn allowed_origins(&self) -> Vec<String> {
vec![
format!("http://{}", self.config.host_ip),
format!("https://{}", self.config.host_ip),
"http://localhost:8100".to_string(), // Vite dev server
]
}
/// Validate the Origin header against allowed origins.
/// Returns the matched origin if valid, None if cross-origin is not allowed.
fn validate_origin(&self, headers: &hyper::HeaderMap) -> Option<String> {
let origin = headers
.get("origin")
.and_then(|v| v.to_str().ok())?;
let allowed = self.allowed_origins();
if allowed.iter().any(|a| a == origin) {
Some(origin.to_string())
} else {
None
}
}
pub async fn handle_request(
&self,
req: Request<hyper::Body>,
) -> Result<Response<hyper::Body>> {
let path = req.uri().path().to_string();
let method = req.method().clone();
// Handle CORS preflight for all routes
if method == Method::OPTIONS {
let mut builder = Response::builder()
.status(StatusCode::NO_CONTENT)
.header("Vary", "Origin");
if let Some(origin) = self.validate_origin(req.headers()) {
builder = builder
.header("Access-Control-Allow-Origin", &origin)
.header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
.header("Access-Control-Allow-Headers", "Content-Type, X-CSRF-Token")
.header("Access-Control-Allow-Credentials", "true");
}
return Ok(builder.body(hyper::Body::empty()).unwrap());
}
// WebSocket upgrade — validate session before upgrading
if method == Method::GET && path == "/ws/db" {
if !self.is_authenticated(req.headers()).await {
tracing::warn!("401 WebSocket /ws/db — session invalid or missing");
return Ok(Self::unauthorized());
}
return Self::handle_websocket(req, self.state_manager.clone(), self.metrics_store.clone()).await;
}
// Convert body to bytes for non-WS routes
let headers = req.headers().clone();
let (parts, body) = req.into_parts();
let body_bytes = hyper::body::to_bytes(body).await
.map_err(|e| anyhow::anyhow!("Failed to read body: {}", e))?;
let req_with_bytes = Request::from_parts(parts, hyper::Body::from(body_bytes.clone()));
debug!("{} {}", method, path);
match (method, path.as_str()) {
// RPC — auth is handled inside rpc handler per-method
(Method::POST, "/rpc/v1") => self.rpc_handler.handle(req_with_bytes).await,
// Health — unauthenticated
(Method::GET, "/health") => Ok(Response::builder()
.status(StatusCode::OK)
.body(hyper::Body::from("OK"))
.unwrap()),
// Node message — P2P endpoint (authenticated by source validation, not cookie)
(Method::POST, "/archipelago/node-message") => {
Self::handle_node_message(body_bytes).await
}
// Content serving — peers access shared content over Tor (no session auth)
(Method::GET, p) if p.starts_with("/content/") => {
Self::handle_content_request(p, &headers, &self.config).await
}
// Content catalog — list available content (no session auth, for peers)
(Method::GET, "/content") => {
Self::handle_content_catalog(&self.config).await
}
// Electrs status — unauthenticated (read-only sync status)
(Method::GET, "/electrs-status") => Self::handle_electrs_status().await,
// LND connect info — unauthenticated (read-only, localhost only)
(Method::GET, "/lnd-connect-info") => {
Self::handle_lnd_connect_info(self.rpc_handler.clone()).await
}
// Container logs — requires session
(Method::GET, path) if path.starts_with("/api/container/logs") => {
if !self.is_authenticated(&headers).await {
return Ok(Self::unauthorized());
}
let origin = self.validate_origin(&headers).unwrap_or_default();
Self::handle_container_logs_http(self.rpc_handler.clone(), path, &origin).await
}
// LND proxy — requires session
(Method::GET, path) if path.starts_with("/proxy/lnd/") => {
if !self.is_authenticated(&headers).await {
return Ok(Self::unauthorized());
}
let origin = self.validate_origin(&headers).unwrap_or_default();
Self::handle_lnd_proxy(path, &origin).await
}
// DWN health — unauthenticated
(Method::GET, "/dwn/health") => {
Self::handle_dwn_health(&self.config).await
}
// DWN message processing — peers access over Tor for sync (no session auth)
(Method::POST, "/dwn") => {
Self::handle_dwn_message(body_bytes, &self.config).await
}
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(hyper::Body::from("Not Found"))
.unwrap()),
}
}
async fn handle_container_logs_http(
rpc: Arc<RpcHandler>,
path: &str,
cors_origin: &str,
) -> Result<Response<hyper::Body>> {
let query = path
.strip_prefix("/api/container/logs")
.and_then(|s| s.strip_prefix('?'))
.unwrap_or("");
let params: std::collections::HashMap<String, String> =
query
.split('&')
.filter_map(|p| {
let mut it = p.splitn(2, '=');
let k = it.next()?.to_string();
let v = it.next()?.to_string();
Some((k, v))
})
.collect();
let app_id = params.get("app_id").map(|s| s.as_str()).unwrap_or("lnd");
// Validate app_id format
if !is_valid_app_id(app_id) {
let body = serde_json::json!({ "error": "Invalid app_id" });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("Content-Type", "application/json")
.body(hyper::Body::from(body_bytes))
.unwrap());
}
let lines = params
.get("lines")
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(200);
match rpc.get_container_logs_value(app_id, lines).await {
Ok(value) => {
let body = serde_json::json!({ "result": value });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.header("Access-Control-Allow-Origin", cors_origin)
.header("Access-Control-Allow-Credentials", "true")
.header("Vary", "Origin")
.body(hyper::Body::from(body_bytes))
.unwrap())
}
Err(e) => {
let body = serde_json::json!({ "error": e.to_string() });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header("Content-Type", "application/json")
.header("Access-Control-Allow-Origin", cors_origin)
.header("Access-Control-Allow-Credentials", "true")
.header("Vary", "Origin")
.body(hyper::Body::from(body_bytes))
.unwrap())
}
}
}
async fn handle_node_message(body: hyper::body::Bytes) -> Result<Response<hyper::Body>> {
#[derive(serde::Deserialize)]
struct Incoming {
from_pubkey: Option<String>,
message: Option<String>,
signature: Option<String>,
}
let incoming: Incoming = serde_json::from_slice(&body).unwrap_or(Incoming {
from_pubkey: None,
message: None,
signature: None,
});
if let (Some(from), Some(msg)) = (incoming.from_pubkey.as_ref(), incoming.message.as_ref()) {
// Validate from_pubkey is a valid hex ed25519 pubkey
if !is_valid_pubkey_hex(from) {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("Content-Type", "application/json")
.body(hyper::Body::from(r#"{"error":"Invalid pubkey format"}"#))
.unwrap());
}
// Verify ed25519 signature if provided (required for trusted messages)
if let Some(sig_hex) = &incoming.signature {
match crate::identity::NodeIdentity::verify(from, msg.as_bytes(), sig_hex) {
Ok(true) => {}
_ => {
return Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.header("Content-Type", "application/json")
.body(hyper::Body::from(r#"{"error":"Invalid signature"}"#))
.unwrap());
}
}
} else {
// No signature — accept but mark as unverified
tracing::warn!("Node message from {} has no signature — unverified", &from[..16.min(from.len())]);
}
// Sanitize log output to prevent log injection
let safe_from = sanitize_log_string(from);
let safe_msg = sanitize_log_string(msg);
tracing::info!("Received message from {}: {}", safe_from, safe_msg);
// Sanitize stored message content (strip HTML entities)
let clean_from = sanitize_html(from);
let clean_msg = sanitize_html(msg);
node_msg::store_received(&clean_from, &clean_msg).await;
}
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(hyper::Body::from(r#"{"ok":true}"#))
.unwrap())
}
async fn handle_electrs_status() -> Result<Response<hyper::Body>> {
let status = electrs_status::get_electrs_sync_status().await;
let body = serde_json::to_vec(&status).unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(hyper::Body::from(body))
.unwrap())
}
async fn handle_lnd_connect_info(
rpc: std::sync::Arc<super::rpc::RpcHandler>,
) -> Result<Response<hyper::Body>> {
match rpc.handle_lnd_connect_info().await {
Ok(val) => {
let body = serde_json::to_vec(&val).unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(hyper::Body::from(body))
.unwrap())
}
Err(e) => Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header("Content-Type", "application/json")
.body(hyper::Body::from(
serde_json::json!({"error": e.to_string()}).to_string(),
))
.unwrap()),
}
}
async fn handle_lnd_proxy(path: &str, cors_origin: &str) -> Result<Response<hyper::Body>> {
let suffix = path.strip_prefix("/proxy/lnd").unwrap_or("/");
let url = format!("http://127.0.0.1:8080{}", suffix);
match reqwest::get(&url).await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers = resp.headers().clone();
let body = resp.bytes().await.unwrap_or_default();
let mut builder = Response::builder().status(status);
if let Some(ct) = headers.get("content-type") {
if let Ok(s) = ct.to_str() {
builder = builder.header("Content-Type", s);
}
}
builder
.header("Access-Control-Allow-Origin", cors_origin)
.header("Access-Control-Allow-Credentials", "true")
.header("Vary", "Origin")
.body(hyper::Body::from(body))
.map_err(|e| anyhow::anyhow!("response build: {}", e))
}
Err(e) => {
let body = serde_json::json!({ "error": e.to_string() });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::BAD_GATEWAY)
.header("Content-Type", "application/json")
.header("Access-Control-Allow-Origin", cors_origin)
.header("Access-Control-Allow-Credentials", "true")
.header("Vary", "Origin")
.body(hyper::Body::from(body_bytes))
.unwrap())
}
}
}
async fn handle_content_catalog(config: &Config) -> Result<Response<hyper::Body>> {
match content_server::load_catalog(&config.data_dir).await {
Ok(catalog) => {
// Only expose public metadata for available items
let items: Vec<serde_json::Value> = catalog
.items
.iter()
.filter(|i| !matches!(i.availability, content_server::Availability::Nobody))
.map(|i| {
serde_json::json!({
"id": i.id,
"filename": i.filename,
"mime_type": i.mime_type,
"size_bytes": i.size_bytes,
"description": i.description,
"access": i.access,
})
})
.collect();
let body = serde_json::to_vec(&serde_json::json!({ "items": items }))
.unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(hyper::Body::from(body))
.unwrap())
}
Err(e) => {
let body = serde_json::json!({ "error": e.to_string() });
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header("Content-Type", "application/json")
.body(hyper::Body::from(body_bytes))
.unwrap())
}
}
}
async fn handle_content_request(
path: &str,
headers: &hyper::HeaderMap,
config: &Config,
) -> Result<Response<hyper::Body>> {
let content_id = path.strip_prefix("/content/").unwrap_or("");
if content_id.is_empty() || !is_valid_app_id(content_id) {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(hyper::Body::from("Invalid content ID"))
.unwrap());
}
// Extract payment token from X-Payment-Token header
let payment_token = headers
.get("x-payment-token")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
// Extract federation peer DID from X-Federation-DID header
let peer_did = headers
.get("x-federation-did")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
// Parse Range header for streaming support
let range = headers
.get("range")
.and_then(|v| v.to_str().ok())
.and_then(content_server::parse_range_header);
match content_server::serve_content(
&config.data_dir,
content_id,
payment_token.as_deref(),
peer_did.as_deref(),
range,
)
.await
{
Ok(content_server::ServeResult::Ok(bytes, mime_type)) => {
let len = bytes.len();
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", mime_type)
.header("Content-Length", len.to_string())
.header("Accept-Ranges", "bytes")
.body(hyper::Body::from(bytes))
.unwrap())
}
Ok(content_server::ServeResult::Partial {
bytes,
mime_type,
start,
end,
total,
}) => {
Ok(Response::builder()
.status(StatusCode::PARTIAL_CONTENT)
.header("Content-Type", mime_type)
.header("Content-Length", bytes.len().to_string())
.header("Content-Range", format!("bytes {}-{}/{}", start, end, total))
.header("Accept-Ranges", "bytes")
.body(hyper::Body::from(bytes))
.unwrap())
}
Ok(content_server::ServeResult::PaymentRequired(price_sats)) => {
let body = serde_json::json!({
"error": "Payment required",
"price_sats": price_sats,
"payment_header": "X-Payment-Token",
});
let body_bytes = serde_json::to_vec(&body).unwrap_or_default();
Ok(Response::builder()
.status(StatusCode::PAYMENT_REQUIRED)
.header("Content-Type", "application/json")
.body(hyper::Body::from(body_bytes))
.unwrap())
}
Ok(content_server::ServeResult::Forbidden) => {
Ok(Response::builder()
.status(StatusCode::FORBIDDEN)
.header("Content-Type", "application/json")
.body(hyper::Body::from(
r#"{"error":"Access denied — federation peer required"}"#,
))
.unwrap())
}
Ok(content_server::ServeResult::NotFound) | Err(_) => {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(hyper::Body::from("Content not found"))
.unwrap())
}
}
}
async fn handle_websocket(
req: Request<hyper::Body>,
state_manager: Arc<StateManager>,
metrics_store: Arc<MetricsStore>,
) -> Result<Response<hyper::Body>> {
let (response, ws_fut_opt) = hyper_ws_listener::create_ws(req)
.map_err(|e| anyhow::anyhow!("WebSocket upgrade failed: {}", e))?;
if let Some(ws_fut) = ws_fut_opt {
tokio::spawn(async move {
let ws_stream: WsStream = match ws_fut.await {
Ok(Ok(s)) => s,
Ok(Err(e)) => {
debug!("WebSocket handshake failed (hyper): {}", e);
return;
}
Err(e) => {
debug!("WebSocket task join failed: {}", e);
return;
}
};
metrics_store.increment_ws();
info!("WebSocket /ws/db connected");
let (mut tx, mut rx) = ws_stream.split();
let initial_msg = state_manager.get_initial_message().await;
if let Ok(json_msg) = serde_json::to_string(&initial_msg) {
if let Err(e) = tx.send(Message::Text(json_msg)).await {
debug!("Failed to send initial data: {}", e);
return;
}
debug!("Sent initial data dump at revision {}", initial_msg.rev);
}
let mut state_rx = state_manager.subscribe();
let ping_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
tokio::pin!(ping_interval);
let mut last_client_activity = Instant::now();
const INACTIVITY_TIMEOUT_SECS: u64 = 300; // 5 minutes
loop {
tokio::select! {
_ = ping_interval.tick() => {
// Check inactivity timeout
if last_client_activity.elapsed().as_secs() >= INACTIVITY_TIMEOUT_SECS {
info!("WebSocket client inactive for {}s, closing", INACTIVITY_TIMEOUT_SECS);
let _ = tx.send(Message::Close(None)).await;
break;
}
if tx.send(Message::Ping(vec![])).await.is_err() {
debug!("Failed to send ping, connection likely closed");
break;
}
}
update = state_rx.recv() => {
match update {
Ok(msg) => {
if let Ok(json_msg) = serde_json::to_string(&msg) {
if let Err(e) = tx.send(Message::Text(json_msg)).await {
debug!("Failed to send state update: {}", e);
break;
}
debug!("Sent state update at revision {}", msg.rev);
}
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
debug!("Client lagged behind, skipped {} messages", skipped);
}
Err(broadcast::error::RecvError::Closed) => {
debug!("Broadcast channel closed");
break;
}
}
}
msg = rx.next() => {
match msg {
Some(Ok(Message::Close(_))) => break,
Some(Ok(Message::Pong(_))) => {
last_client_activity = Instant::now();
debug!("Received pong");
}
Some(Ok(Message::Ping(data))) => {
last_client_activity = Instant::now();
let _ = tx.send(Message::Pong(data)).await;
}
Some(Ok(Message::Text(text))) => {
last_client_activity = Instant::now();
// Handle JSON ping from frontend
if text.contains("\"type\":\"ping\"") || text.contains("\"type\": \"ping\"") {
let _ = tx.send(Message::Text(r#"{"type":"pong"}"#.to_string())).await;
}
}
Some(Ok(_)) => {
last_client_activity = Instant::now();
}
Some(Err(e)) => {
debug!("WebSocket stream error: {}", e);
break;
}
None => break,
}
}
}
}
metrics_store.decrement_ws();
info!("WebSocket /ws/db disconnected");
});
}
Ok(response)
}
}
/// Validate that an app ID matches the safe pattern: lowercase alphanumeric + hyphens.
fn is_valid_app_id(id: &str) -> bool {
!id.is_empty()
&& id.len() <= 64
&& id.bytes().all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'-')
&& id.as_bytes()[0] != b'-'
}
/// Validate that a pubkey is a 64-char hex string.
fn is_valid_pubkey_hex(s: &str) -> bool {
s.len() == 64 && s.bytes().all(|b| b.is_ascii_hexdigit())
}
/// Strip newlines and ANSI escape sequences from strings before logging.
fn sanitize_log_string(s: &str) -> String {
s.replace('\n', "\\n")
.replace('\r', "\\r")
.replace('\x1b', "")
}
/// Strip HTML-sensitive characters to prevent XSS when stored/rendered.
fn sanitize_html(s: &str) -> String {
s.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
.replace('"', "&quot;")
.replace('\'', "&#x27;")
}
impl ApiHandler {
/// DWN health endpoint — returns store stats.
async fn handle_dwn_health(config: &Config) -> Result<Response<hyper::Body>> {
match DwnStore::new(&config.data_dir).await {
Ok(store) => {
let stats = store.stats().await.unwrap_or(crate::network::dwn_store::StoreStats {
message_count: 0,
protocol_count: 0,
total_bytes: 0,
});
let body = serde_json::json!({
"status": "ok",
"message_count": stats.message_count,
"protocol_count": stats.protocol_count,
"total_bytes": stats.total_bytes,
});
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(hyper::Body::from(body.to_string()))
.unwrap())
}
Err(_) => Ok(Response::builder()
.status(StatusCode::SERVICE_UNAVAILABLE)
.header("Content-Type", "application/json")
.body(hyper::Body::from(r#"{"status":"unavailable"}"#))
.unwrap()),
}
}
/// DWN message processing endpoint — handles RecordsWrite, RecordsQuery, RecordsRead, RecordsDelete.
/// Supports batch processing: all messages in the array are processed.
async fn handle_dwn_message(
body: hyper::body::Bytes,
config: &Config,
) -> Result<Response<hyper::Body>> {
let request: serde_json::Value = match serde_json::from_slice(&body) {
Ok(v) => v,
Err(e) => {
let err = serde_json::json!({"error": format!("Invalid JSON: {}", e)});
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("Content-Type", "application/json")
.body(hyper::Body::from(err.to_string()))
.unwrap());
}
};
// Collect all messages to process
let messages: Vec<serde_json::Value> = if request.get("message").is_some() {
vec![request["message"].clone()]
} else if let Some(msgs) = request["messages"].as_array() {
msgs.clone()
} else {
vec![serde_json::Value::Null]
};
let store = DwnStore::new(&config.data_dir).await?;
let mut results = Vec::new();
for message in &messages {
let interface = message["descriptor"]["interface"]
.as_str()
.unwrap_or("");
let method = message["descriptor"]["method"]
.as_str()
.unwrap_or("");
let result = match (interface, method) {
("Records", "Write") => {
let author = message["author"].as_str().unwrap_or("unknown");
let protocol = message["descriptor"]["protocol"].as_str();
let schema = message["descriptor"]["schema"].as_str();
let data_format = message["descriptor"]["dataFormat"].as_str();
let data = message.get("data").cloned();
// Deduplicate: check if recordId already exists
if let Some(record_id) = message["recordId"].as_str() {
if store.read_message(record_id).await.ok().flatten().is_some() {
serde_json::json!({"status": {"code": 200, "detail": "Already exists"}})
} else {
match store
.write_message(author, protocol, schema, data_format, data)
.await
{
Ok(msg) => {
serde_json::json!({"status": {"code": 202}, "entry": msg})
}
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
} else {
match store
.write_message(author, protocol, schema, data_format, data)
.await
{
Ok(msg) => serde_json::json!({"status": {"code": 202}, "entry": msg}),
Err(e) => serde_json::json!({"status": {"code": 500, "detail": e.to_string()}}),
}
}
}
("Records", "Query") => {
let query = crate::network::dwn_store::MessageQuery {
protocol: message["descriptor"]["filter"]["protocol"]
.as_str()
.map(|s| s.to_string()),
schema: message["descriptor"]["filter"]["schema"]
.as_str()
.map(|s| s.to_string()),
author: message["descriptor"]["filter"]["author"]
.as_str()
.map(|s| s.to_string()),
date_from: message["descriptor"]["filter"]["dateFrom"]
.as_str()
.map(|s| s.to_string()),
date_to: message["descriptor"]["filter"]["dateTo"]
.as_str()
.map(|s| s.to_string()),
limit: message["descriptor"]["filter"]["limit"]
.as_u64()
.map(|n| n as usize),
};
match store.query_messages(&query).await {
Ok(messages) => {
serde_json::json!({"status": {"code": 200}, "entries": messages})
}
Err(e) => {
serde_json::json!({"status": {"code": 500, "detail": e.to_string()}})
}
}
}
("Records", "Read") => {
let record_id = message["descriptor"]["recordId"]
.as_str()
.unwrap_or("");
match store.read_message(record_id).await {
Ok(Some(msg)) => {
serde_json::json!({"status": {"code": 200}, "entry": msg})
}
Ok(None) => serde_json::json!({"status": {"code": 404, "detail": "Record not found"}}),
Err(e) => {
serde_json::json!({"status": {"code": 500, "detail": e.to_string()}})
}
}
}
("Records", "Delete") => {
let record_id = message["descriptor"]["recordId"]
.as_str()
.unwrap_or("");
match store.delete_message(record_id).await {
Ok(true) => serde_json::json!({"status": {"code": 200}}),
Ok(false) => serde_json::json!({"status": {"code": 404, "detail": "Record not found"}}),
Err(e) => {
serde_json::json!({"status": {"code": 500, "detail": e.to_string()}})
}
}
}
_ => {
serde_json::json!({"status": {"code": 400, "detail": format!("Unknown method: {}.{}", interface, method)}})
}
};
results.push(result);
}
// Return single result for single message, array for batch
let (response_body, http_status) = if results.len() == 1 {
let result = &results[0];
let status_code = result["status"]["code"].as_u64().unwrap_or(200);
let http_status = match status_code {
202 => StatusCode::ACCEPTED,
400 => StatusCode::BAD_REQUEST,
404 => StatusCode::NOT_FOUND,
500 => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::OK,
};
(result.to_string(), http_status)
} else {
(
serde_json::json!({"replies": results}).to_string(),
StatusCode::OK,
)
};
Ok(Response::builder()
.status(http_status)
.header("Content-Type", "application/json")
.body(hyper::Body::from(response_body))
.unwrap())
}
}