refactor(install): route orchestrator-managed apps through orchestrator first
Phase 3a of the install path consolidation. Two coupled changes:
1. install.rs handle_package_install: gate the legacy "container exists →
adopt + return" probe on !orchestrator_managed. Apps the orchestrator
knows about (bitcoin-knots, bitcoin-core, lnd, electrumx, fedimint,
filebrowser, btcpay-server stack apps, mempool stack apps, plus the
companion UIs that just moved to Quadlet) skip the legacy probe and
fall straight into the orchestrator branch.
The legacy adopt block was returning success on a bare `podman start`
exit-0 — even when the process inside the container crashed seconds
later. That's the .228 "running but unreachable" failure mode. The
orchestrator's ensure_running honors the manifest's health check and
pre-start hooks (e.g. re-renders bitcoin-ui's nginx.conf if the RPC
password rotated), so this is a behavioral upgrade, not just a
refactor.
2. ProdContainerOrchestrator::install: make idempotent. Previously it
blindly called install_fresh which would fail on `podman create` if
the container name already existed. Now it delegates to ensure_running:
- Container Running + healthy → no-op (refresh hooks, restart if
config rewritten)
- Container Stopped/Exited → start (with hook refresh)
- Container missing → install_fresh
- Container in wedged state (Created/Paused/Unknown) → force-recreate
Without this, change #1 would regress every "container already exists"
case for the 18 orchestrator-managed app IDs. With it, install becomes
the single source of truth for "make app X be in the desired state."
Tests: 654 passed across the workspace (614 unit + 37 orchestration + 3
rpc), 0 failures. The 20 prod_orchestrator tests cover the install /
ensure_running / reconcile paths the new install delegates through.
Net delta: install.rs grows by ~30 lines (gating wrapper + comments),
prod_orchestrator.rs grows by ~30 lines (idempotent install body). Both
are temporary — the larger deletions (~1700 lines) come once every app
has been verified through the orchestrator path in subsequent phases.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -120,7 +120,24 @@ impl RpcHandler {
|
|||||||
false
|
false
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check if container already exists
|
// For orchestrator-managed apps, skip the legacy "container exists →
|
||||||
|
// adopt + return" probe entirely. The orchestrator's own install path
|
||||||
|
// (below) calls ensure_running which:
|
||||||
|
// - no-ops if the container is already up and healthy,
|
||||||
|
// - removes + reinstalls if the container is broken,
|
||||||
|
// - actually verifies health via the manifest's health check
|
||||||
|
// (whereas the legacy adopt block returns success on a podman
|
||||||
|
// `start` exit-0, even if the process inside crashed seconds
|
||||||
|
// later — the .228 bitcoin "running but unreachable" failure
|
||||||
|
// mode).
|
||||||
|
// The adoption block is being phased out as apps move to the
|
||||||
|
// orchestrator path. Non-orchestrator apps still hit it.
|
||||||
|
let orchestrator_managed =
|
||||||
|
should_try_orchestrator_install(package_id, self.orchestrator.is_some());
|
||||||
|
|
||||||
|
// Check if container already exists (legacy adoption — non-orchestrator
|
||||||
|
// apps only).
|
||||||
|
if !orchestrator_managed {
|
||||||
let check_output = tokio::process::Command::new("podman")
|
let check_output = tokio::process::Command::new("podman")
|
||||||
.args([
|
.args([
|
||||||
"ps",
|
"ps",
|
||||||
@@ -168,7 +185,9 @@ impl RpcHandler {
|
|||||||
.args(["restart", package_id])
|
.args(["restart", package_id])
|
||||||
.output()
|
.output()
|
||||||
.await
|
.await
|
||||||
.context("Failed to restart existing container after bitcoin.conf repair")?;
|
.context(
|
||||||
|
"Failed to restart existing container after bitcoin.conf repair",
|
||||||
|
)?;
|
||||||
if !restart_output.status.success() {
|
if !restart_output.status.success() {
|
||||||
let stderr = String::from_utf8_lossy(&restart_output.stderr);
|
let stderr = String::from_utf8_lossy(&restart_output.stderr);
|
||||||
install_log(&format!(
|
install_log(&format!(
|
||||||
@@ -223,10 +242,11 @@ impl RpcHandler {
|
|||||||
"message": format!("Package {} already installed and running", package_id)
|
"message": format!("Package {} already installed and running", package_id)
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Preferred path for apps already modeled in the production orchestrator.
|
// Preferred path for apps already modeled in the production orchestrator.
|
||||||
// Keep legacy install flow as default while migration is in progress.
|
// Keep legacy install flow as default while migration is in progress.
|
||||||
if should_try_orchestrator_install(package_id, self.orchestrator.is_some()) {
|
if orchestrator_managed {
|
||||||
let orchestrator_app_id = orchestrator_install_app_id(package_id);
|
let orchestrator_app_id = orchestrator_install_app_id(package_id);
|
||||||
self.set_install_phase(package_id, InstallPhase::CreatingContainer)
|
self.set_install_phase(package_id, InstallPhase::CreatingContainer)
|
||||||
.await;
|
.await;
|
||||||
|
|||||||
@@ -132,8 +132,7 @@ async fn fetch_bitcoin_status() -> Result<BitcoinNodeStatus> {
|
|||||||
.await
|
.await
|
||||||
.context("getindexinfo")
|
.context("getindexinfo")
|
||||||
.ok();
|
.ok();
|
||||||
let zmq_notifications =
|
let zmq_notifications = bitcoin_rpc_call(&client, "getzmqnotifications", serde_json::json!([]))
|
||||||
bitcoin_rpc_call(&client, "getzmqnotifications", serde_json::json!([]))
|
|
||||||
.await
|
.await
|
||||||
.context("getzmqnotifications")
|
.context("getzmqnotifications")
|
||||||
.ok();
|
.ok();
|
||||||
|
|||||||
@@ -103,9 +103,7 @@ impl BootReconciler {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let installed = self.orchestrator.manifest_ids().await;
|
let installed = self.orchestrator.manifest_ids().await;
|
||||||
for (companion, err) in
|
for (companion, err) in crate::container::companion::reconcile(&installed).await {
|
||||||
crate::container::companion::reconcile(&installed).await
|
|
||||||
{
|
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
companion = %companion,
|
companion = %companion,
|
||||||
error = %err,
|
error = %err,
|
||||||
@@ -260,7 +258,8 @@ mod tests {
|
|||||||
let orch = orch_with_one_running_manifest(rt.clone()).await;
|
let orch = orch_with_one_running_manifest(rt.clone()).await;
|
||||||
let shutdown = Arc::new(Notify::new());
|
let shutdown = Arc::new(Notify::new());
|
||||||
let reconciler =
|
let reconciler =
|
||||||
BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone()).without_companion_stage();
|
BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone())
|
||||||
|
.without_companion_stage();
|
||||||
let handle = tokio::spawn(reconciler.run_forever());
|
let handle = tokio::spawn(reconciler.run_forever());
|
||||||
|
|
||||||
// Yield so the spawned task gets CPU to run its initial reconcile.
|
// Yield so the spawned task gets CPU to run its initial reconcile.
|
||||||
@@ -284,7 +283,8 @@ mod tests {
|
|||||||
let orch = orch_with_one_running_manifest(rt.clone()).await;
|
let orch = orch_with_one_running_manifest(rt.clone()).await;
|
||||||
let shutdown = Arc::new(Notify::new());
|
let shutdown = Arc::new(Notify::new());
|
||||||
let reconciler =
|
let reconciler =
|
||||||
BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone()).without_companion_stage();
|
BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone())
|
||||||
|
.without_companion_stage();
|
||||||
let handle = tokio::spawn(reconciler.run_forever());
|
let handle = tokio::spawn(reconciler.run_forever());
|
||||||
|
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
@@ -313,7 +313,8 @@ mod tests {
|
|||||||
let orch = orch_with_one_running_manifest(rt.clone()).await;
|
let orch = orch_with_one_running_manifest(rt.clone()).await;
|
||||||
let shutdown = Arc::new(Notify::new());
|
let shutdown = Arc::new(Notify::new());
|
||||||
let reconciler =
|
let reconciler =
|
||||||
BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone()).without_companion_stage();
|
BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone())
|
||||||
|
.without_companion_stage();
|
||||||
let handle = tokio::spawn(reconciler.run_forever());
|
let handle = tokio::spawn(reconciler.run_forever());
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
@@ -347,7 +348,8 @@ mod tests {
|
|||||||
.await;
|
.await;
|
||||||
let shutdown = Arc::new(Notify::new());
|
let shutdown = Arc::new(Notify::new());
|
||||||
let reconciler =
|
let reconciler =
|
||||||
BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone()).without_companion_stage();
|
BootReconciler::new(orch.clone(), Duration::from_secs(30), shutdown.clone())
|
||||||
|
.without_companion_stage();
|
||||||
let handle = tokio::spawn(reconciler.run_forever());
|
let handle = tokio::spawn(reconciler.run_forever());
|
||||||
|
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
|
|||||||
@@ -26,9 +26,7 @@ use tokio::fs;
|
|||||||
use tokio::process::Command;
|
use tokio::process::Command;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
use crate::container::quadlet::{
|
use crate::container::quadlet::{self, BindMount, NetworkMode, QuadletUnit};
|
||||||
self, BindMount, NetworkMode, QuadletUnit,
|
|
||||||
};
|
|
||||||
use archipelago_container::image_uses_insecure_registry;
|
use archipelago_container::image_uses_insecure_registry;
|
||||||
|
|
||||||
const COMPANION_REGISTRY: &str = "146.59.87.168:3000/lfg2025";
|
const COMPANION_REGISTRY: &str = "146.59.87.168:3000/lfg2025";
|
||||||
@@ -152,7 +150,10 @@ pub async fn remove_for(package_id: &str) {
|
|||||||
pub async fn install_one(spec: &CompanionSpec) -> Result<()> {
|
pub async fn install_one(spec: &CompanionSpec) -> Result<()> {
|
||||||
if let Some(hook) = spec.pre_start {
|
if let Some(hook) = spec.pre_start {
|
||||||
hook().await.with_context(|| {
|
hook().await.with_context(|| {
|
||||||
format!("pre-start hook failed for {} — companion will not start", spec.name)
|
format!(
|
||||||
|
"pre-start hook failed for {} — companion will not start",
|
||||||
|
spec.name
|
||||||
|
)
|
||||||
})?;
|
})?;
|
||||||
}
|
}
|
||||||
let image = ensure_image_present(spec).await?;
|
let image = ensure_image_present(spec).await?;
|
||||||
@@ -284,7 +285,10 @@ pub async fn reconcile(installed_apps: &[String]) -> Vec<(String, anyhow::Error)
|
|||||||
match needs_repair(spec).await {
|
match needs_repair(spec).await {
|
||||||
Ok(false) => {}
|
Ok(false) => {}
|
||||||
Ok(true) => {
|
Ok(true) => {
|
||||||
info!(companion = spec.name, "reconcile: companion not active, repairing");
|
info!(
|
||||||
|
companion = spec.name,
|
||||||
|
"reconcile: companion not active, repairing"
|
||||||
|
);
|
||||||
if let Err(e) = install_one(spec).await {
|
if let Err(e) = install_one(spec).await {
|
||||||
failures.push((spec.name.to_string(), e));
|
failures.push((spec.name.to_string(), e));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -707,11 +707,41 @@ enum HookOutcome {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ContainerOrchestrator for ProdContainerOrchestrator {
|
impl ContainerOrchestrator for ProdContainerOrchestrator {
|
||||||
async fn install(&self, app_id: &str) -> Result<String> {
|
async fn install(&self, app_id: &str) -> Result<String> {
|
||||||
|
// Idempotent: if the container is already up and healthy, just
|
||||||
|
// refresh hooks and return. If it's stopped, start it. If it's
|
||||||
|
// missing or in a wedged state, install fresh.
|
||||||
|
//
|
||||||
|
// The RPC install handler used to do its own "container exists →
|
||||||
|
// adopt + return" probe before calling here, but that path skipped
|
||||||
|
// health verification (the .228 "running but unreachable" failure
|
||||||
|
// mode). Routing every install through here means the orchestrator
|
||||||
|
// is the one source of truth for what "installed" means.
|
||||||
let lm = self.loaded(app_id).await?;
|
let lm = self.loaded(app_id).await?;
|
||||||
|
let name = compute_container_name(&lm.manifest);
|
||||||
|
// ensure_running takes the per-app lock itself; release the install
|
||||||
|
// path lock first if we hold one (we don't — install is the entry
|
||||||
|
// point). Just delegate.
|
||||||
|
let action = self.ensure_running(&lm).await?;
|
||||||
|
match action {
|
||||||
|
ReconcileAction::NoOp
|
||||||
|
| ReconcileAction::Started
|
||||||
|
| ReconcileAction::Installed => Ok(name),
|
||||||
|
ReconcileAction::Left(state) => {
|
||||||
|
// Container is in a wedged state (Created / Paused / Unknown).
|
||||||
|
// Force-recreate so the install RPC has a clean outcome.
|
||||||
|
tracing::warn!(
|
||||||
|
app_id = %app_id,
|
||||||
|
state = %state,
|
||||||
|
"install: container in wedged state, force-recreating"
|
||||||
|
);
|
||||||
let lock = self.app_lock(app_id).await;
|
let lock = self.app_lock(app_id).await;
|
||||||
let _guard = lock.lock().await;
|
let _guard = lock.lock().await;
|
||||||
|
let _ = self.runtime.stop_container(&name).await;
|
||||||
|
let _ = self.runtime.remove_container(&name).await;
|
||||||
self.install_fresh(&lm).await?;
|
self.install_fresh(&lm).await?;
|
||||||
Ok(compute_container_name(&lm.manifest))
|
Ok(name)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn start(&self, app_id: &str) -> Result<()> {
|
async fn start(&self, app_id: &str) -> Result<()> {
|
||||||
|
|||||||
@@ -86,11 +86,9 @@ pub fn spawn_status_cache() {
|
|||||||
fresh.progress_pct = cached.progress_pct;
|
fresh.progress_pct = cached.progress_pct;
|
||||||
}
|
}
|
||||||
fresh.stale = true;
|
fresh.stale = true;
|
||||||
fresh.error = Some(
|
fresh.error = Some(fresh.error.unwrap_or_else(|| {
|
||||||
fresh
|
"ElectrumX is reconnecting; showing last known indexed height.".to_string()
|
||||||
.error
|
}));
|
||||||
.unwrap_or_else(|| "ElectrumX is reconnecting; showing last known indexed height.".to_string()),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
*cached = fresh;
|
*cached = fresh;
|
||||||
drop(cached);
|
drop(cached);
|
||||||
|
|||||||
Reference in New Issue
Block a user