From 7ffe7d5bae1234d1f66a2477c76ff6156645b9cc Mon Sep 17 00:00:00 2001 From: Aaron Erickson Date: Wed, 1 Jul 2026 19:01:16 -0700 Subject: [PATCH 1/2] fix(docker): reconcile duplicate sandbox containers Signed-off-by: Aaron Erickson --- crates/openshell-driver-docker/src/lib.rs | 267 +++++++++++++++----- crates/openshell-driver-docker/src/tests.rs | 93 +++++++ 2 files changed, 293 insertions(+), 67 deletions(-) diff --git a/crates/openshell-driver-docker/src/lib.rs b/crates/openshell-driver-docker/src/lib.rs index a59352018..d662f2da1 100644 --- a/crates/openshell-driver-docker/src/lib.rs +++ b/crates/openshell-driver-docker/src/lib.rs @@ -49,6 +49,7 @@ use openshell_core::proto_struct::{ deserialize_optional_non_empty_string_list, struct_to_json_value, }; use openshell_core::{Config, Error, Result as CoreResult}; +use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::io::Read; use std::net::{IpAddr, SocketAddr}; @@ -641,8 +642,8 @@ impl DockerComputeDriver { async fn current_snapshots(&self) -> Result, Status> { let containers = self.list_managed_container_summaries().await?; - let container_sandboxes = containers - .iter() + let container_sandboxes = preferred_container_summaries_by_sandbox_id(&containers) + .into_values() .filter_map(|summary| { sandbox_from_container_summary(summary, self.supervisor_readiness.as_ref()) }) @@ -853,10 +854,10 @@ impl DockerComputeDriver { task.abort(); } - let Some(container) = self - .find_managed_container_summary(sandbox_id, sandbox_name) - .await? - else { + let containers = self + .find_managed_container_summaries(sandbox_id, sandbox_name) + .await?; + if containers.is_empty() { if let Some(record) = pending { let container_name = container_name_for_sandbox(&record.sandbox); match self @@ -881,36 +882,48 @@ impl DockerComputeDriver { } } return Ok(false); - }; - let Some(target) = summary_container_target(&container) else { - return Ok(pending.is_some()); - }; - - match self - .docker - .remove_container( - &target, - Some(RemoveContainerOptionsBuilder::default().force(true).build()), - ) - .await - { - Ok(()) => { + } + let targets = managed_container_targets(&containers); + if targets.is_empty() { + if pending.is_some() { cleanup_sandbox_token_file_for_delete(sandbox_id, pending.as_ref(), &self.config); - Ok(true) } - Err(err) if is_not_found_error(&err) => { - cleanup_sandbox_token_file_for_delete(sandbox_id, pending.as_ref(), &self.config); - Ok(pending.is_some()) + return Ok(pending.is_some()); + } + + let mut deleted = false; + let mut failures = Vec::new(); + for target in targets { + match self + .docker + .remove_container( + &target, + Some(RemoveContainerOptionsBuilder::default().force(true).build()), + ) + .await + { + Ok(()) => deleted = true, + Err(err) if is_not_found_error(&err) => {} + Err(err) => failures.push(format!("{target}: {err}")), } - Err(err) => Err(internal_status("delete docker sandbox container", err)), } + + if !failures.is_empty() { + return Err(Status::internal(format!( + "delete Docker sandbox containers: {}", + failures.join("; ") + ))); + } + + cleanup_sandbox_token_file_for_delete(sandbox_id, pending.as_ref(), &self.config); + Ok(deleted || pending.is_some()) } async fn stop_sandbox_inner(&self, sandbox_id: &str, sandbox_name: &str) -> Result<(), Status> { - let Some(container) = self - .find_managed_container_summary(sandbox_id, sandbox_name) - .await? - else { + let containers = self + .find_managed_container_summaries(sandbox_id, sandbox_name) + .await?; + if containers.is_empty() { if let Some(record) = self.remove_pending_sandbox(sandbox_id, sandbox_name).await { if let Some(task) = record.task { task.abort(); @@ -920,27 +933,44 @@ impl DockerComputeDriver { return Ok(()); } return Err(Status::not_found("sandbox not found")); - }; - let Some(target) = summary_container_target(&container) else { + } + let targets = managed_container_targets(&containers); + if targets.is_empty() { return Err(Status::not_found("sandbox container has no id or name")); - }; + } - match self - .docker - .stop_container( - &target, - Some( - StopContainerOptionsBuilder::default() - .t(docker_stop_timeout_secs(self.config.stop_timeout_secs)) - .build(), - ), - ) - .await - { - Ok(()) => Ok(()), - Err(err) if is_not_modified_error(&err) => Ok(()), - Err(err) if is_not_found_error(&err) => Err(Status::not_found("sandbox not found")), - Err(err) => Err(internal_status("stop docker sandbox container", err)), + let mut stopped = false; + let mut failures = Vec::new(); + for target in targets { + match self + .docker + .stop_container( + &target, + Some( + StopContainerOptionsBuilder::default() + .t(docker_stop_timeout_secs(self.config.stop_timeout_secs)) + .build(), + ), + ) + .await + { + Ok(()) => stopped = true, + Err(err) if is_not_modified_error(&err) => stopped = true, + Err(err) if is_not_found_error(&err) => {} + Err(err) => failures.push(format!("{target}: {err}")), + } + } + + if !failures.is_empty() { + return Err(Status::internal(format!( + "stop Docker sandbox containers: {}", + failures.join("; ") + ))); + } + if stopped { + Ok(()) + } else { + Err(Status::not_found("sandbox not found")) } } @@ -1260,11 +1290,11 @@ impl DockerComputeDriver { .map_err(|err| internal_status("list Docker sandbox containers", err)) } - async fn find_managed_container_summary( + async fn find_managed_container_summaries( &self, sandbox_id: &str, sandbox_name: &str, - ) -> Result, Status> { + ) -> Result, Status> { let mut label_filter_values = Vec::new(); if !sandbox_id.is_empty() { label_filter_values.push(format!("{LABEL_SANDBOX_ID}={sandbox_id}")); @@ -1285,23 +1315,24 @@ impl DockerComputeDriver { .await .map_err(|err| internal_status("find Docker sandbox container", err))?; - Ok(containers.into_iter().find(|summary| { - let Some(labels) = summary.labels.as_ref() else { - return false; - }; - let namespace_matches = labels - .get(LABEL_SANDBOX_NAMESPACE) - .is_some_and(|value| value == &self.config.sandbox_namespace); - let id_matches = sandbox_id.is_empty() - || labels - .get(LABEL_SANDBOX_ID) - .is_some_and(|value| value == sandbox_id); - let name_matches = sandbox_name.is_empty() - || labels - .get(LABEL_SANDBOX_NAME) - .is_some_and(|value| value == sandbox_name); - namespace_matches && id_matches && name_matches - })) + Ok(matching_managed_container_summaries( + containers, + &self.config.sandbox_namespace, + sandbox_id, + sandbox_name, + )) + } + + async fn find_managed_container_summary( + &self, + sandbox_id: &str, + sandbox_name: &str, + ) -> Result, Status> { + Ok(self + .find_managed_container_summaries(sandbox_id, sandbox_name) + .await? + .into_iter() + .max_by(compare_container_summary_preference)) } async fn ensure_image_available(&self, sandbox_id: &str, image: &str) -> Result<(), Status> { @@ -2691,6 +2722,101 @@ fn parse_memory_limit(value: &str) -> Result, Status> { Ok(Some((amount * multiplier).round() as i64)) } +fn compare_container_summary_preference( + left: &ContainerSummary, + right: &ContainerSummary, +) -> Ordering { + let left_running = matches!(left.state, Some(ContainerSummaryStateEnum::RUNNING)); + let right_running = matches!(right.state, Some(ContainerSummaryStateEnum::RUNNING)); + + left_running + .cmp(&right_running) + .then_with(|| { + is_canonical_container_summary(left).cmp(&is_canonical_container_summary(right)) + }) + .then_with(|| left.created.cmp(&right.created)) + // Docker container IDs are unique and make equal-state, equal-time + // selection deterministic regardless of daemon response order. + .then_with(|| left.id.cmp(&right.id)) +} + +fn is_canonical_container_summary(summary: &ContainerSummary) -> bool { + let Some(labels) = summary.labels.as_ref() else { + return false; + }; + let Some(sandbox_id) = labels.get(LABEL_SANDBOX_ID) else { + return false; + }; + let Some(sandbox_name) = labels.get(LABEL_SANDBOX_NAME) else { + return false; + }; + let expected = container_name_for_sandbox(&DriverSandbox { + id: sandbox_id.clone(), + name: sandbox_name.clone(), + ..Default::default() + }); + summary.names.as_ref().is_some_and(|names| { + names + .iter() + .any(|name| name.trim_start_matches('/') == expected) + }) +} + +fn matching_managed_container_summaries( + containers: Vec, + sandbox_namespace: &str, + sandbox_id: &str, + sandbox_name: &str, +) -> Vec { + containers + .into_iter() + .filter(|summary| { + let Some(labels) = summary.labels.as_ref() else { + return false; + }; + let namespace_matches = labels + .get(LABEL_SANDBOX_NAMESPACE) + .is_some_and(|value| value == sandbox_namespace); + let id_matches = sandbox_id.is_empty() + || labels + .get(LABEL_SANDBOX_ID) + .is_some_and(|value| value == sandbox_id); + let name_matches = sandbox_name.is_empty() + || labels + .get(LABEL_SANDBOX_NAME) + .is_some_and(|value| value == sandbox_name); + namespace_matches && id_matches && name_matches + }) + .collect() +} + +fn preferred_container_summaries_by_sandbox_id( + summaries: &[ContainerSummary], +) -> HashMap<&str, &ContainerSummary> { + let mut preferred = HashMap::new(); + for summary in summaries { + let Some(sandbox_id) = summary + .labels + .as_ref() + .and_then(|labels| labels.get(LABEL_SANDBOX_ID)) + else { + continue; + }; + + match preferred.entry(sandbox_id.as_str()) { + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(summary); + } + std::collections::hash_map::Entry::Occupied(mut entry) => { + if compare_container_summary_preference(summary, entry.get()).is_gt() { + entry.insert(summary); + } + } + } + } + preferred +} + fn sandbox_from_container_summary( summary: &ContainerSummary, readiness: &dyn SupervisorReadiness, @@ -2807,6 +2933,13 @@ fn summary_container_target(summary: &ContainerSummary) -> Option { .or_else(|| summary_container_name(summary)) } +fn managed_container_targets(summaries: &[ContainerSummary]) -> Vec { + summaries + .iter() + .filter_map(summary_container_target) + .collect() +} + fn container_state_needs_shutdown_stop(state: ContainerSummaryStateEnum) -> bool { matches!( state, diff --git a/crates/openshell-driver-docker/src/tests.rs b/crates/openshell-driver-docker/src/tests.rs index 923c6d618..b911167c9 100644 --- a/crates/openshell-driver-docker/src/tests.rs +++ b/crates/openshell-driver-docker/src/tests.rs @@ -1539,6 +1539,99 @@ fn build_container_create_body_uses_runtime_namespace_label() { ); } +fn managed_container_summary( + container_id: &str, + state: ContainerSummaryStateEnum, + created: i64, +) -> ContainerSummary { + let sandbox = DriverSandbox { + id: "sbx-1".to_string(), + name: "demo".to_string(), + ..Default::default() + }; + ContainerSummary { + id: Some(container_id.to_string()), + names: Some(vec![format!("/{}", container_name_for_sandbox(&sandbox))]), + labels: Some(HashMap::from([ + (LABEL_SANDBOX_ID.to_string(), sandbox.id), + (LABEL_SANDBOX_NAME.to_string(), sandbox.name), + (LABEL_SANDBOX_NAMESPACE.to_string(), "default".to_string()), + ])), + state: Some(state), + created: Some(created), + ..Default::default() + } +} + +fn selected_container_id(summaries: &[ContainerSummary]) -> &str { + preferred_container_summaries_by_sandbox_id(summaries) + .get("sbx-1") + .and_then(|summary| summary.id.as_deref()) + .expect("sandbox has a preferred container") +} + +#[test] +fn preferred_container_summary_selects_running_in_both_list_orders() { + let running = managed_container_summary("running", ContainerSummaryStateEnum::RUNNING, 10); + let newer_exited = + managed_container_summary("newer-exited", ContainerSummaryStateEnum::EXITED, 20); + + assert_eq!( + selected_container_id(&[running.clone(), newer_exited.clone()]), + "running" + ); + assert_eq!(selected_container_id(&[newer_exited, running]), "running"); +} + +#[test] +fn preferred_container_summary_selects_newer_same_state_in_both_list_orders() { + let older = managed_container_summary("older", ContainerSummaryStateEnum::EXITED, 10); + let newer = managed_container_summary("newer", ContainerSummaryStateEnum::EXITED, 20); + + assert_eq!( + selected_container_id(&[older.clone(), newer.clone()]), + "newer" + ); + assert_eq!(selected_container_id(&[newer, older]), "newer"); +} + +#[test] +fn preferred_container_summary_selects_canonical_name_for_equal_time_clones() { + let canonical = managed_container_summary("aaa", ContainerSummaryStateEnum::EXITED, 10); + let mut backup = managed_container_summary("zzz", ContainerSummaryStateEnum::EXITED, 10); + backup.names = Some(vec!["/openshell-demo-nemoclaw-gpu-backup-1234".to_string()]); + + assert_eq!( + selected_container_id(&[canonical.clone(), backup.clone()]), + "aaa" + ); + assert_eq!(selected_container_id(&[backup, canonical]), "aaa"); +} + +#[test] +fn matching_managed_container_targets_include_all_duplicate_containers() { + let first = managed_container_summary("first", ContainerSummaryStateEnum::RUNNING, 10); + let second = managed_container_summary("second", ContainerSummaryStateEnum::EXITED, 10); + let mut other_namespace = + managed_container_summary("other", ContainerSummaryStateEnum::RUNNING, 10); + other_namespace + .labels + .as_mut() + .expect("labels") + .insert(LABEL_SANDBOX_NAMESPACE.to_string(), "other".to_string()); + + let matches = matching_managed_container_summaries( + vec![first, second, other_namespace], + "default", + "sbx-1", + "demo", + ); + let mut targets = managed_container_targets(&matches); + targets.sort(); + + assert_eq!(targets, vec!["first", "second"]); +} + #[test] fn driver_status_keeps_running_sandboxes_provisioning_with_stable_message() { let running = ContainerSummary { From 378674280712766e34ff17d64dace59849608adf Mon Sep 17 00:00:00 2001 From: Aaron Erickson Date: Fri, 3 Jul 2026 10:26:24 -0700 Subject: [PATCH 2/2] fix(docker): enforce sandbox instance ownership Signed-off-by: Aaron Erickson --- Cargo.lock | 2 + architecture/compute-runtimes.md | 9 + crates/openshell-driver-docker/Cargo.toml | 2 + crates/openshell-driver-docker/README.md | 43 + crates/openshell-driver-docker/src/lib.rs | 1828 +++++++++++++++++-- crates/openshell-driver-docker/src/tests.rs | 904 ++++++++- crates/openshell-server/src/compute/mod.rs | 639 +++++-- crates/openshell-server/src/lib.rs | 12 +- 8 files changed, 3158 insertions(+), 281 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 13b670f55..a2c1d9fbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3659,6 +3659,7 @@ dependencies = [ "prost-types", "serde", "serde_json", + "sha2 0.10.9", "tar", "temp-env", "tempfile", @@ -3667,6 +3668,7 @@ dependencies = [ "tonic", "tracing", "url", + "uuid", ] [[package]] diff --git a/architecture/compute-runtimes.md b/architecture/compute-runtimes.md index f122fda5d..e81c7e00e 100644 --- a/architecture/compute-runtimes.md +++ b/architecture/compute-runtimes.md @@ -36,6 +36,15 @@ when a sandbox create request asks for GPU resources. | VM | Experimental microVM isolation. | Per-sandbox libkrun VM. | Managed endpoint-backed driver. The gateway spawns `openshell-driver-vm`, waits for its Unix socket, and then consumes it through the same remote `compute_driver.proto` path used by unmanaged endpoint drivers. The VM driver boots a cached bootstrap `rootfs.ext4`, prepares requested OCI images inside a bootstrap VM with `umoci`, attaches the prepared image disk read-only, and gives each sandbox a writable `overlay.ext4` for merged-root changes and runtime material. The driver persists each accepted launch request beside the overlay and restarts those VMs on driver startup without recreating the overlay. | | Extension | Out-of-tree drivers operated alongside the gateway. | Whatever boundary the driver implements. | Selected by a non-reserved custom `compute_drivers = [""]` entry with `[openshell.drivers.].socket_path`, or at launch time by pairing `--drivers ` with `--compute-driver-socket=`. Reserved built-in names such as `vm`, `docker`, `podman`, and `kubernetes` cannot be used as unmanaged socket endpoints. The gateway connects to a UDS the operator already provisioned, runs `GetCapabilities`, logs the advertised `driver_name`, and dispatches all sandbox lifecycle calls through `compute_driver.proto`. The driver process and socket lifecycle are operator-owned; the gateway does not spawn, supervise, or remove unmanaged extension drivers. The trust boundary is the socket's filesystem permissions: the operator must ensure only the gateway uid can read/write it. | +The Docker driver treats container labels as discovery metadata rather than +proof of ownership. It reconciles a persisted sandbox against the recorded +container ID and a driver-issued, privately journaled instance generation. The +NemoClaw compatibility handoff journals an exact old-ID/new-ID intent and +requires two consistent observations before adoption; the same generation can +recover a missed overlap without deleting durable sandbox state. Missing or +incorrect generations, legacy containers, and ambiguous candidates fail closed +with an operator-visible warning. + Per-sandbox CPU and memory values currently enter the driver layer through template resource limits. Docker and Podman apply them as runtime limits. Kubernetes mirrors each limit into the matching request. VM accepts the fields diff --git a/crates/openshell-driver-docker/Cargo.toml b/crates/openshell-driver-docker/Cargo.toml index 7e1bc069c..97a6fd22b 100644 --- a/crates/openshell-driver-docker/Cargo.toml +++ b/crates/openshell-driver-docker/Cargo.toml @@ -21,6 +21,8 @@ tracing = { workspace = true } bytes = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +sha2 = { workspace = true } +uuid = { workspace = true } prost-types = { workspace = true } bollard = { version = "0.20" } tar = "0.4" diff --git a/crates/openshell-driver-docker/README.md b/crates/openshell-driver-docker/README.md index 4f2977add..e0f9b3aca 100644 --- a/crates/openshell-driver-docker/README.md +++ b/crates/openshell-driver-docker/README.md @@ -19,6 +19,49 @@ Desktop, OrbStack, and macOS-hosted gateways, those names use Docker's `host-gateway` alias. On native Linux Docker, the gateway also binds the bridge gateway IP so containers can call back to the host process. +## Container Instance Ownership + +Docker labels identify containers for discovery and cleanup. They do not grant +authority to represent a persisted sandbox because another Docker client can +copy them. The driver records the authoritative container ID from the Docker +create response and restores it from `Sandbox.status.agent_pod` at gateway +startup. New containers also receive a driver-issued instance-generation label. +That label is not a secret; it is checked against an owner-only journal under +`$XDG_STATE_HOME/openshell/docker-sandbox-instances//`, so +generic copied identity labels are insufficient to authorize a replacement. +The local Docker driver assumes that Docker-daemon access remains trusted host +authority. + +The journal is written after Docker returns the created container ID and before +the driver publishes it as managed. It durably records the current ID, earlier +IDs, generation, and any exact replacement intent. A crash before the first +journal write fails closed as unresolved ownership. A crash after a replacement +journal commit can recover even if the public sandbox status still contains the +previous ID. + +The driver permits one compatibility handoff used by NemoClaw's Docker GPU +patch. The current authoritative container must remain present in the `exited` +state under the exact `-nemoclaw-gpu-backup-` name while +exactly one active replacement uses the canonical name and carries the recorded +generation. The first consistent observation journals an intent for those exact +container IDs; a subsequent consistent observation adopts the replacement. If +polling observes the gap or misses the overlap, the durable sandbox is retained. +One canonical, generation-matching replacement can then be authorized and +adopted through the same two-observation intent without resurrecting a skeletal +sandbox record. + +Ambiguous candidates, terminal replacements, interrupted intents, and missing +or incorrect generations fail closed as ownership conflicts. Containers created +by an older driver without a journaled generation cannot use the external +handoff; recreate them through OpenShell first so the driver can establish the +generation. Explicit sandbox deletion removes every exact identity match, then +clears the driver's in-memory and journaled ownership. + +Missing instances, ignored unowned containers, duplicate candidates, rejected +replacements, and accepted handoff or rollback transitions produce deduplicated +warnings and warning-level driver platform events. Notices generated before a +watcher subscribes remain eligible for delivery on the first watched snapshot. + ## Container Contract The driver-controlled container settings are part of the sandbox security diff --git a/crates/openshell-driver-docker/src/lib.rs b/crates/openshell-driver-docker/src/lib.rs index c26662824..95cc7812f 100644 --- a/crates/openshell-driver-docker/src/lib.rs +++ b/crates/openshell-driver-docker/src/lib.rs @@ -49,15 +49,18 @@ use openshell_core::proto_struct::{ deserialize_optional_non_empty_string_list, struct_to_json_value, }; use openshell_core::{Config, Error, Result as CoreResult}; -use std::cmp::Ordering; +use sha2::{Digest, Sha256}; use std::collections::{HashMap, HashSet}; -use std::io::Read; +use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; use std::path::{Path, PathBuf}; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering as AtomicOrdering}, +}; use std::time::Duration; -use tokio::sync::{Mutex, broadcast, mpsc}; +use tokio::sync::{Mutex, broadcast, mpsc, oneshot}; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; @@ -67,6 +70,8 @@ use url::Url; const WATCH_BUFFER: usize = 128; const WATCH_POLL_INTERVAL: Duration = Duration::from_secs(2); const WATCH_POLL_MAX_BACKOFF: Duration = Duration::from_secs(30); +const NEMOCLAW_GPU_BACKUP_MARKER: &str = "-nemoclaw-gpu-backup-"; +const LABEL_INSTANCE_GENERATION: &str = "openshell.ai/docker-instance-generation"; const SUPERVISOR_MOUNT_PATH: &str = openshell_core::driver_utils::SUPERVISOR_CONTAINER_BINARY; const TLS_CA_MOUNT_PATH: &str = openshell_core::driver_utils::TLS_CA_MOUNT_PATH; @@ -238,6 +243,7 @@ struct DockerDriverRuntimeConfig { allow_all_default_gpu: bool, sandbox_pids_limit: i64, enable_bind_mounts: bool, + instance_state_dir: PathBuf, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -255,6 +261,7 @@ pub struct DockerComputeDriver { config: DockerDriverRuntimeConfig, events: broadcast::Sender, pending: Arc>>, + instances: Arc>, supervisor_readiness: Arc, gpu_selector: Arc, } @@ -262,6 +269,102 @@ pub struct DockerComputeDriver { struct PendingSandboxRecord { sandbox: DriverSandbox, task: Option>, + cancelled: Arc, +} + +#[derive(Debug, Default)] +struct DockerInstanceOwnership { + sandboxes: HashMap, + last_notices: HashMap, + restoration_complete: bool, +} + +#[derive(Debug, Clone)] +struct TrackedSandboxInstances { + sandbox_id: String, + sandbox_name: String, + namespace: String, + authoritative_id: String, + retired_ids: HashSet, + instance_generation: String, + replacement_intent: Option, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct PersistedDockerInstanceOwnership { + version: u32, + sandbox_id: String, + sandbox_name: String, + namespace: String, + authoritative_id: String, + retired_ids: Vec, + #[serde(default)] + instance_generation: String, + #[serde(default)] + replacement_intent: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)] +struct DockerReplacementIntent { + incumbent_id: String, + replacement_id: String, + backup_name: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum TrackedContainerResolution { + OwnershipUnresolved { + candidate_ids: Vec, + }, + Authoritative { + index: usize, + unexpected_ids: Vec, + }, + Replacement { + incumbent_index: usize, + replacement_index: usize, + }, + ReplacementFromIntent { + replacement_index: usize, + }, + ReplacementIntentRequired { + incumbent_index: usize, + replacement_index: usize, + backup_name: String, + }, + ReplacementIntentRequiredWithoutIncumbent { + replacement_index: usize, + }, + ReplacementIntentInvalidated { + candidate_ids: Vec, + }, + HandoffPending { + incumbent_index: usize, + candidate_ids: Vec, + }, + Rollback { + index: usize, + }, + Missing, + Conflict { + candidate_ids: Vec, + }, +} + +#[derive(Debug)] +struct OwnershipNotice { + sandbox_id: String, + key: String, + reason: &'static str, + message: String, + metadata: HashMap, +} + +#[derive(Debug)] +enum TrackedSandboxObservation { + Container(ContainerSummary), + Conflict(DriverSandbox), + Untracked, } #[derive(Debug, Clone)] @@ -401,6 +504,15 @@ impl DockerComputeDriver { let daemon_arch = normalize_docker_arch(version.arch.as_deref().unwrap_or_default()); let supervisor_bin = resolve_supervisor_bin(&docker, &docker_config, &daemon_arch).await?; let guest_tls = docker_guest_tls_paths(&docker_config)?; + let instance_state_dir = openshell_core::paths::xdg_state_dir() + .map_err(|err| { + Error::config(format!("resolve Docker instance state directory: {err}")) + })? + .join("openshell") + .join("docker-sandbox-instances") + .join(docker_instance_namespace_dir_name( + &docker_config.sandbox_namespace, + )); let driver = Self { docker: Arc::new(docker), @@ -421,9 +533,11 @@ impl DockerComputeDriver { allow_all_default_gpu, sandbox_pids_limit: docker_config.sandbox_pids_limit, enable_bind_mounts: docker_config.enable_bind_mounts, + instance_state_dir, }, events: broadcast::channel(WATCH_BUFFER).0, pending: Arc::new(Mutex::new(HashMap::new())), + instances: Arc::new(Mutex::new(DockerInstanceOwnership::default())), supervisor_readiness, gpu_selector: Arc::new(CdiGpuDefaultSelector::new( cdi_gpu_inventory, @@ -632,13 +746,27 @@ impl DockerComputeDriver { sandbox_id: &str, sandbox_name: &str, ) -> Result, Status> { - let container = self - .find_managed_container_summary(sandbox_id, sandbox_name) + let tracked_id = self + .resolve_tracked_sandbox_id(sandbox_id, sandbox_name) .await?; - if let Some(sandbox) = container.and_then(|summary| { - sandbox_from_container_summary(&summary, self.supervisor_readiness.as_ref()) - }) { - return Ok(Some(sandbox)); + let containers = self + .find_managed_container_summaries(sandbox_id, sandbox_name) + .await?; + if let Some(tracked_id) = tracked_id { + match self + .resolve_tracked_sandbox_observation(&tracked_id, sandbox_name, &containers) + .await + { + TrackedSandboxObservation::Container(summary) => { + if let Some(sandbox) = + sandbox_from_container_summary(&summary, self.supervisor_readiness.as_ref()) + { + return Ok(Some(sandbox)); + } + } + TrackedSandboxObservation::Conflict(sandbox) => return Ok(Some(sandbox)), + TrackedSandboxObservation::Untracked => {} + } } Ok(self.pending_snapshot(sandbox_id, sandbox_name).await) @@ -646,12 +774,7 @@ impl DockerComputeDriver { async fn current_snapshots(&self) -> Result, Status> { let containers = self.list_managed_container_summaries().await?; - let container_sandboxes = preferred_container_summaries_by_sandbox_id(&containers) - .into_values() - .filter_map(|summary| { - sandbox_from_container_summary(summary, self.supervisor_readiness.as_ref()) - }) - .collect::>(); + let container_sandboxes = self.resolve_tracked_sandbox_snapshots(&containers).await; let mut by_id = self.pending_snapshot_map().await; for sandbox in container_sandboxes { by_id.insert(sandbox.id.clone(), sandbox); @@ -680,15 +803,15 @@ impl DockerComputeDriver { gpu_devices.as_deref(), )?; - if self - .find_managed_container_summary(&sandbox.id, &sandbox.name) + if !self + .find_managed_container_summaries(&sandbox.id, &sandbox.name) .await? - .is_some() + .is_empty() { return Err(Status::already_exists("sandbox already exists")); } - self.reserve_pending_sandbox(sandbox).await?; + let cancellation = self.reserve_pending_sandbox(sandbox).await?; let image = sandbox_image(sandbox).unwrap_or_default(); self.publish_docker_progress( &sandbox.id, @@ -706,22 +829,45 @@ impl DockerComputeDriver { let driver = self.clone(); let sandbox_for_task = sandbox.clone(); let sandbox_id = sandbox.id.clone(); - let task = tokio::spawn(async move { - driver.provision_sandbox(sandbox_for_task).await; - }); + let task_cancellation = cancellation.clone(); + let (start_tx, start_rx) = oneshot::channel(); + let mut task = Some(tokio::spawn(async move { + if start_rx.await.is_ok() { + driver + .provision_sandbox(sandbox_for_task, task_cancellation) + .await; + } + })); - let mut pending = self.pending.lock().await; - if let Some(record) = pending.get_mut(&sandbox_id) { - record.task = Some(task); + let task_registered = { + let mut pending = self.pending.lock().await; + if let Some(record) = pending.get_mut(&sandbox_id) + && !record.cancelled.load(AtomicOrdering::Acquire) + { + record.task = task.take(); + true + } else { + false + } + }; + if task_registered { + let _ = start_tx.send(()); } else { - task.abort(); + cancellation.store(true, AtomicOrdering::Release); + drop(start_tx); + if let Some(task) = task { + let _ = task.await; + } } Ok(()) } - async fn provision_sandbox(&self, sandbox: DriverSandbox) { - match self.provision_sandbox_inner(&sandbox).await { + async fn provision_sandbox(&self, sandbox: DriverSandbox, cancellation: Arc) { + match self + .provision_sandbox_inner(&sandbox, cancellation.as_ref()) + .await + { Ok(()) => { self.clear_pending_sandbox(&sandbox.id).await; } @@ -734,7 +880,14 @@ impl DockerComputeDriver { async fn provision_sandbox_inner( &self, sandbox: &DriverSandbox, + cancellation: &AtomicBool, ) -> Result<(), DockerProvisioningFailure> { + if cancellation.load(AtomicOrdering::Acquire) { + return Err(DockerProvisioningFailure::new( + "ContainerCreateCancelled", + "Docker sandbox creation was cancelled", + )); + } let validated = Self::validated_sandbox(sandbox, &self.config).map_err(|status| { DockerProvisioningFailure::new("ContainerCreateFailed", status.message()) })?; @@ -744,6 +897,12 @@ impl DockerComputeDriver { .map_err(|status| { DockerProvisioningFailure::new("ImagePullFailed", status.message()) })?; + if cancellation.load(AtomicOrdering::Acquire) { + return Err(DockerProvisioningFailure::new( + "ContainerCreateCancelled", + "Docker sandbox creation was cancelled", + )); + } let token_file_created = write_sandbox_token_file(sandbox, &self.config) .await .map_err(|status| { @@ -764,7 +923,8 @@ impl DockerComputeDriver { } DockerProvisioningFailure::new("ContainerCreateFailed", status.message()) })?; - let create_body = build_container_create_body_with_gpu_devices( + let instance_generation = uuid::Uuid::new_v4().to_string(); + let mut create_body = build_container_create_body_with_gpu_devices( sandbox, &self.config, &validated.driver_config, @@ -776,7 +936,12 @@ impl DockerComputeDriver { } DockerProvisioningFailure::new("ContainerCreateFailed", status.message()) })?; - self.docker + create_body.labels.get_or_insert_default().insert( + LABEL_INSTANCE_GENERATION.to_string(), + instance_generation.clone(), + ); + let created = self + .docker .create_container( Some( CreateContainerOptionsBuilder::default() @@ -795,6 +960,30 @@ impl DockerComputeDriver { create_status_from_docker_error("create docker sandbox container", err), ) })?; + if let Err(status) = self + .track_created_instance( + sandbox, + &created.id, + &instance_generation, + Some(cancellation), + ) + .await + { + let _ = self + .docker + .remove_container( + &created.id, + Some(RemoveContainerOptionsBuilder::default().force(true).build()), + ) + .await; + if token_file_created { + cleanup_sandbox_token_file(sandbox, &self.config); + } + return Err(DockerProvisioningFailure::from_status( + "ContainerOwnershipPersistFailed", + status, + )); + } self.publish_docker_progress( &sandbox.id, "Created", @@ -821,6 +1010,8 @@ impl DockerComputeDriver { if token_file_created { cleanup_sandbox_token_file(sandbox, &self.config); } + self.forget_tracked_sandbox(&sandbox.id, &sandbox.name) + .await; return Err(DockerProvisioningFailure::from_status( "ContainerStartFailed", create_status_from_docker_error("start docker sandbox container", err), @@ -851,18 +1042,24 @@ impl DockerComputeDriver { sandbox_id: &str, sandbox_name: &str, ) -> Result { - let pending = self.remove_pending_sandbox(sandbox_id, sandbox_name).await; - if let Some(record) = pending.as_ref() - && let Some(task) = record.task.as_ref() + let tracked_id = self + .resolve_tracked_sandbox_id(sandbox_id, sandbox_name) + .await?; + let effective_id = tracked_id.as_deref().unwrap_or(sandbox_id); + let mut pending = self + .remove_pending_sandbox(effective_id, sandbox_name) + .await; + if let Some(record) = pending.as_mut() + && let Some(task) = record.task.take() { - task.abort(); + let _ = task.await; } let containers = self - .find_managed_container_summaries(sandbox_id, sandbox_name) + .find_managed_container_summaries(effective_id, sandbox_name) .await?; if containers.is_empty() { - if let Some(record) = pending { + if let Some(record) = pending.as_ref() { let container_name = container_name_for_sandbox(&record.sandbox); match self .docker @@ -874,10 +1071,14 @@ impl DockerComputeDriver { { Ok(()) => { cleanup_sandbox_token_file(&record.sandbox, &self.config); + self.forget_tracked_sandbox(&record.sandbox.id, &record.sandbox.name) + .await; return Ok(true); } Err(err) if is_not_found_error(&err) => { cleanup_sandbox_token_file(&record.sandbox, &self.config); + self.forget_tracked_sandbox(&record.sandbox.id, &record.sandbox.name) + .await; return Ok(true); } Err(err) => { @@ -885,13 +1086,16 @@ impl DockerComputeDriver { } } } + cleanup_sandbox_token_file_for_delete(effective_id, None, &self.config); + self.forget_tracked_sandbox(effective_id, sandbox_name) + .await; return Ok(false); } let targets = managed_container_targets(&containers); if targets.is_empty() { - if pending.is_some() { - cleanup_sandbox_token_file_for_delete(sandbox_id, pending.as_ref(), &self.config); - } + cleanup_sandbox_token_file_for_delete(effective_id, pending.as_ref(), &self.config); + self.forget_tracked_sandbox(effective_id, sandbox_name) + .await; return Ok(pending.is_some()); } @@ -919,20 +1123,33 @@ impl DockerComputeDriver { ))); } - cleanup_sandbox_token_file_for_delete(sandbox_id, pending.as_ref(), &self.config); + cleanup_sandbox_token_file_for_delete(effective_id, pending.as_ref(), &self.config); + self.forget_tracked_sandbox(effective_id, sandbox_name) + .await; Ok(deleted || pending.is_some()) } async fn stop_sandbox_inner(&self, sandbox_id: &str, sandbox_name: &str) -> Result<(), Status> { + let tracked_id = self + .resolve_tracked_sandbox_id(sandbox_id, sandbox_name) + .await?; + let effective_id = tracked_id.as_deref().unwrap_or(sandbox_id); + let mut pending = self + .remove_pending_sandbox(effective_id, sandbox_name) + .await; + if let Some(record) = pending.as_mut() + && let Some(task) = record.task.take() + { + let _ = task.await; + } let containers = self - .find_managed_container_summaries(sandbox_id, sandbox_name) + .find_managed_container_summaries(effective_id, sandbox_name) .await?; if containers.is_empty() { - if let Some(record) = self.remove_pending_sandbox(sandbox_id, sandbox_name).await { - if let Some(task) = record.task { - task.abort(); - } + if let Some(record) = pending { cleanup_sandbox_token_file(&record.sandbox, &self.config); + self.forget_tracked_sandbox(&record.sandbox.id, &record.sandbox.name) + .await; self.publish_deleted(record.sandbox.id); return Ok(()); } @@ -990,16 +1207,32 @@ impl DockerComputeDriver { &self, sandbox_id: &str, sandbox_name: &str, + expected_instance_id: &str, + should_start: bool, ) -> Result { - let Some(container) = self - .find_managed_container_summary(sandbox_id, sandbox_name) - .await? - else { + self.track_persisted_instance(sandbox_id, sandbox_name, expected_instance_id) + .await?; + let containers = self + .find_managed_container_summaries(sandbox_id, sandbox_name) + .await?; + if containers.is_empty() { return Ok(false); + } + let container = match self + .resolve_tracked_sandbox_observation(sandbox_id, sandbox_name, &containers) + .await + { + TrackedSandboxObservation::Container(container) => container, + TrackedSandboxObservation::Conflict(_) | TrackedSandboxObservation::Untracked => { + return Ok(false); + } }; let Some(target) = summary_container_target(&container) else { return Ok(false); }; + if !should_start { + return Ok(true); + } let state = container.state.unwrap_or(ContainerSummaryStateEnum::EMPTY); if !container_state_needs_resume(state) { return Ok(true); @@ -1015,6 +1248,18 @@ impl DockerComputeDriver { } } + pub async fn complete_persisted_delete( + &self, + sandbox_id: &str, + sandbox_name: &str, + ) -> Result { + let result = self.delete_sandbox_inner(sandbox_id, sandbox_name).await; + if result.is_ok() && !sandbox_id.is_empty() { + remove_docker_instance_ownership(sandbox_id, &self.config); + } + result + } + pub async fn stop_managed_containers_on_shutdown(&self) -> Result { let containers = self.list_managed_container_summaries().await?; let targets = containers @@ -1078,7 +1323,10 @@ impl DockerComputeDriver { Ok(stopped) } - async fn reserve_pending_sandbox(&self, sandbox: &DriverSandbox) -> Result<(), Status> { + async fn reserve_pending_sandbox( + &self, + sandbox: &DriverSandbox, + ) -> Result, Status> { let mut pending = self.pending.lock().await; if pending .values() @@ -1087,6 +1335,7 @@ impl DockerComputeDriver { return Err(Status::already_exists("sandbox already exists")); } + let cancelled = Arc::new(AtomicBool::new(false)); pending.insert( sandbox.id.clone(), PendingSandboxRecord { @@ -1097,9 +1346,10 @@ impl DockerComputeDriver { false, ), task: None, + cancelled: cancelled.clone(), }, ); - Ok(()) + Ok(cancelled) } async fn pending_snapshot( @@ -1136,7 +1386,9 @@ impl DockerComputeDriver { let id = pending.iter().find_map(|(id, record)| { pending_sandbox_matches(&record.sandbox, sandbox_id, sandbox_name).then(|| id.clone()) })?; - pending.remove(&id) + let record = pending.remove(&id)?; + record.cancelled.store(true, AtomicOrdering::Release); + Some(record) } async fn fail_pending_sandbox( @@ -1178,13 +1430,30 @@ impl DockerComputeDriver { sandbox_id: &str, sandbox_name: &str, ) -> Result<(), Status> { - if let Some(summary) = self - .find_managed_container_summary(sandbox_id, sandbox_name) + let Some(tracked_id) = self + .resolve_tracked_sandbox_id(sandbox_id, sandbox_name) .await? - && let Some(sandbox) = - sandbox_from_container_summary(&summary, self.supervisor_readiness.as_ref()) + else { + return Ok(()); + }; + let containers = self + .find_managed_container_summaries(sandbox_id, sandbox_name) + .await?; + match self + .resolve_tracked_sandbox_observation(&tracked_id, sandbox_name, &containers) + .await { - self.publish_sandbox_snapshot(sandbox); + TrackedSandboxObservation::Container(summary) => { + if let Some(sandbox) = + sandbox_from_container_summary(&summary, self.supervisor_readiness.as_ref()) + { + self.publish_sandbox_snapshot(sandbox); + } + } + TrackedSandboxObservation::Conflict(sandbox) => { + self.publish_sandbox_snapshot(sandbox); + } + TrackedSandboxObservation::Untracked => {} } Ok(()) } @@ -1207,15 +1476,17 @@ impl DockerComputeDriver { }); } - fn publish_platform_event(&self, sandbox_id: String, event: DriverPlatformEvent) { - let _ = self.events.send(WatchSandboxesEvent { - payload: Some(watch_sandboxes_event::Payload::PlatformEvent( - WatchSandboxesPlatformEvent { - sandbox_id, - event: Some(event), - }, - )), - }); + fn publish_platform_event(&self, sandbox_id: String, event: DriverPlatformEvent) -> bool { + self.events + .send(WatchSandboxesEvent { + payload: Some(watch_sandboxes_event::Payload::PlatformEvent( + WatchSandboxesPlatformEvent { + sandbox_id, + event: Some(event), + }, + )), + }) + .is_ok() } fn publish_docker_progress( @@ -1327,16 +1598,899 @@ impl DockerComputeDriver { )) } - async fn find_managed_container_summary( + async fn track_created_instance( + &self, + sandbox: &DriverSandbox, + instance_id: &str, + instance_generation: &str, + cancellation: Option<&AtomicBool>, + ) -> Result<(), Status> { + if instance_id.is_empty() { + return Err(Status::internal( + "Docker returned an empty container ID; instance ownership was not recorded", + )); + } + let tracked = TrackedSandboxInstances { + sandbox_id: sandbox.id.clone(), + sandbox_name: sandbox.name.clone(), + namespace: self.config.sandbox_namespace.clone(), + authoritative_id: instance_id.to_string(), + retired_ids: HashSet::new(), + instance_generation: instance_generation.to_string(), + replacement_intent: None, + }; + persist_docker_instance_ownership(&tracked, &self.config).map_err(|err| { + Status::internal(format!("persist Docker instance ownership failed: {err}")) + })?; + if cancellation.is_some_and(|cancelled| cancelled.load(AtomicOrdering::Acquire)) { + remove_docker_instance_ownership(&sandbox.id, &self.config); + return Err(Status::cancelled("Docker sandbox creation was cancelled")); + } + { + let mut ownership = self.instances.lock().await; + if cancellation.is_some_and(|cancelled| cancelled.load(AtomicOrdering::Acquire)) { + drop(ownership); + remove_docker_instance_ownership(&sandbox.id, &self.config); + return Err(Status::cancelled("Docker sandbox creation was cancelled")); + } + ownership + .sandboxes + .insert(sandbox.id.clone(), tracked.clone()); + ownership.last_notices.remove(&sandbox.id); + } + Ok(()) + } + + async fn resolve_tracked_sandbox_id( &self, sandbox_id: &str, sandbox_name: &str, - ) -> Result, Status> { - Ok(self - .find_managed_container_summaries(sandbox_id, sandbox_name) - .await? - .into_iter() - .max_by(compare_container_summary_preference)) + ) -> Result, Status> { + let ownership = self.instances.lock().await; + if !sandbox_id.is_empty() { + let Some(tracked) = ownership.sandboxes.get(sandbox_id) else { + return Ok(None); + }; + if !sandbox_name.is_empty() && tracked.sandbox_name != sandbox_name { + return Err(Status::not_found("sandbox identity does not match")); + } + return Ok(Some(sandbox_id.to_string())); + } + + let mut matching_ids = ownership + .sandboxes + .iter() + .filter(|(_, tracked)| tracked.sandbox_name == sandbox_name) + .map(|(id, _)| id.clone()); + let Some(id) = matching_ids.next() else { + return Ok(None); + }; + if matching_ids.next().is_some() { + return Err(Status::failed_precondition( + "sandbox name matches multiple tracked instances", + )); + } + Ok(Some(id)) + } + + async fn track_persisted_instance( + &self, + sandbox_id: &str, + sandbox_name: &str, + expected_instance_id: &str, + ) -> Result<(), Status> { + let expected_instance_id = expected_instance_id.to_string(); + let (instance_id, retired_ids, instance_generation, replacement_intent) = + match load_docker_instance_ownership(sandbox_id, sandbox_name, &self.config) { + Ok(Some(persisted)) + if !persisted.authoritative_id.is_empty() + && (expected_instance_id.is_empty() + || persisted.authoritative_id == expected_instance_id + || persisted.retired_ids.contains(&expected_instance_id)) => + { + let authoritative_id = persisted.authoritative_id; + let retired_ids = persisted + .retired_ids + .into_iter() + .filter(|retired_id| { + !retired_id.is_empty() && retired_id != &authoritative_id + }) + .collect(); + ( + authoritative_id, + retired_ids, + persisted.instance_generation, + persisted.replacement_intent, + ) + } + Ok(Some(persisted)) => { + warn!( + sandbox_id = %sandbox_id, + persisted_instance_id = %persisted.authoritative_id, + expected_instance_id = %expected_instance_id, + "Ignoring stale Docker instance ownership state" + ); + (expected_instance_id, HashSet::new(), String::new(), None) + } + Ok(None) => (expected_instance_id, HashSet::new(), String::new(), None), + Err(err) => { + warn!( + sandbox_id = %sandbox_id, + error = %err, + "Ignoring unreadable Docker instance ownership state" + ); + (expected_instance_id, HashSet::new(), String::new(), None) + } + }; + let tracked = TrackedSandboxInstances { + sandbox_id: sandbox_id.to_string(), + sandbox_name: sandbox_name.to_string(), + namespace: self.config.sandbox_namespace.clone(), + authoritative_id: instance_id, + retired_ids, + instance_generation, + replacement_intent, + }; + + persist_docker_instance_ownership(&tracked, &self.config).map_err(|err| { + Status::internal(format!("persist Docker instance ownership failed: {err}")) + })?; + { + let mut ownership = self.instances.lock().await; + ownership + .sandboxes + .insert(sandbox_id.to_string(), tracked.clone()); + ownership.last_notices.remove(sandbox_id); + } + + Ok(()) + } + + async fn forget_tracked_sandbox(&self, sandbox_id: &str, sandbox_name: &str) { + let tracked_id = match self + .resolve_tracked_sandbox_id(sandbox_id, sandbox_name) + .await + { + Ok(Some(tracked_id)) => Some(tracked_id), + Ok(None) if !sandbox_id.is_empty() => Some(sandbox_id.to_string()), + Ok(None) | Err(_) => None, + }; + let mut ownership = self.instances.lock().await; + if let Some(tracked_id) = tracked_id { + ownership.sandboxes.remove(&tracked_id); + ownership.last_notices.remove(&tracked_id); + drop(ownership); + remove_docker_instance_ownership(&tracked_id, &self.config); + } + } + + pub async fn complete_instance_restore(&self) { + self.instances.lock().await.restoration_complete = true; + } + + async fn resolve_tracked_sandbox_snapshots( + &self, + summaries: &[ContainerSummary], + ) -> Vec { + let mut grouped = HashMap::>::new(); + for summary in summaries { + let Some(sandbox_id) = summary + .labels + .as_ref() + .and_then(|labels| labels.get(LABEL_SANDBOX_ID)) + else { + continue; + }; + grouped + .entry(sandbox_id.clone()) + .or_default() + .push(summary.clone()); + } + + let tracked_ids = { + let ownership = self.instances.lock().await; + ownership.sandboxes.keys().cloned().collect::>() + }; + let mut snapshots = Vec::with_capacity(tracked_ids.len()); + for sandbox_id in &tracked_ids { + let (sandbox_name, namespace) = { + let ownership = self.instances.lock().await; + ownership + .sandboxes + .get(sandbox_id) + .map(|tracked| (tracked.sandbox_name.clone(), tracked.namespace.clone())) + .unwrap_or_default() + }; + let candidates = matching_managed_container_summaries( + grouped.remove(sandbox_id).unwrap_or_default(), + &namespace, + sandbox_id, + &sandbox_name, + ); + match self + .resolve_tracked_sandbox_observation(sandbox_id, &sandbox_name, &candidates) + .await + { + TrackedSandboxObservation::Container(summary) => { + if let Some(sandbox) = + sandbox_from_container_summary(&summary, self.supervisor_readiness.as_ref()) + { + snapshots.push(sandbox); + } + } + TrackedSandboxObservation::Conflict(sandbox) => snapshots.push(sandbox), + TrackedSandboxObservation::Untracked => {} + } + } + + let restoration_complete = self.instances.lock().await.restoration_complete; + if restoration_complete { + for (sandbox_id, candidates) in grouped { + let mut candidate_ids = candidates + .iter() + .filter_map(container_summary_id) + .map(str::to_string) + .collect::>(); + candidate_ids.sort(); + let key = format!("unowned:{}", candidate_ids.join(",")); + let should_publish = { + let mut ownership = self.instances.lock().await; + if ownership.last_notices.get(&sandbox_id) == Some(&key) { + false + } else { + ownership + .last_notices + .insert(sandbox_id.clone(), key.clone()); + true + } + }; + if should_publish { + let notice = OwnershipNotice { + sandbox_id: sandbox_id.clone(), + key: key.clone(), + reason: "UnownedSandboxContainersIgnored", + message: format!( + "Ignored Docker containers with sandbox identity {sandbox_id} because no durable OpenShell instance owns them" + ), + metadata: HashMap::from([( + "candidate_instance_ids".to_string(), + candidate_ids.join(","), + )]), + }; + if !self.publish_ownership_notice(notice) { + let mut ownership = self.instances.lock().await; + if ownership.last_notices.get(&sandbox_id) == Some(&key) { + ownership.last_notices.remove(&sandbox_id); + } + } + } + } + } + + snapshots + } + + async fn resolve_tracked_sandbox_observation( + &self, + sandbox_id: &str, + _sandbox_name: &str, + summaries: &[ContainerSummary], + ) -> TrackedSandboxObservation { + let (observation, notice, should_publish) = { + let mut ownership = self.instances.lock().await; + let Some(tracked) = ownership.sandboxes.get_mut(sandbox_id) else { + return TrackedSandboxObservation::Untracked; + }; + let resolution = resolve_tracked_container(summaries, tracked); + let (observation, notice) = match resolution { + TrackedContainerResolution::OwnershipUnresolved { mut candidate_ids } => { + candidate_ids.sort(); + let message = if candidate_ids.is_empty() { + "Docker sandbox ownership is unresolved because the durable record has no instance ID" + .to_string() + } else { + format!( + "Refused Docker candidates {} because the durable sandbox record has no authoritative instance ID", + candidate_ids.join(",") + ) + }; + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerOwnershipUnresolved", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!("unresolved:{}", candidate_ids.join(",")), + reason: "ContainerOwnershipUnresolved", + message, + metadata: HashMap::from([( + "candidate_instance_ids".to_string(), + candidate_ids.join(","), + )]), + }), + ) + } + TrackedContainerResolution::Authoritative { + index, + unexpected_ids, + } => { + let notice = if unexpected_ids.is_empty() { + None + } else { + let mut ids = unexpected_ids; + ids.sort(); + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!("duplicates:{}", ids.join(",")), + reason: "ContainerIdentityConflict", + message: format!( + "Ignored duplicate Docker sandbox instances {}; retaining authoritative instance {}", + ids.join(","), + tracked.authoritative_id + ), + metadata: HashMap::from([ + ( + "authoritative_instance_id".to_string(), + tracked.authoritative_id.clone(), + ), + ("candidate_instance_ids".to_string(), ids.join(",")), + ]), + }) + }; + ( + TrackedSandboxObservation::Container(summaries[index].clone()), + notice, + ) + } + TrackedContainerResolution::ReplacementIntentRequired { + incumbent_index, + replacement_index, + backup_name, + } => { + let incumbent_id = container_summary_id(&summaries[incumbent_index]) + .unwrap_or_default() + .to_string(); + let replacement_id = container_summary_id(&summaries[replacement_index]) + .unwrap_or_default() + .to_string(); + let mut next = tracked.clone(); + next.replacement_intent = Some(DockerReplacementIntent { + incumbent_id: incumbent_id.clone(), + replacement_id: replacement_id.clone(), + backup_name, + }); + match persist_docker_instance_ownership(&next, &self.config) { + Ok(()) => { + *tracked = next; + let message = format!( + "Recorded replacement intent from Docker instance {incumbent_id} to canonical instance {replacement_id}; adoption requires a subsequent consistent observation" + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerReplacementPending", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!( + "replacement-intent:{incumbent_id}:{replacement_id}" + ), + reason: "SandboxReplacementIntentRecorded", + message, + metadata: HashMap::from([ + ("authoritative_instance_id".to_string(), incumbent_id), + ("replacement_instance_id".to_string(), replacement_id), + ]), + }), + ) + } + Err(err) => { + let message = format!( + "Refused to authorize Docker replacement {replacement_id} because replacement intent could not be persisted: {err}" + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerOwnershipPersistFailed", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!("intent-persist-failed:{replacement_id}"), + reason: "ContainerOwnershipPersistFailed", + message, + metadata: HashMap::from([ + ( + "authoritative_instance_id".to_string(), + tracked.authoritative_id.clone(), + ), + ("candidate_instance_ids".to_string(), replacement_id), + ]), + }), + ) + } + } + } + TrackedContainerResolution::ReplacementIntentRequiredWithoutIncumbent { + replacement_index, + } => { + let incumbent_id = tracked.authoritative_id.clone(); + let replacement_id = container_summary_id(&summaries[replacement_index]) + .unwrap_or_default() + .to_string(); + let mut next = tracked.clone(); + next.replacement_intent = Some(DockerReplacementIntent { + incumbent_id: incumbent_id.clone(), + replacement_id: replacement_id.clone(), + backup_name: String::new(), + }); + match persist_docker_instance_ownership(&next, &self.config) { + Ok(()) => { + *tracked = next; + let message = format!( + "Recorded replacement intent from missing Docker instance {incumbent_id} to canonical instance {replacement_id} after validating its driver-issued generation; adoption requires a subsequent consistent observation" + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerReplacementPending", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!( + "replacement-intent-without-incumbent:{incumbent_id}:{replacement_id}" + ), + reason: "SandboxReplacementIntentRecorded", + message, + metadata: HashMap::from([ + ("authoritative_instance_id".to_string(), incumbent_id), + ("replacement_instance_id".to_string(), replacement_id), + ( + "replacement_basis".to_string(), + "driver-issued-instance-generation".to_string(), + ), + ]), + }), + ) + } + Err(err) => { + let message = format!( + "Refused to authorize Docker replacement {replacement_id} because replacement intent could not be persisted: {err}" + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerOwnershipPersistFailed", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!("intent-persist-failed:{replacement_id}"), + reason: "ContainerOwnershipPersistFailed", + message, + metadata: HashMap::from([ + ( + "authoritative_instance_id".to_string(), + tracked.authoritative_id.clone(), + ), + ("candidate_instance_ids".to_string(), replacement_id), + ]), + }), + ) + } + } + } + TrackedContainerResolution::ReplacementIntentInvalidated { mut candidate_ids } => { + candidate_ids.sort(); + let invalidated = tracked.replacement_intent.clone(); + let mut next = tracked.clone(); + next.replacement_intent = None; + match persist_docker_instance_ownership(&next, &self.config) { + Ok(()) => { + *tracked = next; + let message = format!( + "Invalidated Docker replacement intent because the exact safe overlap was interrupted; observed candidates {}", + candidate_ids.join(",") + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerReplacementPending", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!( + "replacement-intent-invalidated:{}", + candidate_ids.join(",") + ), + reason: "SandboxReplacementIntentInvalidated", + message, + metadata: HashMap::from([ + ( + "authoritative_instance_id".to_string(), + tracked.authoritative_id.clone(), + ), + ( + "candidate_instance_ids".to_string(), + candidate_ids.join(","), + ), + ( + "invalidated_replacement_instance_id".to_string(), + invalidated + .map(|intent| intent.replacement_id) + .unwrap_or_default(), + ), + ]), + }), + ) + } + Err(err) => { + let message = format!( + "Refused Docker candidates because stale replacement intent could not be cleared: {err}" + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerOwnershipPersistFailed", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: "intent-clear-persist-failed".to_string(), + reason: "ContainerOwnershipPersistFailed", + message, + metadata: HashMap::from([( + "authoritative_instance_id".to_string(), + tracked.authoritative_id.clone(), + )]), + }), + ) + } + } + } + TrackedContainerResolution::ReplacementFromIntent { replacement_index } => { + let replacement_id = container_summary_id(&summaries[replacement_index]) + .unwrap_or_default() + .to_string(); + let incumbent_id = tracked.replacement_intent.as_ref().map_or_else( + || tracked.authoritative_id.clone(), + |intent| intent.incumbent_id.clone(), + ); + let mut next = tracked.clone(); + next.retired_ids.insert(incumbent_id.clone()); + next.authoritative_id.clone_from(&replacement_id); + next.replacement_intent = None; + match persist_docker_instance_ownership(&next, &self.config) { + Ok(()) => { + *tracked = next; + ( + TrackedSandboxObservation::Container( + summaries[replacement_index].clone(), + ), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!( + "replacement-completed:{incumbent_id}:{replacement_id}" + ), + reason: "SandboxInstanceReplaced", + message: format!( + "Completed authorized Docker replacement {incumbent_id} to canonical instance {replacement_id} after the retained backup was removed" + ), + metadata: HashMap::from([ + ("previous_instance_id".to_string(), incumbent_id), + ("authoritative_instance_id".to_string(), replacement_id), + ]), + }), + ) + } + Err(err) => { + let message = format!( + "Refused to complete authorized Docker replacement {replacement_id} because ownership intent could not be persisted: {err}" + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerOwnershipPersistFailed", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!("replacement-complete-failed:{replacement_id}"), + reason: "ContainerOwnershipPersistFailed", + message, + metadata: HashMap::from([ + ( + "authoritative_instance_id".to_string(), + tracked.authoritative_id.clone(), + ), + ("candidate_instance_ids".to_string(), replacement_id), + ]), + }), + ) + } + } + } + TrackedContainerResolution::Replacement { + incumbent_index, + replacement_index, + } => { + let incumbent_id = container_summary_id(&summaries[incumbent_index]) + .unwrap_or_default() + .to_string(); + let replacement_id = container_summary_id(&summaries[replacement_index]) + .unwrap_or_default() + .to_string(); + let mut next = tracked.clone(); + next.retired_ids.insert(incumbent_id.clone()); + next.authoritative_id.clone_from(&replacement_id); + next.replacement_intent = None; + match persist_docker_instance_ownership(&next, &self.config) { + Ok(()) => { + *tracked = next; + ( + TrackedSandboxObservation::Container( + summaries[replacement_index].clone(), + ), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!("replacement:{incumbent_id}:{replacement_id}"), + reason: "SandboxInstanceReplaced", + message: format!( + "Accepted canonical Docker instance {replacement_id} as the replacement for retained compatibility backup {incumbent_id}" + ), + metadata: HashMap::from([ + ("previous_instance_id".to_string(), incumbent_id), + ("authoritative_instance_id".to_string(), replacement_id), + ]), + }), + ) + } + Err(err) => { + let message = format!( + "Refused Docker replacement {replacement_id} because ownership intent could not be persisted: {err}" + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerOwnershipPersistFailed", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!("persist-failed:{replacement_id}"), + reason: "ContainerOwnershipPersistFailed", + message, + metadata: HashMap::from([ + ( + "authoritative_instance_id".to_string(), + tracked.authoritative_id.clone(), + ), + ("candidate_instance_ids".to_string(), replacement_id), + ]), + }), + ) + } + } + } + TrackedContainerResolution::HandoffPending { + incumbent_index, + mut candidate_ids, + } => { + candidate_ids.sort(); + let incumbent_id = container_summary_id(&summaries[incumbent_index]) + .unwrap_or_default() + .to_string(); + let message = if candidate_ids.is_empty() { + format!( + "Docker instance {incumbent_id} is retained as a stopped compatibility backup; waiting for one canonical replacement" + ) + } else { + format!( + "Docker instance {incumbent_id} is retained as a stopped compatibility backup, but replacement candidates {} are ambiguous", + candidate_ids.join(",") + ) + }; + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerReplacementPending", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!( + "handoff-pending:{incumbent_id}:{}", + candidate_ids.join(",") + ), + reason: "ContainerReplacementPending", + message, + metadata: HashMap::from([ + ("authoritative_instance_id".to_string(), incumbent_id), + ( + "candidate_instance_ids".to_string(), + candidate_ids.join(","), + ), + ]), + }), + ) + } + TrackedContainerResolution::Rollback { index } => { + let failed_instance_id = tracked.authoritative_id.clone(); + let restored_instance_id = container_summary_id(&summaries[index]) + .unwrap_or_default() + .to_string(); + let mut next = tracked.clone(); + next.authoritative_id.clone_from(&restored_instance_id); + next.retired_ids.remove(&restored_instance_id); + next.retired_ids.insert(failed_instance_id.clone()); + next.replacement_intent = None; + match persist_docker_instance_ownership(&next, &self.config) { + Ok(()) => { + *tracked = next; + ( + TrackedSandboxObservation::Container(summaries[index].clone()), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!( + "rollback:{failed_instance_id}:{restored_instance_id}" + ), + reason: "SandboxInstanceRollback", + message: format!( + "Restored canonical Docker instance {restored_instance_id} after replacement {failed_instance_id} disappeared" + ), + metadata: HashMap::from([ + ("failed_instance_id".to_string(), failed_instance_id), + ( + "authoritative_instance_id".to_string(), + restored_instance_id, + ), + ]), + }), + ) + } + Err(err) => { + let message = format!( + "Refused Docker rollback to {restored_instance_id} because ownership intent could not be persisted: {err}" + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerOwnershipPersistFailed", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!("rollback-persist-failed:{restored_instance_id}"), + reason: "ContainerOwnershipPersistFailed", + message, + metadata: HashMap::from([ + ( + "authoritative_instance_id".to_string(), + tracked.authoritative_id.clone(), + ), + ( + "candidate_instance_ids".to_string(), + restored_instance_id, + ), + ]), + }), + ) + } + } + } + TrackedContainerResolution::Missing => { + let message = format!( + "Authoritative Docker instance {} is missing; refusing out-of-band replacement until an overlapping handoff is observed", + tracked.authoritative_id + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerInstanceMissing", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!("missing:{}", tracked.authoritative_id), + reason: "ContainerInstanceMissing", + message, + metadata: HashMap::from([( + "authoritative_instance_id".to_string(), + tracked.authoritative_id.clone(), + )]), + }), + ) + } + TrackedContainerResolution::Conflict { mut candidate_ids } => { + candidate_ids.sort(); + let message = format!( + "Refused unexpected Docker replacement candidates {} because authoritative instance {} was not retained for an overlapping handoff", + candidate_ids.join(","), + tracked.authoritative_id + ); + ( + TrackedSandboxObservation::Conflict(ownership_conflict_snapshot( + sandbox_id, + tracked, + "ContainerIdentityConflict", + &message, + )), + Some(OwnershipNotice { + sandbox_id: sandbox_id.to_string(), + key: format!("conflict:{}", candidate_ids.join(",")), + reason: "ContainerIdentityConflict", + message, + metadata: HashMap::from([ + ( + "authoritative_instance_id".to_string(), + tracked.authoritative_id.clone(), + ), + ( + "candidate_instance_ids".to_string(), + candidate_ids.join(","), + ), + ]), + }), + ) + } + }; + + let should_publish = if let Some(notice) = notice.as_ref() { + if ownership.last_notices.get(sandbox_id) == Some(¬ice.key) { + false + } else { + ownership + .last_notices + .insert(sandbox_id.to_string(), notice.key.clone()); + true + } + } else { + ownership.last_notices.remove(sandbox_id); + false + }; + (observation, notice, should_publish) + }; + + if should_publish && let Some(notice) = notice { + let sandbox_id = notice.sandbox_id.clone(); + let key = notice.key.clone(); + if !self.publish_ownership_notice(notice) { + let mut ownership = self.instances.lock().await; + if ownership.last_notices.get(&sandbox_id) == Some(&key) { + ownership.last_notices.remove(&sandbox_id); + } + } + } + observation + } + + fn publish_ownership_notice(&self, notice: OwnershipNotice) -> bool { + warn!( + sandbox_id = %notice.sandbox_id, + reason = notice.reason, + message = %notice.message, + "Docker sandbox instance ownership event" + ); + let mut event = platform_event("docker", "Warning", notice.reason, notice.message); + event.metadata = notice.metadata; + self.publish_platform_event(notice.sandbox_id, event) } async fn ensure_image_available(&self, sandbox_id: &str, image: &str) -> Result<(), Status> { @@ -1511,7 +2665,10 @@ impl ComputeDriver for DockerComputeDriver { let request = request.into_inner(); require_sandbox_identifier(&request.sandbox_id, &request.sandbox_name)?; - let event_sandbox_id = request.sandbox_id.clone(); + let event_sandbox_id = self + .resolve_tracked_sandbox_id(&request.sandbox_id, &request.sandbox_name) + .await? + .unwrap_or_else(|| request.sandbox_id.clone()); let deleted = self .delete_sandbox_inner(&request.sandbox_id, &request.sandbox_name) .await?; @@ -1619,8 +2776,9 @@ fn pending_sandbox_snapshot( } fn pending_sandbox_matches(sandbox: &DriverSandbox, sandbox_id: &str, sandbox_name: &str) -> bool { - (!sandbox_id.is_empty() && sandbox.id == sandbox_id) - || (!sandbox_name.is_empty() && sandbox.name == sandbox_name) + (!sandbox_id.is_empty() || !sandbox_name.is_empty()) + && (sandbox_id.is_empty() || sandbox.id == sandbox_id) + && (sandbox_name.is_empty() || sandbox.name == sandbox_name) } fn provisioning_condition() -> DriverCondition { @@ -2190,6 +3348,149 @@ fn cleanup_sandbox_token_file_by_id(sandbox_id: &str, config: &DockerDriverRunti } } +fn docker_instance_state_path( + sandbox_id: &str, + config: &DockerDriverRuntimeConfig, +) -> Option { + if config.instance_state_dir.as_os_str().is_empty() { + return None; + } + let digest = Sha256::digest(sandbox_id.as_bytes()); + Some(config.instance_state_dir.join(format!("{digest:x}.json"))) +} + +fn load_docker_instance_ownership( + sandbox_id: &str, + sandbox_name: &str, + config: &DockerDriverRuntimeConfig, +) -> Result, String> { + let Some(path) = docker_instance_state_path(sandbox_id, config) else { + return Ok(None); + }; + let bytes = match std::fs::read(&path) { + Ok(bytes) => bytes, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(format!("read {}: {err}", path.display())), + }; + let persisted: PersistedDockerInstanceOwnership = serde_json::from_slice(&bytes) + .map_err(|err| format!("decode {}: {err}", path.display()))?; + if persisted.version != 1 { + return Err(format!( + "decode {}: unsupported ownership state version {}", + path.display(), + persisted.version + )); + } + if persisted.sandbox_id != sandbox_id || persisted.sandbox_name != sandbox_name { + return Err(format!( + "decode {}: ownership identity does not match sandbox", + path.display() + )); + } + if persisted.namespace != config.sandbox_namespace { + return Err(format!( + "decode {}: ownership namespace does not match driver namespace", + path.display() + )); + } + Ok(Some(persisted)) +} + +fn persist_docker_instance_ownership( + tracked: &TrackedSandboxInstances, + config: &DockerDriverRuntimeConfig, +) -> Result<(), String> { + let Some(path) = docker_instance_state_path(&tracked.sandbox_id, config) else { + return Ok(()); + }; + let parent = path + .parent() + .ok_or_else(|| format!("ownership state path {} has no parent", path.display()))?; + openshell_core::paths::create_dir_restricted(parent) + .map_err(|err| format!("create {}: {err}", parent.display()))?; + let mut retired_ids = tracked.retired_ids.iter().cloned().collect::>(); + retired_ids.sort(); + let bytes = serde_json::to_vec(&PersistedDockerInstanceOwnership { + version: 1, + sandbox_id: tracked.sandbox_id.clone(), + sandbox_name: tracked.sandbox_name.clone(), + namespace: tracked.namespace.clone(), + authoritative_id: tracked.authoritative_id.clone(), + retired_ids, + instance_generation: tracked.instance_generation.clone(), + replacement_intent: tracked.replacement_intent.clone(), + }) + .map_err(|err| format!("encode {}: {err}", path.display()))?; + let mut temp = tempfile::Builder::new() + .prefix(".instance-ownership-") + .tempfile_in(parent) + .map_err(|err| { + format!( + "create temporary ownership state in {}: {err}", + parent.display() + ) + })?; + temp.write_all(&bytes) + .map_err(|err| format!("write temporary ownership state: {err}"))?; + openshell_core::paths::set_file_owner_only(temp.path()) + .map_err(|err| format!("restrict temporary ownership state: {err}"))?; + temp.as_file() + .sync_all() + .map_err(|err| format!("sync temporary ownership state: {err}"))?; + temp.persist(&path) + .map_err(|err| format!("persist {}: {}", path.display(), err.error))?; + if let Err(err) = sync_directory(parent) { + // The rename already committed the new ownership state. Keep the + // in-memory transition aligned with that file and report only the + // reduced host-power-loss durability of the directory entry. + warn!( + path = %path.display(), + error = %err, + "Failed to sync committed Docker instance ownership directory" + ); + } + Ok(()) +} + +fn remove_docker_instance_ownership(sandbox_id: &str, config: &DockerDriverRuntimeConfig) { + let Some(path) = docker_instance_state_path(sandbox_id, config) else { + return; + }; + match std::fs::remove_file(&path) { + Ok(()) => { + if let Some(parent) = path.parent() + && let Err(err) = sync_directory(parent) + { + warn!( + sandbox_id = %sandbox_id, + path = %path.display(), + error = %err, + "Failed to sync Docker instance ownership state removal" + ); + } + } + Err(err) if err.kind() == std::io::ErrorKind::NotFound => {} + Err(err) => { + warn!( + sandbox_id = %sandbox_id, + path = %path.display(), + error = %err, + "Failed to remove Docker instance ownership state" + ); + } + } +} + +fn docker_instance_namespace_dir_name(namespace: &str) -> String { + format!("{:x}", Sha256::digest(namespace.as_bytes())) +} + +fn sync_directory(path: &Path) -> Result<(), String> { + std::fs::File::open(path) + .and_then(|directory| directory.sync_all()) + .map_err(|err| format!("sync directory {}: {err}", path.display())) +} + fn build_environment(sandbox: &DriverSandbox, config: &DockerDriverRuntimeConfig) -> Vec { let mut environment = HashMap::from([ ("HOME".to_string(), "/root".to_string()), @@ -2792,22 +4093,8 @@ fn parse_memory_limit(value: &str) -> Result, Status> { Ok(Some((amount * multiplier).round() as i64)) } -fn compare_container_summary_preference( - left: &ContainerSummary, - right: &ContainerSummary, -) -> Ordering { - let left_running = matches!(left.state, Some(ContainerSummaryStateEnum::RUNNING)); - let right_running = matches!(right.state, Some(ContainerSummaryStateEnum::RUNNING)); - - left_running - .cmp(&right_running) - .then_with(|| { - is_canonical_container_summary(left).cmp(&is_canonical_container_summary(right)) - }) - .then_with(|| left.created.cmp(&right.created)) - // Docker container IDs are unique and make equal-state, equal-time - // selection deterministic regardless of daemon response order. - .then_with(|| left.id.cmp(&right.id)) +fn container_summary_id(summary: &ContainerSummary) -> Option<&str> { + summary.id.as_deref().filter(|id| !id.is_empty()) } fn is_canonical_container_summary(summary: &ContainerSummary) -> bool { @@ -2832,6 +4119,332 @@ fn is_canonical_container_summary(summary: &ContainerSummary) -> bool { }) } +fn nemoclaw_gpu_backup_container_name( + summary: &ContainerSummary, + sandbox_id: &str, + sandbox_name: &str, +) -> Option { + let canonical_name = container_name_for_sandbox(&DriverSandbox { + id: sandbox_id.to_string(), + name: sandbox_name.to_string(), + ..Default::default() + }); + let expected_prefix = format!("{canonical_name}{NEMOCLAW_GPU_BACKUP_MARKER}"); + summary.names.as_ref().and_then(|names| { + names.iter().find_map(|name| { + let normalized = name.trim_start_matches('/'); + normalized + .strip_prefix(&expected_prefix) + .is_some_and(|suffix| { + !suffix.is_empty() && suffix.bytes().all(|byte| byte.is_ascii_digit()) + }) + .then(|| normalized.to_string()) + }) + }) +} + +fn is_nemoclaw_gpu_backup_container( + summary: &ContainerSummary, + sandbox_id: &str, + sandbox_name: &str, +) -> bool { + nemoclaw_gpu_backup_container_name(summary, sandbox_id, sandbox_name).is_some() +} + +fn container_instance_generation(summary: &ContainerSummary) -> Option<&str> { + summary + .labels + .as_ref() + .and_then(|labels| labels.get(LABEL_INSTANCE_GENERATION)) + .map(String::as_str) + .filter(|token| !token.is_empty()) +} + +fn container_state_allows_replacement_intent(state: Option) -> bool { + matches!( + state, + Some( + ContainerSummaryStateEnum::CREATED + | ContainerSummaryStateEnum::RUNNING + | ContainerSummaryStateEnum::RESTARTING + ) + ) +} + +fn container_state_allows_authorized_transition(state: Option) -> bool { + matches!( + state, + Some( + ContainerSummaryStateEnum::CREATED + | ContainerSummaryStateEnum::RUNNING + | ContainerSummaryStateEnum::RESTARTING + | ContainerSummaryStateEnum::EXITED + ) + ) +} + +fn replacement_lineage_matches( + incumbent: &ContainerSummary, + replacement: &ContainerSummary, + tracked: &TrackedSandboxInstances, +) -> bool { + !tracked.instance_generation.is_empty() + && container_instance_generation(incumbent) == Some(tracked.instance_generation.as_str()) + && container_instance_generation(replacement) == Some(tracked.instance_generation.as_str()) +} + +fn replacement_intent_observation_matches( + summaries: &[ContainerSummary], + tracked: &TrackedSandboxInstances, + intent: &DockerReplacementIntent, +) -> bool { + if tracked.authoritative_id != intent.incumbent_id || summaries.len() != 2 { + return false; + } + let Some(incumbent) = summaries + .iter() + .find(|summary| container_summary_id(summary) == Some(intent.incumbent_id.as_str())) + else { + return false; + }; + let Some(replacement) = summaries + .iter() + .find(|summary| container_summary_id(summary) == Some(intent.replacement_id.as_str())) + else { + return false; + }; + matches!(incumbent.state, Some(ContainerSummaryStateEnum::EXITED)) + && nemoclaw_gpu_backup_container_name(incumbent, &tracked.sandbox_id, &tracked.sandbox_name) + .as_deref() + == Some(intent.backup_name.as_str()) + && is_canonical_container_summary(replacement) + && container_state_allows_authorized_transition(replacement.state) + && replacement_lineage_matches(incumbent, replacement, tracked) +} + +fn replacement_intent_successor_only_matches( + summaries: &[ContainerSummary], + tracked: &TrackedSandboxInstances, + intent: &DockerReplacementIntent, +) -> bool { + if tracked.authoritative_id != intent.incumbent_id || summaries.len() != 1 { + return false; + } + let replacement = &summaries[0]; + container_summary_id(replacement) == Some(intent.replacement_id.as_str()) + && is_canonical_container_summary(replacement) + && container_state_allows_authorized_transition(replacement.state) + && !tracked.instance_generation.is_empty() + && container_instance_generation(replacement) == Some(tracked.instance_generation.as_str()) +} + +fn resolve_tracked_container( + summaries: &[ContainerSummary], + tracked: &TrackedSandboxInstances, +) -> TrackedContainerResolution { + if tracked.authoritative_id.is_empty() { + return TrackedContainerResolution::OwnershipUnresolved { + candidate_ids: summaries + .iter() + .filter_map(container_summary_id) + .map(str::to_string) + .collect(), + }; + } + if let Some(intent) = tracked.replacement_intent.as_ref() { + if replacement_intent_successor_only_matches(summaries, tracked, intent) { + return TrackedContainerResolution::ReplacementFromIntent { + replacement_index: 0, + }; + } + if !replacement_intent_observation_matches(summaries, tracked, intent) { + return TrackedContainerResolution::ReplacementIntentInvalidated { + candidate_ids: summaries + .iter() + .filter_map(container_summary_id) + .map(str::to_string) + .collect(), + }; + } + } + let authoritative_index = summaries.iter().position(|summary| { + container_summary_id(summary) == Some(tracked.authoritative_id.as_str()) + }); + + if let Some(index) = authoritative_index { + let incumbent = &summaries[index]; + let canonical_successors = summaries + .iter() + .enumerate() + .filter(|(candidate_index, summary)| { + *candidate_index != index + && is_canonical_container_summary(summary) + && container_summary_id(summary).is_some() + && container_state_allows_authorized_transition(summary.state) + }) + .map(|(candidate_index, _)| candidate_index) + .collect::>(); + let incumbent_stopped = matches!(incumbent.state, Some(ContainerSummaryStateEnum::EXITED)); + let incumbent_backup_name = nemoclaw_gpu_backup_container_name( + incumbent, + &tracked.sandbox_id, + &tracked.sandbox_name, + ); + let incumbent_is_backup = incumbent_backup_name.is_some(); + if incumbent_stopped && incumbent_is_backup && canonical_successors.len() == 1 { + let replacement_index = canonical_successors[0]; + let replacement_id = + container_summary_id(&summaries[replacement_index]).unwrap_or_default(); + let replacement_generation_matches = + replacement_lineage_matches(incumbent, &summaries[replacement_index], tracked); + let allowed_ids = [ + container_summary_id(incumbent).unwrap_or_default(), + container_summary_id(&summaries[replacement_index]).unwrap_or_default(), + ] + .into_iter() + .collect::>(); + let has_ambiguous_candidate = summaries.iter().any(|summary| { + let Some(candidate_id) = container_summary_id(summary) else { + return true; + }; + !allowed_ids.contains(candidate_id) + }); + if replacement_generation_matches && !has_ambiguous_candidate { + let backup_name = incumbent_backup_name.unwrap_or_default(); + let intent_matches = tracked.replacement_intent.as_ref().is_some_and(|intent| { + intent.incumbent_id == tracked.authoritative_id + && intent.replacement_id == replacement_id + && intent.backup_name == backup_name + }); + if intent_matches { + return TrackedContainerResolution::Replacement { + incumbent_index: index, + replacement_index, + }; + } + if container_state_allows_replacement_intent(summaries[replacement_index].state) { + return TrackedContainerResolution::ReplacementIntentRequired { + incumbent_index: index, + replacement_index, + backup_name, + }; + } + } + } + if incumbent_stopped && incumbent_is_backup { + return TrackedContainerResolution::HandoffPending { + incumbent_index: index, + candidate_ids: summaries + .iter() + .enumerate() + .filter(|(candidate_index, _)| *candidate_index != index) + .filter_map(|(_, summary)| container_summary_id(summary).map(str::to_string)) + .collect(), + }; + } + + let unexpected_ids = summaries + .iter() + .enumerate() + .filter(|(candidate_index, _)| *candidate_index != index) + .filter_map(|(_, summary)| { + let candidate_id = container_summary_id(summary)?; + let expected_retired_backup = tracked.retired_ids.contains(candidate_id) + && matches!(summary.state, Some(ContainerSummaryStateEnum::EXITED)) + && is_nemoclaw_gpu_backup_container( + summary, + &tracked.sandbox_id, + &tracked.sandbox_name, + ); + (!expected_retired_backup).then(|| candidate_id.to_string()) + }) + .collect(); + return TrackedContainerResolution::Authoritative { + index, + unexpected_ids, + }; + } + + let rollback_candidates = summaries + .iter() + .enumerate() + .filter(|(_, summary)| { + container_summary_id(summary).is_some_and(|id| tracked.retired_ids.contains(id)) + && is_canonical_container_summary(summary) + && container_state_allows_authorized_transition(summary.state) + }) + .map(|(index, _)| index) + .collect::>(); + if rollback_candidates.len() == 1 { + let rollback_index = rollback_candidates[0]; + let rollback_id = container_summary_id(&summaries[rollback_index]).unwrap_or_default(); + let only_known_retired = summaries.iter().all(|summary| { + container_summary_id(summary).is_some_and(|candidate_id| { + candidate_id == rollback_id + || (tracked.retired_ids.contains(candidate_id) + && matches!(summary.state, Some(ContainerSummaryStateEnum::EXITED)) + && is_nemoclaw_gpu_backup_container( + summary, + &tracked.sandbox_id, + &tracked.sandbox_name, + )) + }) + }); + if only_known_retired { + return TrackedContainerResolution::Rollback { + index: rollback_index, + }; + } + } + + if summaries.len() == 1 { + let candidate = &summaries[0]; + if is_canonical_container_summary(candidate) + && container_state_allows_replacement_intent(candidate.state) + && !tracked.instance_generation.is_empty() + && container_instance_generation(candidate) + == Some(tracked.instance_generation.as_str()) + { + return TrackedContainerResolution::ReplacementIntentRequiredWithoutIncumbent { + replacement_index: 0, + }; + } + } + + let candidate_ids = summaries + .iter() + .filter_map(container_summary_id) + .map(str::to_string) + .collect::>(); + if candidate_ids.is_empty() { + TrackedContainerResolution::Missing + } else { + TrackedContainerResolution::Conflict { candidate_ids } + } +} + +fn ownership_conflict_snapshot( + sandbox_id: &str, + tracked: &TrackedSandboxInstances, + reason: &str, + message: &str, +) -> DriverSandbox { + DriverSandbox { + id: sandbox_id.to_string(), + name: tracked.sandbox_name.clone(), + namespace: tracked.namespace.clone(), + spec: None, + status: Some(DriverSandboxStatus { + sandbox_name: tracked.sandbox_name.clone(), + instance_id: tracked.authoritative_id.clone(), + agent_fd: String::new(), + sandbox_fd: String::new(), + conditions: vec![error_condition(reason, message)], + deleting: false, + }), + } +} + fn matching_managed_container_summaries( containers: Vec, sandbox_namespace: &str, @@ -2860,33 +4473,6 @@ fn matching_managed_container_summaries( .collect() } -fn preferred_container_summaries_by_sandbox_id( - summaries: &[ContainerSummary], -) -> HashMap<&str, &ContainerSummary> { - let mut preferred = HashMap::new(); - for summary in summaries { - let Some(sandbox_id) = summary - .labels - .as_ref() - .and_then(|labels| labels.get(LABEL_SANDBOX_ID)) - else { - continue; - }; - - match preferred.entry(sandbox_id.as_str()) { - std::collections::hash_map::Entry::Vacant(entry) => { - entry.insert(summary); - } - std::collections::hash_map::Entry::Occupied(mut entry) => { - if compare_container_summary_preference(summary, entry.get()).is_gt() { - entry.insert(summary); - } - } - } - } - preferred -} - fn sandbox_from_container_summary( summary: &ContainerSummary, readiness: &dyn SupervisorReadiness, @@ -3465,7 +5051,7 @@ fn write_cache_binary_atomic(final_path: &Path, bytes: &[u8]) -> CoreResult<()> dir.display(), )) })?; - std::io::Write::write_all(&mut temp, bytes).map_err(|err| { + Write::write_all(&mut temp, bytes).map_err(|err| { Error::config(format!( "failed to write supervisor binary to temp file: {err}", )) diff --git a/crates/openshell-driver-docker/src/tests.rs b/crates/openshell-driver-docker/src/tests.rs index ffad748a7..25a436935 100644 --- a/crates/openshell-driver-docker/src/tests.rs +++ b/crates/openshell-driver-docker/src/tests.rs @@ -114,6 +114,7 @@ fn runtime_config() -> DockerDriverRuntimeConfig { allow_all_default_gpu: false, sandbox_pids_limit: DEFAULT_SANDBOX_PIDS_LIMIT, enable_bind_mounts: false, + instance_state_dir: PathBuf::new(), } } @@ -187,6 +188,7 @@ fn test_driver_with_config(config: DockerDriverRuntimeConfig) -> DockerComputeDr config, events: broadcast::channel(WATCH_BUFFER).0, pending: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + instances: Arc::new(tokio::sync::Mutex::new(DockerInstanceOwnership::default())), supervisor_readiness: Arc::new(DisconnectedSupervisorReadiness), gpu_selector: Arc::new(CdiGpuDefaultSelector::new( CdiGpuInventory::default(), @@ -1716,6 +1718,10 @@ fn managed_container_summary( (LABEL_SANDBOX_ID.to_string(), sandbox.id), (LABEL_SANDBOX_NAME.to_string(), sandbox.name), (LABEL_SANDBOX_NAMESPACE.to_string(), "default".to_string()), + ( + LABEL_INSTANCE_GENERATION.to_string(), + "instance-generation".to_string(), + ), ])), state: Some(state), created: Some(created), @@ -1723,49 +1729,897 @@ fn managed_container_summary( } } -fn selected_container_id(summaries: &[ContainerSummary]) -> &str { - preferred_container_summaries_by_sandbox_id(summaries) - .get("sbx-1") - .and_then(|summary| summary.id.as_deref()) - .expect("sandbox has a preferred container") +fn tracked_instances(authoritative_id: &str) -> TrackedSandboxInstances { + TrackedSandboxInstances { + sandbox_id: "sbx-1".to_string(), + sandbox_name: "demo".to_string(), + namespace: "default".to_string(), + authoritative_id: authoritative_id.to_string(), + retired_ids: HashSet::new(), + instance_generation: "instance-generation".to_string(), + replacement_intent: None, + } +} + +fn journal_runtime_config(temp: &TempDir, namespace: &str) -> DockerDriverRuntimeConfig { + let mut config = runtime_config(); + config.sandbox_namespace = namespace.to_string(); + config.instance_state_dir = temp + .path() + .join(docker_instance_namespace_dir_name(namespace)); + config +} + +fn rename_as_nemoclaw_backup(summary: &mut ContainerSummary, timestamp: u64) { + let canonical = summary + .names + .as_ref() + .and_then(|names| names.first()) + .expect("canonical name") + .trim_start_matches('/'); + summary.names = Some(vec![format!( + "/{canonical}{NEMOCLAW_GPU_BACKUP_MARKER}{timestamp}" + )]); +} + +fn replacement_ids( + summaries: &[ContainerSummary], + resolution: TrackedContainerResolution, +) -> (&str, &str) { + match resolution { + TrackedContainerResolution::Replacement { + incumbent_index, + replacement_index, + } => ( + container_summary_id(&summaries[incumbent_index]).unwrap(), + container_summary_id(&summaries[replacement_index]).unwrap(), + ), + other => panic!("expected replacement, got {other:?}"), + } +} + +#[test] +fn tracked_container_accepts_exited_backup_to_canonical_handoff_in_both_orders() { + let mut incumbent = + managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + rename_as_nemoclaw_backup(&mut incumbent, 1_780_491_860_342); + let successor = managed_container_summary("successor", ContainerSummaryStateEnum::CREATED, 20); + let backup_name = incumbent + .names + .as_ref() + .and_then(|names| names.first()) + .expect("backup name") + .trim_start_matches('/') + .to_string(); + let mut tracked = tracked_instances("incumbent"); + + assert!(matches!( + resolve_tracked_container(&[incumbent.clone(), successor.clone()], &tracked), + TrackedContainerResolution::ReplacementIntentRequired { + incumbent_index: 0, + replacement_index: 1, + backup_name: observed_backup_name, + } if observed_backup_name == backup_name + )); + assert!(matches!( + resolve_tracked_container(&[successor.clone(), incumbent.clone()], &tracked), + TrackedContainerResolution::ReplacementIntentRequired { + incumbent_index: 1, + replacement_index: 0, + backup_name: observed_backup_name, + } if observed_backup_name == backup_name + )); + + tracked.replacement_intent = Some(DockerReplacementIntent { + incumbent_id: "incumbent".to_string(), + replacement_id: "successor".to_string(), + backup_name, + }); + + assert_eq!( + replacement_ids( + &[incumbent.clone(), successor.clone()], + resolve_tracked_container(&[incumbent.clone(), successor.clone()], &tracked), + ), + ("incumbent", "successor") + ); + assert_eq!( + replacement_ids( + &[successor.clone(), incumbent.clone()], + resolve_tracked_container(&[successor, incumbent], &tracked), + ), + ("incumbent", "successor") + ); +} + +#[test] +fn tracked_container_requires_driver_lineage_for_handoff() { + let mut incumbent = + managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + rename_as_nemoclaw_backup(&mut incumbent, 1234); + let mut successor = + managed_container_summary("successor", ContainerSummaryStateEnum::RUNNING, 20); + incumbent + .labels + .as_mut() + .unwrap() + .remove(LABEL_INSTANCE_GENERATION); + successor + .labels + .as_mut() + .unwrap() + .remove(LABEL_INSTANCE_GENERATION); + + assert!(matches!( + resolve_tracked_container( + &[incumbent.clone(), successor.clone()], + &tracked_instances("incumbent") + ), + TrackedContainerResolution::HandoffPending { .. } + )); + let mut legacy = tracked_instances("incumbent"); + legacy.instance_generation.clear(); + assert!(matches!( + resolve_tracked_container(&[incumbent.clone(), successor.clone()], &legacy), + TrackedContainerResolution::HandoffPending { .. } + )); + + incumbent.labels.as_mut().unwrap().insert( + LABEL_INSTANCE_GENERATION.to_string(), + "wrong-generation".to_string(), + ); + successor.labels.as_mut().unwrap().insert( + LABEL_INSTANCE_GENERATION.to_string(), + "wrong-generation".to_string(), + ); + assert!(matches!( + resolve_tracked_container(&[incumbent, successor], &tracked_instances("incumbent")), + TrackedContainerResolution::HandoffPending { .. } + )); +} + +#[test] +fn tracked_container_rejects_terminal_replacement_candidates() { + for state in [ + ContainerSummaryStateEnum::EXITED, + ContainerSummaryStateEnum::DEAD, + ContainerSummaryStateEnum::REMOVING, + ContainerSummaryStateEnum::EMPTY, + ContainerSummaryStateEnum::PAUSED, + ] { + let mut incumbent = + managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + rename_as_nemoclaw_backup(&mut incumbent, 1234); + let successor = managed_container_summary("successor", state, 20); + assert!(matches!( + resolve_tracked_container(&[incumbent, successor], &tracked_instances("incumbent")), + TrackedContainerResolution::HandoffPending { .. } + )); + } +} + +#[test] +fn tracked_container_invalidates_interrupted_intent_and_accepts_intended_successor_only() { + let mut incumbent = + managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + rename_as_nemoclaw_backup(&mut incumbent, 1234); + let backup_name = incumbent.names.as_ref().unwrap()[0] + .trim_start_matches('/') + .to_string(); + let successor = managed_container_summary("successor", ContainerSummaryStateEnum::RUNNING, 20); + let mut tracked = tracked_instances("incumbent"); + tracked.replacement_intent = Some(DockerReplacementIntent { + incumbent_id: "incumbent".to_string(), + replacement_id: "successor".to_string(), + backup_name, + }); + + assert_eq!( + resolve_tracked_container(std::slice::from_ref(&successor), &tracked), + TrackedContainerResolution::ReplacementFromIntent { + replacement_index: 0, + } + ); + assert!(matches!( + resolve_tracked_container(&[], &tracked), + TrackedContainerResolution::ReplacementIntentInvalidated { candidate_ids } + if candidate_ids.is_empty() + )); + assert!(matches!( + resolve_tracked_container( + &[incumbent, successor, managed_container_summary( + "third", + ContainerSummaryStateEnum::RUNNING, + 30, + )], + &tracked, + ), + TrackedContainerResolution::ReplacementIntentInvalidated { candidate_ids } + if candidate_ids.len() == 3 + )); +} + +#[test] +fn tracked_container_completes_authorized_intent_when_successor_was_stopped_for_restart() { + let mut incumbent = + managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + rename_as_nemoclaw_backup(&mut incumbent, 1234); + let backup_name = incumbent.names.as_ref().unwrap()[0] + .trim_start_matches('/') + .to_string(); + let successor = managed_container_summary("successor", ContainerSummaryStateEnum::EXITED, 20); + let mut tracked = tracked_instances("incumbent"); + tracked.replacement_intent = Some(DockerReplacementIntent { + incumbent_id: "incumbent".to_string(), + replacement_id: "successor".to_string(), + backup_name, + }); + + assert!(matches!( + resolve_tracked_container(&[incumbent, successor.clone()], &tracked), + TrackedContainerResolution::Replacement { + incumbent_index: 0, + replacement_index: 1, + } + )); + assert_eq!( + resolve_tracked_container(&[successor], &tracked), + TrackedContainerResolution::ReplacementFromIntent { + replacement_index: 0, + } + ); +} + +#[test] +fn tracked_container_rejects_non_exited_or_wrongly_named_incumbent_handoffs() { + let successor = managed_container_summary("successor", ContainerSummaryStateEnum::RUNNING, 20); + for state in [ + ContainerSummaryStateEnum::RUNNING, + ContainerSummaryStateEnum::RESTARTING, + ContainerSummaryStateEnum::CREATED, + ContainerSummaryStateEnum::PAUSED, + ContainerSummaryStateEnum::DEAD, + ContainerSummaryStateEnum::REMOVING, + ContainerSummaryStateEnum::EMPTY, + ] { + let mut incumbent = managed_container_summary("incumbent", state, 10); + rename_as_nemoclaw_backup(&mut incumbent, 1234); + assert!(matches!( + resolve_tracked_container( + &[incumbent, successor.clone()], + &tracked_instances("incumbent") + ), + TrackedContainerResolution::Authoritative { index: 0, .. } + )); + } + + let mut incumbent = + managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + incumbent.names = Some(vec!["/some-unreserved-backup".to_string()]); + assert!(matches!( + resolve_tracked_container(&[incumbent, successor], &tracked_instances("incumbent")), + TrackedContainerResolution::Authoritative { index: 0, .. } + )); +} + +#[test] +fn tracked_container_keeps_known_incumbent_over_running_duplicate() { + let incumbent = managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + let mut duplicate = + managed_container_summary("duplicate", ContainerSummaryStateEnum::RUNNING, 20); + duplicate.names = Some(vec!["/copied-labels-but-not-canonical".to_string()]); + + assert_eq!( + resolve_tracked_container(&[incumbent, duplicate], &tracked_instances("incumbent")), + TrackedContainerResolution::Authoritative { + index: 0, + unexpected_ids: vec!["duplicate".to_string()], + } + ); +} + +#[test] +fn tracked_container_rejects_ambiguous_canonical_successors() { + let mut incumbent = + managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + rename_as_nemoclaw_backup(&mut incumbent, 1234); + let first = managed_container_summary("first", ContainerSummaryStateEnum::CREATED, 20); + let second = managed_container_summary("second", ContainerSummaryStateEnum::RUNNING, 30); + + assert!(matches!( + resolve_tracked_container( + &[incumbent, first, second], + &tracked_instances("incumbent") + ), + TrackedContainerResolution::HandoffPending { + incumbent_index: 0, + candidate_ids, + } if candidate_ids.len() == 2 + )); +} + +#[test] +fn tracked_container_keeps_stopped_backup_pending_without_auto_resume() { + let mut incumbent = + managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + rename_as_nemoclaw_backup(&mut incumbent, 1234); + + assert_eq!( + resolve_tracked_container(&[incumbent], &tracked_instances("incumbent")), + TrackedContainerResolution::HandoffPending { + incumbent_index: 0, + candidate_ids: Vec::new(), + } + ); +} + +#[test] +fn tracked_container_requires_generation_and_intent_after_observed_gap() { + let tracked = tracked_instances("incumbent"); + assert_eq!( + resolve_tracked_container(&[], &tracked), + TrackedContainerResolution::Missing + ); + + let replacement = + managed_container_summary("replacement", ContainerSummaryStateEnum::RUNNING, 20); + assert_eq!( + resolve_tracked_container(std::slice::from_ref(&replacement), &tracked), + TrackedContainerResolution::ReplacementIntentRequiredWithoutIncumbent { + replacement_index: 0, + } + ); + + let mut unproven = replacement; + unproven + .labels + .as_mut() + .unwrap() + .remove(LABEL_INSTANCE_GENERATION); + assert_eq!( + resolve_tracked_container(&[unproven], &tracked), + TrackedContainerResolution::Conflict { + candidate_ids: vec!["replacement".to_string()], + } + ); +} + +#[test] +fn tracked_container_allows_only_previously_retired_instance_to_roll_back() { + let mut tracked = tracked_instances("replacement"); + tracked.retired_ids.insert("incumbent".to_string()); + let restored = managed_container_summary("incumbent", ContainerSummaryStateEnum::RUNNING, 10); + + assert_eq!( + resolve_tracked_container(&[restored], &tracked), + TrackedContainerResolution::Rollback { index: 0 } + ); + + let restored_stopped = + managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + assert_eq!( + resolve_tracked_container(&[restored_stopped], &tracked), + TrackedContainerResolution::Rollback { index: 0 } + ); + + let mut unknown = managed_container_summary("unknown", ContainerSummaryStateEnum::RUNNING, 20); + unknown + .labels + .as_mut() + .unwrap() + .remove(LABEL_INSTANCE_GENERATION); + assert_eq!( + resolve_tracked_container(&[unknown], &tracked), + TrackedContainerResolution::Conflict { + candidate_ids: vec!["unknown".to_string()], + } + ); } #[test] -fn preferred_container_summary_selects_running_in_both_list_orders() { - let running = managed_container_summary("running", ContainerSummaryStateEnum::RUNNING, 10); - let newer_exited = - managed_container_summary("newer-exited", ContainerSummaryStateEnum::EXITED, 20); +fn tracked_container_reports_running_retired_backup_as_conflict() { + let mut tracked = tracked_instances("replacement"); + tracked + .retired_ids + .extend(["incumbent".to_string(), "older".to_string()]); + let mut replacement = + managed_container_summary("replacement", ContainerSummaryStateEnum::RUNNING, 20); + replacement + .labels + .as_mut() + .unwrap() + .remove(LABEL_INSTANCE_GENERATION); + let mut incumbent = + managed_container_summary("incumbent", ContainerSummaryStateEnum::RUNNING, 10); + rename_as_nemoclaw_backup(&mut incumbent, 1234); assert_eq!( - selected_container_id(&[running.clone(), newer_exited.clone()]), - "running" + resolve_tracked_container(&[replacement, incumbent], &tracked), + TrackedContainerResolution::Authoritative { + index: 0, + unexpected_ids: vec!["incumbent".to_string()], + } ); - assert_eq!(selected_container_id(&[newer_exited, running]), "running"); + + let restored = managed_container_summary("incumbent", ContainerSummaryStateEnum::RUNNING, 10); + let mut older = managed_container_summary("older", ContainerSummaryStateEnum::RUNNING, 5); + rename_as_nemoclaw_backup(&mut older, 1000); + assert!(matches!( + resolve_tracked_container(&[restored, older], &tracked), + TrackedContainerResolution::Conflict { .. } + )); +} + +#[test] +fn docker_instance_namespace_directory_is_injective_and_path_safe() { + let slash = docker_instance_namespace_dir_name("a/b"); + let dash = docker_instance_namespace_dir_name("a-b"); + assert_ne!(slash, dash); + assert_ne!(docker_instance_namespace_dir_name(".."), ".."); + assert_ne!(docker_instance_namespace_dir_name("."), "."); + assert_eq!(slash.len(), 64); + assert!(slash.bytes().all(|byte| byte.is_ascii_hexdigit())); +} + +#[test] +fn docker_instance_ownership_journal_round_trips_with_restricted_permissions() { + let temp = TempDir::new().unwrap(); + let config = journal_runtime_config(&temp, "tenant/a"); + let mut tracked = tracked_instances("incumbent"); + tracked.namespace.clone_from(&config.sandbox_namespace); + tracked.retired_ids.insert("retired".to_string()); + tracked.replacement_intent = Some(DockerReplacementIntent { + incumbent_id: "incumbent".to_string(), + replacement_id: "successor".to_string(), + backup_name: "openshell-demo-sbx-1-nemoclaw-gpu-backup-1234".to_string(), + }); + + persist_docker_instance_ownership(&tracked, &config).unwrap(); + let path = docker_instance_state_path(&tracked.sandbox_id, &config).unwrap(); + let persisted = + load_docker_instance_ownership(&tracked.sandbox_id, &tracked.sandbox_name, &config) + .unwrap() + .unwrap(); + assert_eq!(persisted.namespace, "tenant/a"); + assert_eq!(persisted.authoritative_id, "incumbent"); + assert_eq!(persisted.retired_ids, vec!["retired".to_string()]); + assert_eq!(persisted.instance_generation, "instance-generation"); + assert_eq!(persisted.replacement_intent, tracked.replacement_intent); + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + assert_eq!( + fs::metadata(&path).unwrap().permissions().mode() & 0o777, + 0o600 + ); + assert_eq!( + fs::metadata(path.parent().unwrap()) + .unwrap() + .permissions() + .mode() + & 0o777, + 0o700 + ); + } } #[test] -fn preferred_container_summary_selects_newer_same_state_in_both_list_orders() { - let older = managed_container_summary("older", ContainerSummaryStateEnum::EXITED, 10); - let newer = managed_container_summary("newer", ContainerSummaryStateEnum::EXITED, 20); +fn docker_instance_ownership_journal_rejects_identity_namespace_version_and_corruption() { + let temp = TempDir::new().unwrap(); + let config = journal_runtime_config(&temp, "tenant-a"); + let mut tracked = tracked_instances("incumbent"); + tracked.namespace.clone_from(&config.sandbox_namespace); + persist_docker_instance_ownership(&tracked, &config).unwrap(); + + assert!(load_docker_instance_ownership("sbx-1", "wrong-name", &config).is_err()); + let mut other_namespace = config.clone(); + other_namespace.sandbox_namespace = "tenant-b".to_string(); + assert!(load_docker_instance_ownership("sbx-1", "demo", &other_namespace).is_err()); + + let path = docker_instance_state_path("sbx-1", &config).unwrap(); + let mut value: serde_json::Value = serde_json::from_slice(&fs::read(&path).unwrap()).unwrap(); + value["version"] = serde_json::json!(99); + fs::write(&path, serde_json::to_vec(&value).unwrap()).unwrap(); + assert!(load_docker_instance_ownership("sbx-1", "demo", &config).is_err()); + + fs::write(&path, b"not-json").unwrap(); + assert!(load_docker_instance_ownership("sbx-1", "demo", &config).is_err()); +} + +#[tokio::test] +async fn docker_instance_ownership_journal_survives_intent_promotion_and_rollback_store_lag() { + let temp = TempDir::new().unwrap(); + let config = journal_runtime_config(&temp, "default"); + let sandbox = DriverSandbox { + id: "sbx-1".to_string(), + name: "demo".to_string(), + ..Default::default() + }; + let first = test_driver_with_config(config.clone()); + first + .track_created_instance(&sandbox, "incumbent", "instance-generation", None) + .await + .unwrap(); + let mut incumbent = + managed_container_summary("incumbent", ContainerSummaryStateEnum::EXITED, 10); + rename_as_nemoclaw_backup(&mut incumbent, 1234); + let successor = managed_container_summary("successor", ContainerSummaryStateEnum::RUNNING, 20); + + assert!(matches!( + first + .resolve_tracked_sandbox_observation( + &sandbox.id, + &sandbox.name, + &[incumbent.clone(), successor.clone()], + ) + .await, + TrackedSandboxObservation::Conflict(_) + )); + + let restarted = test_driver_with_config(config.clone()); + restarted + .track_persisted_instance(&sandbox.id, &sandbox.name, "incumbent") + .await + .unwrap(); + let stopped_successor = ContainerSummary { + state: Some(ContainerSummaryStateEnum::EXITED), + ..successor.clone() + }; + assert!(matches!( + restarted + .resolve_tracked_sandbox_observation( + &sandbox.id, + &sandbox.name, + std::slice::from_ref(&stopped_successor), + ) + .await, + TrackedSandboxObservation::Container(summary) + if container_summary_id(&summary) == Some("successor") + )); + + let store_still_incumbent = test_driver_with_config(config.clone()); + store_still_incumbent + .track_persisted_instance(&sandbox.id, &sandbox.name, "incumbent") + .await + .unwrap(); + { + let ownership = store_still_incumbent.instances.lock().await; + let tracked = ownership.sandboxes.get(&sandbox.id).unwrap(); + assert_eq!(tracked.authoritative_id, "successor"); + assert!(tracked.retired_ids.contains("incumbent")); + } + + let restored = managed_container_summary("incumbent", ContainerSummaryStateEnum::RUNNING, 30); + assert!(matches!( + store_still_incumbent + .resolve_tracked_sandbox_observation( + &sandbox.id, + &sandbox.name, + std::slice::from_ref(&restored), + ) + .await, + TrackedSandboxObservation::Container(summary) + if container_summary_id(&summary) == Some("incumbent") + )); + + let store_still_successor = test_driver_with_config(config); + store_still_successor + .track_persisted_instance(&sandbox.id, &sandbox.name, "successor") + .await + .unwrap(); + let ownership = store_still_successor.instances.lock().await; + let tracked = ownership.sandboxes.get(&sandbox.id).unwrap(); + assert_eq!(tracked.authoritative_id, "incumbent"); + assert!(tracked.retired_ids.contains("successor")); +} + +#[tokio::test] +async fn driver_generation_recovers_replacement_after_observed_gap_and_restart() { + let temp = TempDir::new().unwrap(); + let config = journal_runtime_config(&temp, "default"); + let sandbox = DriverSandbox { + id: "sbx-1".to_string(), + name: "demo".to_string(), + ..Default::default() + }; + let first = test_driver_with_config(config.clone()); + first + .track_created_instance(&sandbox, "incumbent", "instance-generation", None) + .await + .unwrap(); + assert!(matches!( + first + .resolve_tracked_sandbox_observation(&sandbox.id, &sandbox.name, &[]) + .await, + TrackedSandboxObservation::Conflict(_) + )); + + let successor = managed_container_summary("successor", ContainerSummaryStateEnum::RUNNING, 20); + assert!(matches!( + first + .resolve_tracked_sandbox_observation( + &sandbox.id, + &sandbox.name, + std::slice::from_ref(&successor), + ) + .await, + TrackedSandboxObservation::Conflict(_) + )); + + let restarted = test_driver_with_config(config); + restarted + .track_persisted_instance(&sandbox.id, &sandbox.name, "incumbent") + .await + .unwrap(); + assert!(matches!( + restarted + .resolve_tracked_sandbox_observation( + &sandbox.id, + &sandbox.name, + std::slice::from_ref(&successor), + ) + .await, + TrackedSandboxObservation::Container(summary) + if container_summary_id(&summary) == Some("successor") + )); +} + +#[tokio::test] +async fn forgetting_unmapped_sandbox_removes_ownership_journal() { + let temp = TempDir::new().unwrap(); + let config = journal_runtime_config(&temp, "default"); + let tracked = tracked_instances("incumbent"); + persist_docker_instance_ownership(&tracked, &config).unwrap(); + let path = docker_instance_state_path("sbx-1", &config).unwrap(); + assert!(path.exists()); + + let driver = test_driver_with_config(config); + driver.forget_tracked_sandbox("sbx-1", "demo").await; + assert!(!path.exists()); +} + +#[tokio::test] +async fn cancelled_create_does_not_publish_authority_or_leave_journal() { + let temp = TempDir::new().unwrap(); + let config = journal_runtime_config(&temp, "default"); + let driver = test_driver_with_config(config.clone()); + let sandbox = DriverSandbox { + id: "sbx-1".to_string(), + name: "demo".to_string(), + ..Default::default() + }; + let cancelled = AtomicBool::new(true); + + let err = driver + .track_created_instance( + &sandbox, + "incumbent", + "instance-generation", + Some(&cancelled), + ) + .await + .unwrap_err(); + assert_eq!(err.code(), tonic::Code::Cancelled); + assert!( + !docker_instance_state_path("sbx-1", &config) + .unwrap() + .exists() + ); + assert!(driver.instances.lock().await.sandboxes.is_empty()); +} + +#[tokio::test] +async fn pending_delete_cancels_reservation_before_provision_start_gate() { + let driver = test_driver_with_config(runtime_config()); + let sandbox = DriverSandbox { + id: "sbx-1".to_string(), + name: "demo".to_string(), + ..Default::default() + }; + let cancellation = driver.reserve_pending_sandbox(&sandbox).await.unwrap(); + assert!(!cancellation.load(AtomicOrdering::Acquire)); + + let removed = driver + .remove_pending_sandbox(&sandbox.id, &sandbox.name) + .await + .expect("pending reservation"); + assert!(removed.task.is_none()); + assert!(cancellation.load(AtomicOrdering::Acquire)); + assert!(removed.cancelled.load(AtomicOrdering::Acquire)); +} + +#[tokio::test] +async fn ownership_conflict_notice_is_operator_visible_and_deduplicated() { + let driver = test_driver_with_config(runtime_config()); + let sandbox = DriverSandbox { + id: "sbx-1".to_string(), + name: "demo".to_string(), + ..Default::default() + }; + driver + .track_created_instance(&sandbox, "incumbent", "instance-generation", None) + .await + .unwrap(); + let mut replacement = + managed_container_summary("replacement", ContainerSummaryStateEnum::RUNNING, 20); + replacement + .labels + .as_mut() + .unwrap() + .remove(LABEL_INSTANCE_GENERATION); + let mut rx = driver.events.subscribe(); + + for _ in 0..2 { + assert!(matches!( + driver + .resolve_tracked_sandbox_observation( + &sandbox.id, + &sandbox.name, + std::slice::from_ref(&replacement), + ) + .await, + TrackedSandboxObservation::Conflict(_) + )); + } + let event = rx.try_recv().expect("first conflict publishes a warning"); + let watch_sandboxes_event::Payload::PlatformEvent(event) = event.payload.unwrap() else { + panic!("expected platform event"); + }; + let event = event.event.unwrap(); + assert_eq!(event.r#type, "Warning"); + assert_eq!(event.reason, "ContainerIdentityConflict"); assert_eq!( - selected_container_id(&[older.clone(), newer.clone()]), - "newer" + event.metadata.get("authoritative_instance_id"), + Some(&"incumbent".to_string()) ); - assert_eq!(selected_container_id(&[newer, older]), "newer"); + assert_eq!( + event.metadata.get("candidate_instance_ids"), + Some(&"replacement".to_string()) + ); + assert!(matches!( + rx.try_recv(), + Err(broadcast::error::TryRecvError::Empty) + )); +} + +#[tokio::test] +async fn ownership_notice_before_subscription_is_retried_for_first_watcher() { + let driver = test_driver_with_config(runtime_config()); + let sandbox = DriverSandbox { + id: "sbx-1".to_string(), + name: "demo".to_string(), + ..Default::default() + }; + driver + .track_created_instance(&sandbox, "incumbent", "instance-generation", None) + .await + .unwrap(); + let mut replacement = + managed_container_summary("replacement", ContainerSummaryStateEnum::RUNNING, 20); + replacement + .labels + .as_mut() + .unwrap() + .remove(LABEL_INSTANCE_GENERATION); + + assert!(matches!( + driver + .resolve_tracked_sandbox_observation( + &sandbox.id, + &sandbox.name, + std::slice::from_ref(&replacement), + ) + .await, + TrackedSandboxObservation::Conflict(_) + )); + + let mut rx = driver.events.subscribe(); + assert!(matches!( + driver + .resolve_tracked_sandbox_observation( + &sandbox.id, + &sandbox.name, + std::slice::from_ref(&replacement), + ) + .await, + TrackedSandboxObservation::Conflict(_) + )); + let event = rx.try_recv().expect("notice is retried after subscription"); + assert!(matches!( + event.payload, + Some(watch_sandboxes_event::Payload::PlatformEvent(_)) + )); } #[test] -fn preferred_container_summary_selects_canonical_name_for_equal_time_clones() { - let canonical = managed_container_summary("aaa", ContainerSummaryStateEnum::EXITED, 10); - let mut backup = managed_container_summary("zzz", ContainerSummaryStateEnum::EXITED, 10); - backup.names = Some(vec!["/openshell-demo-nemoclaw-gpu-backup-1234".to_string()]); +fn conflict_snapshot_prevents_delete_event_across_recreate_gap() { + let tracked = tracked_instances("incumbent"); + let previous_sandbox = sandbox_from_container_summary( + &managed_container_summary("incumbent", ContainerSummaryStateEnum::RUNNING, 10), + &DisconnectedSupervisorReadiness, + ) + .unwrap(); + let conflict = ownership_conflict_snapshot( + "sbx-1", + &tracked, + "ContainerInstanceMissing", + "authoritative instance is missing", + ); + let previous = HashMap::from([("sbx-1".to_string(), previous_sandbox)]); + let current = HashMap::from([("sbx-1".to_string(), conflict)]); + let (events, mut rx) = broadcast::channel(WATCH_BUFFER); + + emit_snapshot_diff(&events, &previous, ¤t); + + let event = rx.try_recv().expect("conflict snapshot update"); + assert!(matches!( + event.payload, + Some(watch_sandboxes_event::Payload::Sandbox(_)) + )); + assert!(matches!( + rx.try_recv(), + Err(broadcast::error::TryRecvError::Empty) + )); +} + +#[tokio::test] +async fn forgetting_explicitly_deleted_sandbox_removes_retained_authority() { + let driver = test_driver_with_config(runtime_config()); + let sandbox = DriverSandbox { + id: "sbx-1".to_string(), + name: "demo".to_string(), + ..Default::default() + }; + driver + .track_created_instance(&sandbox, "incumbent", "instance-generation", None) + .await + .unwrap(); + + driver + .forget_tracked_sandbox(&sandbox.id, &sandbox.name) + .await; + + assert!(matches!( + driver + .resolve_tracked_sandbox_observation(&sandbox.id, &sandbox.name, &[]) + .await, + TrackedSandboxObservation::Untracked + )); +} + +#[tokio::test] +async fn tracked_identity_resolves_name_only_and_rejects_mismatched_name() { + let driver = test_driver_with_config(runtime_config()); + let sandbox = DriverSandbox { + id: "sbx-1".to_string(), + name: "demo".to_string(), + ..Default::default() + }; + driver + .track_created_instance(&sandbox, "incumbent", "instance-generation", None) + .await + .unwrap(); + + assert_eq!( + driver.resolve_tracked_sandbox_id("", "demo").await.unwrap(), + Some("sbx-1".to_string()) + ); + assert_eq!( + driver + .resolve_tracked_sandbox_id("sbx-1", "wrong-name") + .await + .unwrap_err() + .code(), + tonic::Code::NotFound + ); + driver.forget_tracked_sandbox("", "demo").await; assert_eq!( - selected_container_id(&[canonical.clone(), backup.clone()]), - "aaa" + driver.resolve_tracked_sandbox_id("", "demo").await.unwrap(), + None ); - assert_eq!(selected_container_id(&[backup, canonical]), "aaa"); } #[test] diff --git a/crates/openshell-server/src/compute/mod.rs b/crates/openshell-server/src/compute/mod.rs index 3a92cd209..f84cfa913 100644 --- a/crates/openshell-server/src/compute/mod.rs +++ b/crates/openshell-server/src/compute/mod.rs @@ -87,13 +87,46 @@ impl ShutdownCleanup for DockerComputeDriver { /// `Ok(false)` if no backend resource exists. #[tonic::async_trait] trait StartupResume: Send + Sync { - async fn resume_sandbox(&self, sandbox_id: &str, sandbox_name: &str) -> Result; + async fn resume_sandbox( + &self, + sandbox_id: &str, + sandbox_name: &str, + expected_instance_id: &str, + should_start: bool, + ) -> Result; + + async fn complete_delete(&self, sandbox_id: &str, sandbox_name: &str) -> Result; + + async fn complete_restore(&self) -> Result<(), String>; } #[tonic::async_trait] impl StartupResume for DockerComputeDriver { - async fn resume_sandbox(&self, sandbox_id: &str, sandbox_name: &str) -> Result { - Self::resume_sandbox(self, sandbox_id, sandbox_name) + async fn resume_sandbox( + &self, + sandbox_id: &str, + sandbox_name: &str, + expected_instance_id: &str, + should_start: bool, + ) -> Result { + Self::resume_sandbox( + self, + sandbox_id, + sandbox_name, + expected_instance_id, + should_start, + ) + .await + .map_err(|err| err.to_string()) + } + + async fn complete_restore(&self) -> Result<(), String> { + self.complete_instance_restore().await; + Ok(()) + } + + async fn complete_delete(&self, sandbox_id: &str, sandbox_name: &str) -> Result { + self.complete_persisted_delete(sandbox_id, sandbox_name) .await .map_err(|err| err.to_string()) } @@ -739,45 +772,99 @@ impl ComputeRuntime { /// Resume sandboxes whose store records say they should be running. /// Drivers that do not auto-restart compute resources across gateway /// restarts (currently only Docker) implement `StartupResume`. For - /// each sandbox in the store whose phase is not `Deleting` or - /// `Error`, we ask the driver to resume the underlying resource. If - /// the driver reports that the resource no longer exists or fails to - /// start, the sandbox is moved to the `Error` phase so the failure - /// surfaces in the UI. + /// each non-deleting sandbox, we first restore the driver's durable + /// instance ownership. Sandboxes whose phase should be running are then + /// resumed. Ownership conflicts remain in the store and are surfaced by + /// the driver's precise conflict snapshot instead of being treated as a + /// deletion. /// /// Should be called once at gateway startup, before watchers spawn, /// so the watch loop sees the post-resume state on its first poll. pub async fn resume_persisted_sandboxes(&self) -> Result<(), String> { + const RESTORE_PAGE_SIZE: u32 = 1000; + let Some(resume) = &self.startup_resume else { return Ok(()); }; - let records = self - .store - .list(Sandbox::object_type(), 1000, 0) - .await - .map_err(|e| e.to_string())?; - let mut resumed = 0usize; - let mut missing = 0usize; - let mut failed = 0usize; + let mut unresolved = 0usize; + let mut completed_deletes = 0usize; + let mut offset = 0_u32; + let mut records_to_restore = Vec::new(); - for record in records { - let sandbox = match Sandbox::decode(record.payload.as_slice()) { - Ok(sandbox) => sandbox, - Err(err) => { - warn!(error = %err, "Failed to decode sandbox record during startup resume"); - continue; - } - }; + loop { + let records = self + .store + .list(Sandbox::object_type(), RESTORE_PAGE_SIZE, offset) + .await + .map_err(|e| e.to_string())?; + if records.is_empty() { + break; + } + let record_count = records.len(); + offset = offset.saturating_add(u32::try_from(record_count).unwrap_or(u32::MAX)); + records_to_restore.extend(records); + if record_count < RESTORE_PAGE_SIZE as usize { + break; + } + } + + for record in records_to_restore { + let sandbox = Sandbox::decode(record.payload.as_slice()) + .map_err(|err| format!("decode sandbox record during startup resume: {err}"))?; let phase = SandboxPhase::try_from(sandbox.phase()).unwrap_or(SandboxPhase::Unknown); + // Finish a persisted deletion before restoring ownership. This + // covers crashes both before backend cleanup and after backend + // cleanup but before the driver's Deleted event. + if phase == SandboxPhase::Deleting { + resume + .complete_delete(sandbox.object_id(), sandbox.object_name()) + .await?; + self.apply_deleted(sandbox.object_id()).await?; + completed_deletes += 1; + continue; + } + let expected_instance_id = sandbox + .status + .as_ref() + .map(|status| status.agent_pod.as_str()) + .unwrap_or_default(); if !sandbox_phase_should_be_running(phase) { + let resolved = resume + .resume_sandbox( + sandbox.object_id(), + sandbox.object_name(), + expected_instance_id, + false, + ) + .await + .map_err(|err| { + format!( + "restore Docker instance ownership for sandbox {} during gateway startup: {err}", + sandbox.object_id() + ) + })?; + if !resolved { + warn!( + sandbox_id = %sandbox.object_id(), + sandbox_name = %sandbox.object_name(), + ?phase, + "Docker instance ownership is unresolved; retaining durable state" + ); + unresolved += 1; + } continue; } match resume - .resume_sandbox(sandbox.object_id(), sandbox.object_name()) + .resume_sandbox( + sandbox.object_id(), + sandbox.object_name(), + expected_instance_id, + true, + ) .await { Ok(true) => { @@ -790,91 +877,35 @@ impl ComputeRuntime { resumed += 1; } Ok(false) => { - // Backend resource is gone but the store still - // remembers the sandbox. Mark Error so the UI - // surfaces the inconsistency; the reconcile loop - // will eventually prune it after the orphan grace - // period. warn!( sandbox_id = %sandbox.object_id(), sandbox_name = %sandbox.object_name(), - "Cannot resume sandbox: backend resource is missing" + "Cannot resume sandbox: Docker instance ownership is unresolved; retaining durable state" ); - self.mark_sandbox_error( - &sandbox, - "BackendResourceMissing", - "Sandbox container disappeared while the gateway was offline", - ) - .await; - missing += 1; + unresolved += 1; } Err(err) => { - warn!( - sandbox_id = %sandbox.object_id(), - sandbox_name = %sandbox.object_name(), - error = %err, - "Failed to resume sandbox during gateway startup" - ); - self.mark_sandbox_error( - &sandbox, - "ResumeFailed", - &format!("Failed to resume sandbox during gateway startup: {err}"), - ) - .await; - failed += 1; + return Err(format!( + "resume sandbox {} during gateway startup: {err}", + sandbox.object_id() + )); } } } - if resumed > 0 || missing > 0 || failed > 0 { + resume.complete_restore().await?; + + if resumed > 0 || unresolved > 0 || completed_deletes > 0 { info!( resumed, - missing_backend = missing, - failed, + unresolved_ownership = unresolved, + completed_deletes, "Sandbox resume sweep complete" ); } Ok(()) } - async fn mark_sandbox_error(&self, sandbox: &Sandbox, reason: &str, message: &str) { - let _guard = self.sync_lock.lock().await; - let sandbox_id = sandbox.object_id().to_string(); - let reason = reason.to_string(); - let message = message.to_string(); - match self - .store - .update_message_cas::(&sandbox_id, 0, |s| { - s.set_phase(SandboxPhase::Error as i32); - let name = s.object_name().to_string(); - upsert_ready_condition( - &mut s.status, - &name, - SandboxCondition { - r#type: "Ready".to_string(), - status: "False".to_string(), - reason: reason.clone(), - message: message.clone(), - last_transition_time: String::new(), - }, - ); - }) - .await - { - Ok(updated) => { - self.sandbox_index.update_from_sandbox(&updated); - self.sandbox_watch_bus.notify(&sandbox_id); - } - Err(err) => { - warn!( - sandbox_id = %sandbox_id, - error = %err, - "Failed to persist sandbox error state during startup resume" - ); - } - } - } - async fn lease_coordinator(self: Arc, mut shutdown_rx: watch::Receiver) { use lease::{LEASE_ACQUIRE_INTERVAL, LEASE_TTL, ReconcilerLease}; @@ -1139,7 +1170,8 @@ impl ComputeRuntime { let sandbox_name = incoming.name.clone(); let supervisor_promoted = session_connected - && matches!(phase, SandboxPhase::Provisioning | SandboxPhase::Unknown); + && matches!(phase, SandboxPhase::Provisioning | SandboxPhase::Unknown) + && !driver_status_blocks_supervisor_promotion(incoming.status.as_ref()); if supervisor_promoted { phase = SandboxPhase::Ready; } @@ -1205,7 +1237,8 @@ impl ComputeRuntime { .as_ref() .map_or(old_phase, |status| derive_phase(Some(status))); let supervisor_promoted = session_connected - && matches!(phase, SandboxPhase::Provisioning | SandboxPhase::Unknown); + && matches!(phase, SandboxPhase::Provisioning | SandboxPhase::Unknown) + && !driver_status_blocks_supervisor_promotion(incoming.status.as_ref()); if supervisor_promoted { phase = SandboxPhase::Ready; } @@ -1310,6 +1343,9 @@ impl ComputeRuntime { let sandbox_name = sandbox.object_name().to_string(); if connected { + if sandbox_status_blocks_supervisor_promotion(sandbox.status.as_ref()) { + return; + } ensure_supervisor_ready_status(&mut sandbox.status, &sandbox_name); sandbox.set_phase(SandboxPhase::Ready as i32); } else if current_phase == SandboxPhase::Ready { @@ -1949,6 +1985,37 @@ fn derive_phase(status: Option<&DriverSandboxStatus>) -> SandboxPhase { SandboxPhase::Unknown } +fn ownership_reason_blocks_supervisor_promotion(reason: &str) -> bool { + matches!( + reason.to_ascii_lowercase().as_str(), + "containerreplacementpending" + | "containerownershipunresolved" + | "containerinstancemissing" + | "containeridentityconflict" + | "containerownershippersistfailed" + ) +} + +fn driver_status_blocks_supervisor_promotion(status: Option<&DriverSandboxStatus>) -> bool { + status.is_some_and(|status| { + status.conditions.iter().any(|condition| { + condition.r#type == "Ready" + && condition.status.eq_ignore_ascii_case("false") + && ownership_reason_blocks_supervisor_promotion(&condition.reason) + }) + }) +} + +fn sandbox_status_blocks_supervisor_promotion(status: Option<&SandboxStatus>) -> bool { + status.is_some_and(|status| { + status.conditions.iter().any(|condition| { + condition.r#type == "Ready" + && condition.status.eq_ignore_ascii_case("false") + && ownership_reason_blocks_supervisor_promotion(&condition.reason) + }) + }) +} + fn rewrite_user_facing_conditions(status: &mut Option, spec: Option<&SandboxSpec>) { let gpu_requested = spec .and_then(|sandbox_spec| sandbox_spec.resource_requirements.as_ref()) @@ -1993,6 +2060,7 @@ fn is_terminal_failure_reason(reason: &str) -> bool { "starting", "containerstarting", "containercreated", + "containerreplacementpending", "healthcheckstarting", "inspectfailed", ]; @@ -2474,6 +2542,10 @@ mod tests { "ContainerCreated", "Podman created the container before starting it", ), + ( + "ContainerReplacementPending", + "Compatibility backup retained while canonical replacement starts", + ), ]; for (reason, message) in transient_cases { @@ -2886,6 +2958,165 @@ mod tests { assert_eq!(ready.message, "Pod is Ready"); } + #[tokio::test] + async fn replacement_conflict_updates_preserve_durable_and_owned_sandbox_state() { + let runtime = test_runtime(Arc::new(TestDriver::default())).await; + let mut sandbox = sandbox_record("sb-1", "sandbox-a", SandboxPhase::Ready); + let metadata = sandbox.metadata.as_mut().unwrap(); + metadata.labels = HashMap::from([ + ("team".to_string(), "nemoclaw".to_string()), + ("purpose".to_string(), "gpu".to_string()), + ]); + metadata.created_at_ms = 4242; + sandbox.spec = Some(SandboxSpec { + resource_requirements: Some(openshell_core::proto::ResourceRequirements { + gpu: Some(openshell_core::proto::GpuResourceRequirements { count: Some(1) }), + }), + ..Default::default() + }); + sandbox.status = Some(SandboxStatus { + sandbox_name: "sandbox-a".to_string(), + agent_pod: "incumbent-container".to_string(), + conditions: vec![SandboxCondition { + r#type: "Ready".to_string(), + status: "True".to_string(), + reason: "DependenciesReady".to_string(), + message: "Supervisor session connected".to_string(), + last_transition_time: String::new(), + }], + current_policy_version: 7, + ..Default::default() + }); + let original_spec = sandbox.spec.clone(); + let original_labels = sandbox.metadata.as_ref().unwrap().labels.clone(); + runtime.store.put_message(&sandbox).await.unwrap(); + runtime.sandbox_index.update_from_sandbox(&sandbox); + runtime + .store + .put( + SANDBOX_SETTINGS_OBJECT_TYPE, + "settings-sb-1", + sandbox.object_name(), + br#"{"revision":1,"settings":{"theme":"dark"}}"#, + None, + ) + .await + .unwrap(); + let session = ssh_session_record("session-1", sandbox.object_id()); + runtime.store.put_message(&session).await.unwrap(); + + let mut watch_rx = runtime.sandbox_watch_bus.subscribe(sandbox.object_id()); + let mut log_rx = runtime.tracing_log_bus.subscribe(sandbox.object_id()); + let mut platform_rx = runtime + .tracing_log_bus + .platform_event_bus + .subscribe(sandbox.object_id()); + runtime + .tracing_log_bus + .publish_external(openshell_core::proto::SandboxLogLine { + sandbox_id: sandbox.object_id().to_string(), + timestamp_ms: 1000, + level: "INFO".to_string(), + target: "ownership-test".to_string(), + message: "retained log".to_string(), + source: "gateway".to_string(), + fields: HashMap::new(), + }); + runtime.tracing_log_bus.platform_event_bus.publish( + sandbox.object_id(), + openshell_core::proto::SandboxStreamEvent { payload: None }, + ); + assert!(log_rx.try_recv().is_ok()); + assert!(platform_rx.try_recv().is_ok()); + + for (instance_id, reason, ready_status) in [ + ("incumbent-container", "ContainerInstanceMissing", "False"), + ("incumbent-container", "ContainerIdentityConflict", "False"), + ("replacement-container", "SupervisorConnected", "True"), + ] { + runtime + .apply_sandbox_update(DriverSandbox { + id: sandbox.object_id().to_string(), + name: sandbox.object_name().to_string(), + namespace: "default".to_string(), + spec: None, + status: Some(DriverSandboxStatus { + sandbox_name: sandbox.object_name().to_string(), + instance_id: instance_id.to_string(), + agent_fd: String::new(), + sandbox_fd: String::new(), + conditions: vec![DriverCondition { + r#type: "Ready".to_string(), + status: ready_status.to_string(), + reason: reason.to_string(), + message: reason.to_string(), + last_transition_time: String::new(), + }], + deleting: false, + }), + }) + .await + .unwrap(); + } + + let stored = runtime + .store + .get_message::(sandbox.object_id()) + .await + .unwrap() + .unwrap(); + assert_eq!(stored.spec, original_spec); + assert_eq!(stored.metadata.as_ref().unwrap().labels, original_labels); + assert_eq!(stored.metadata.as_ref().unwrap().created_at_ms, 4242); + assert_eq!(stored.current_policy_version(), 7); + assert_eq!( + stored.status.as_ref().unwrap().agent_pod, + "replacement-container" + ); + assert!( + runtime + .store + .get_by_name(SANDBOX_SETTINGS_OBJECT_TYPE, sandbox.object_name()) + .await + .unwrap() + .is_some() + ); + assert!( + runtime + .store + .get_message::(session.object_id()) + .await + .unwrap() + .is_some() + ); + assert_eq!( + runtime.tracing_log_bus.tail(sandbox.object_id(), 10).len(), + 1 + ); + assert_eq!( + runtime + .tracing_log_bus + .platform_event_bus + .tail(sandbox.object_id(), 10) + .len(), + 1 + ); + + while watch_rx.try_recv().is_ok() {} + assert!(matches!( + watch_rx.try_recv(), + Err(tokio::sync::broadcast::error::TryRecvError::Empty) + )); + assert!(matches!( + log_rx.try_recv(), + Err(tokio::sync::broadcast::error::TryRecvError::Empty) + )); + assert!(matches!( + platform_rx.try_recv(), + Err(tokio::sync::broadcast::error::TryRecvError::Empty) + )); + } + #[tokio::test] async fn apply_sandbox_update_promotes_connected_supervisor_session_to_ready() { let runtime = test_runtime(Arc::new(TestDriver::default())).await; @@ -2933,6 +3164,48 @@ mod tests { assert_eq!(ready.message, "Supervisor session connected"); } + #[tokio::test] + async fn connected_supervisor_does_not_override_replacement_pending_update() { + let runtime = test_runtime(Arc::new(TestDriver::default())).await; + let sandbox = sandbox_record("sb-1", "sandbox-a", SandboxPhase::Ready); + runtime.store.put_message(&sandbox).await.unwrap(); + register_test_supervisor_session(&runtime, "sb-1"); + + runtime + .apply_sandbox_update(DriverSandbox { + id: "sb-1".to_string(), + name: "sandbox-a".to_string(), + namespace: "default".to_string(), + spec: None, + status: Some(make_driver_status(make_driver_condition( + "ContainerReplacementPending", + "replacement intent requires another observation", + ))), + }) + .await + .unwrap(); + + let stored = runtime + .store + .get_message::("sb-1") + .await + .unwrap() + .unwrap(); + assert_eq!(stored.phase(), SandboxPhase::Provisioning as i32); + let ready = stored + .status + .as_ref() + .and_then(|status| { + status + .conditions + .iter() + .find(|condition| condition.r#type == "Ready") + }) + .unwrap(); + assert_eq!(ready.status, "False"); + assert_eq!(ready.reason, "ContainerReplacementPending"); + } + #[tokio::test] async fn supervisor_session_connected_promotes_store_state_without_driver_refresh() { let runtime = test_runtime(Arc::new(TestDriver::default())).await; @@ -2953,6 +3226,47 @@ mod tests { ); } + #[tokio::test] + async fn supervisor_session_connected_does_not_override_replacement_pending_store_state() { + let runtime = test_runtime(Arc::new(TestDriver::default())).await; + let mut sandbox = sandbox_record("sb-1", "sandbox-a", SandboxPhase::Provisioning); + sandbox.status = Some(SandboxStatus { + sandbox_name: "sandbox-a".to_string(), + conditions: vec![SandboxCondition { + r#type: "Ready".to_string(), + status: "False".to_string(), + reason: "ContainerReplacementPending".to_string(), + message: "replacement intent requires another observation".to_string(), + last_transition_time: String::new(), + }], + ..Default::default() + }); + sandbox.set_phase(SandboxPhase::Provisioning as i32); + runtime.store.put_message(&sandbox).await.unwrap(); + + runtime.supervisor_session_connected("sb-1").await.unwrap(); + + let stored = runtime + .store + .get_message::("sb-1") + .await + .unwrap() + .unwrap(); + assert_eq!(stored.phase(), SandboxPhase::Provisioning as i32); + let ready = stored + .status + .as_ref() + .and_then(|status| { + status + .conditions + .iter() + .find(|condition| condition.r#type == "Ready") + }) + .unwrap(); + assert_eq!(ready.status, "False"); + assert_eq!(ready.reason, "ContainerReplacementPending"); + } + #[tokio::test] async fn supervisor_session_disconnected_demotes_ready_sandbox() { let runtime = test_runtime(Arc::new(TestDriver::default())).await; @@ -3237,8 +3551,10 @@ mod tests { #[derive(Default)] struct RecordingResume { - calls: Mutex>, + calls: Mutex>, + delete_calls: Mutex>, results: Mutex>>, + restore_completed: Mutex, } impl RecordingResume { @@ -3249,9 +3565,17 @@ mod tests { .insert(sandbox_id.to_string(), result); } - async fn calls(&self) -> Vec<(String, String)> { + async fn calls(&self) -> Vec<(String, String, String, bool)> { self.calls.lock().await.clone() } + + async fn restore_completed(&self) -> bool { + *self.restore_completed.lock().await + } + + async fn delete_calls(&self) -> Vec<(String, String)> { + self.delete_calls.lock().await.clone() + } } #[tonic::async_trait] @@ -3260,11 +3584,15 @@ mod tests { &self, sandbox_id: &str, sandbox_name: &str, + expected_instance_id: &str, + should_start: bool, ) -> Result { - self.calls - .lock() - .await - .push((sandbox_id.to_string(), sandbox_name.to_string())); + self.calls.lock().await.push(( + sandbox_id.to_string(), + sandbox_name.to_string(), + expected_instance_id.to_string(), + should_start, + )); self.results .lock() .await @@ -3272,10 +3600,27 @@ mod tests { .cloned() .unwrap_or(Ok(true)) } + + async fn complete_restore(&self) -> Result<(), String> { + *self.restore_completed.lock().await = true; + Ok(()) + } + + async fn complete_delete( + &self, + sandbox_id: &str, + sandbox_name: &str, + ) -> Result { + self.delete_calls + .lock() + .await + .push((sandbox_id.to_string(), sandbox_name.to_string())); + Ok(true) + } } #[tokio::test] - async fn resume_persisted_sandboxes_resumes_running_phases() { + async fn resume_persisted_sandboxes_registers_owned_non_deleting_records() { let resume = Arc::new(RecordingResume::default()); let runtime = test_runtime_with_resume(Arc::new(TestDriver::default()), Some(resume.clone())).await; @@ -3288,32 +3633,72 @@ mod tests { ("sb-deleting", "deleting", SandboxPhase::Deleting), ("sb-error", "error", SandboxPhase::Error), ] { - let sandbox = sandbox_record(id, name, phase); + let mut sandbox = sandbox_record(id, name, phase); + sandbox.status = Some(SandboxStatus { + sandbox_name: name.to_string(), + agent_pod: format!("instance-{id}"), + ..Default::default() + }); + sandbox.set_phase(phase as i32); runtime.store.put_message(&sandbox).await.unwrap(); } runtime.resume_persisted_sandboxes().await.unwrap(); + assert!(resume.restore_completed().await); + assert_eq!( + resume.delete_calls().await, + vec![("sb-deleting".to_string(), "deleting".to_string())] + ); + assert!( + runtime + .store + .get_message::("sb-deleting") + .await + .unwrap() + .is_none() + ); - let mut called_ids = resume - .calls() - .await - .into_iter() - .map(|(id, _)| id) - .collect::>(); - called_ids.sort(); + let mut calls = resume.calls().await; + calls.sort_by(|left, right| left.0.cmp(&right.0)); assert_eq!( - called_ids, + calls, vec![ - "sb-prov".to_string(), - "sb-ready".to_string(), - "sb-unknown".to_string(), - "sb-unspecified".to_string(), + ( + "sb-error".to_string(), + "error".to_string(), + "instance-sb-error".to_string(), + false, + ), + ( + "sb-prov".to_string(), + "prov".to_string(), + "instance-sb-prov".to_string(), + true, + ), + ( + "sb-ready".to_string(), + "ready".to_string(), + "instance-sb-ready".to_string(), + true, + ), + ( + "sb-unknown".to_string(), + "unknown".to_string(), + "instance-sb-unknown".to_string(), + true, + ), + ( + "sb-unspecified".to_string(), + "unspecified".to_string(), + "instance-sb-unspecified".to_string(), + true, + ), ] ); } #[tokio::test] - async fn resume_persisted_sandboxes_marks_missing_backend_as_error() { + async fn resume_persisted_sandboxes_retains_unresolved_backend_state() { let resume = Arc::new(RecordingResume::default()); resume.set_result("sb-1", Ok(false)).await; let runtime = @@ -3332,18 +3717,13 @@ mod tests { .unwrap(); assert_eq!( SandboxPhase::try_from(stored.phase()).unwrap(), - SandboxPhase::Error + SandboxPhase::Ready ); - let ready = stored - .status - .as_ref() - .and_then(|s| s.conditions.iter().find(|c| c.r#type == "Ready")) - .expect("Ready condition present"); - assert_eq!(ready.reason, "BackendResourceMissing"); + assert!(resume.restore_completed().await); } #[tokio::test] - async fn resume_persisted_sandboxes_marks_failed_resume_as_error() { + async fn resume_persisted_sandboxes_fails_startup_without_completing_restore() { let resume = Arc::new(RecordingResume::default()); resume .set_result("sb-1", Err("docker daemon angry".to_string())) @@ -3354,7 +3734,9 @@ mod tests { let sandbox = sandbox_record("sb-1", "broken", SandboxPhase::Provisioning); runtime.store.put_message(&sandbox).await.unwrap(); - runtime.resume_persisted_sandboxes().await.unwrap(); + let err = runtime.resume_persisted_sandboxes().await.unwrap_err(); + assert!(err.contains("docker daemon angry")); + assert!(!resume.restore_completed().await); let stored = runtime .store @@ -3364,15 +3746,8 @@ mod tests { .unwrap(); assert_eq!( SandboxPhase::try_from(stored.phase()).unwrap(), - SandboxPhase::Error + SandboxPhase::Provisioning ); - let ready = stored - .status - .as_ref() - .and_then(|s| s.conditions.iter().find(|c| c.r#type == "Ready")) - .expect("Ready condition present"); - assert_eq!(ready.reason, "ResumeFailed"); - assert!(ready.message.contains("docker daemon angry")); } #[tokio::test] diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index 6462ccbbf..e9e98fdba 100644 --- a/crates/openshell-server/src/lib.rs +++ b/crates/openshell-server/src/lib.rs @@ -369,9 +369,15 @@ pub(crate) async fn run_server( // shutdown so the running compute state matches the persisted store. // Runs before watchers spawn so the watch loop sees the post-resume // snapshot on its first poll. - if let Err(err) = state.compute.resume_persisted_sandboxes().await { - warn!(error = %err, "Failed to resume persisted sandboxes during startup"); - } + state + .compute + .resume_persisted_sandboxes() + .await + .map_err(|err| { + Error::execution(format!( + "failed to restore persisted sandbox ownership during startup: {err}" + )) + })?; state.compute.spawn_watchers(shutdown_rx.clone()); ssh_sessions::spawn_session_reaper(store.clone(), Duration::from_secs(3600));