diff --git a/hyperactor_mesh/src/proc_mesh/mesh_agent.rs b/hyperactor_mesh/src/proc_mesh/mesh_agent.rs index ded9880b9..59785e16d 100644 --- a/hyperactor_mesh/src/proc_mesh/mesh_agent.rs +++ b/hyperactor_mesh/src/proc_mesh/mesh_agent.rs @@ -550,6 +550,9 @@ impl Handler> for ProcMeshAgent { #[async_trait] impl Handler for ProcMeshAgent { async fn handle(&mut self, cx: &Context, message: resource::Stop) -> anyhow::Result<()> { + + use crate::v1::StatusOverlay; + // We don't remove the actor from the state map, instead we just store // its state as Stopped. let actor = self.actor_states.get_mut(&message.name); @@ -599,7 +602,17 @@ impl Handler for ProcMeshAgent { } } }; - message.reply.send(cx, (rank, status).into())?; + + // Send a sparse overlay update. If rank is unknown, emit an + // empty overlay. + let overlay = if rank == usize::MAX { + StatusOverlay::new() + } else { + StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)]) + .expect("valid single-run overlay") + }; + message.reply.send(cx, overlay)?; + Ok(()) } } @@ -611,6 +624,9 @@ impl Handler for ProcMeshAgent { cx: &Context, get_rank_status: resource::GetRankStatus, ) -> anyhow::Result<()> { + use crate::resource::Status; + use crate::v1::StatusOverlay; + let (rank, status) = match self.actor_states.get(&get_rank_status.name) { Some(ActorInstanceState { spawn: Ok(actor_id), @@ -641,12 +657,21 @@ impl Handler for ProcMeshAgent { spawn: Err(e), create_rank, .. - }) => (*create_rank, resource::Status::Failed(e.to_string())), + }) => (*create_rank, Status::Failed(e.to_string())), // TODO: represent unknown rank - None => (usize::MAX, resource::Status::NotExist), + None => (usize::MAX, Status::NotExist), + }; + + // Send a sparse overlay update. If rank is unknown, emit an + // empty overlay. 
+ let overlay = if rank == usize::MAX { + StatusOverlay::new() + } else { + StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)]) + .expect("valid single-run overlay") }; + get_rank_status.reply.send(cx, overlay)?; - get_rank_status.reply.send(cx, (rank, status).into())?; Ok(()) } } diff --git a/hyperactor_mesh/src/resource.rs b/hyperactor_mesh/src/resource.rs index 0aa246b4f..c0b7133fc 100644 --- a/hyperactor_mesh/src/resource.rs +++ b/hyperactor_mesh/src/resource.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::fmt; use std::fmt::Debug; use std::hash::Hash; -use std::marker::PhantomData; use std::mem::replace; use std::mem::take; use std::ops::Deref; @@ -31,18 +30,17 @@ use hyperactor::PortRef; use hyperactor::RefClient; use hyperactor::RemoteMessage; use hyperactor::Unbind; -use hyperactor::accum::Accumulator; -use hyperactor::accum::CommReducer; -use hyperactor::accum::ReducerFactory; -use hyperactor::accum::ReducerSpec; use hyperactor::mailbox::PortReceiver; use hyperactor::message::Bind; use hyperactor::message::Bindings; use hyperactor::message::Unbind; +use ndslice::Region; +use ndslice::ViewExt; use serde::Deserialize; use serde::Serialize; use crate::v1::Name; +use crate::v1::StatusOverlay; /// The current lifecycle status of a resource. #[derive( @@ -120,8 +118,12 @@ impl Bind for Rank { } } -/// Get the status of a resource at a rank. This message is designed to be -/// cast and efficiently accumulated. +/// Get the status of a resource across the mesh. +/// +/// This message is cast to all ranks; each rank replies with a sparse +/// status **overlay**. The comm reducer merges overlays (right-wins) +/// and the accumulator applies them to produce **full StatusMesh +/// snapshots** on the receiver side. #[derive( Clone, Debug, @@ -137,34 +139,50 @@ impl Bind for Rank { pub struct GetRankStatus { /// The name of the resource. pub name: Name, - /// The status of the rank. + /// Sparse status updates (overlays) from a rank. 
#[binding(include)] - pub reply: PortRef>, + pub reply: PortRef, } impl GetRankStatus { pub async fn wait( - mut rx: PortReceiver>, + mut rx: PortReceiver, num_ranks: usize, max_idle_time: Duration, - ) -> Result, RankedValues> { + region: Region, // used only for fallback + ) -> Result { + debug_assert_eq!(region.num_ranks(), num_ranks, "region/num_ranks mismatch"); + let mut alarm = hyperactor::time::Alarm::new(); alarm.arm(max_idle_time); - let mut statuses = RankedValues::default(); + + // Fallback snapshot if we time out before receiving anything. + let mut snapshot = + crate::v1::StatusMesh::from_single(region, crate::resource::Status::NotExist); + loop { let mut sleeper = alarm.sleeper(); tokio::select! { - _ = sleeper.sleep() => return Err(statuses), - new_statuses = rx.recv() => { - match new_statuses { - Ok(new_statuses) => statuses = new_statuses, - Err(_) => return Err(statuses), + _ = sleeper.sleep() => return Err(snapshot), + next = rx.recv() => { + match next { + Ok(mesh) => { snapshot = mesh; } // latest-wins snapshot + Err(_) => return Err(snapshot), } } } + alarm.arm(max_idle_time); - if statuses.rank(num_ranks) == num_ranks { - break Ok(statuses); + + // Completion: once every rank (among the first + // `num_ranks`) has reported at least something (i.e. + // moved off NotExist). + if snapshot + .values() + .take(num_ranks) + .all(|s| !matches!(s, crate::resource::Status::NotExist)) + { + break Ok(snapshot); } } } @@ -232,7 +250,7 @@ pub struct Stop { pub name: Name, /// The status of the rank. #[binding(include)] - pub reply: PortRef>, + pub reply: PortRef, } /// Retrieve the current state of the resource. 
@@ -323,7 +341,7 @@ impl RankedValues { pub fn materialized_iter(&self, until: usize) -> impl Iterator + '_ { assert_eq!(self.rank(until), until, "insufficient rank"); self.iter() - .flat_map(|(range, value)| std::iter::repeat(value).take(range.end - range.start)) + .flat_map(|(range, value)| std::iter::repeat_n(value, range.end - range.start)) } } @@ -514,44 +532,6 @@ impl FromIterator<(Range, T)> for RankedValues { } } -impl Accumulator for RankedValues { - type State = Self; - type Update = Self; - - fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> { - state.merge_from(update); - Ok(()) - } - - fn reducer_spec(&self) -> Option { - Some(ReducerSpec { - typehash: as Named>::typehash(), - builder_params: None, - }) - } -} - -#[derive(Named)] -struct RankedValuesReducer(std::marker::PhantomData); - -impl CommReducer for RankedValuesReducer { - type Update = RankedValues; - - fn reduce(&self, mut left: Self::Update, right: Self::Update) -> anyhow::Result { - left.merge_from(right); - Ok(left) - } -} - -// register for concrete types: - -hyperactor::submit! { - ReducerFactory { - typehash_f: as Named>::typehash, - builder_f: |_| Ok(Box::new(RankedValuesReducer::(PhantomData))), - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/hyperactor_mesh/src/v1.rs b/hyperactor_mesh/src/v1.rs index 7a738304c..3d7b0e2b5 100644 --- a/hyperactor_mesh/src/v1.rs +++ b/hyperactor_mesh/src/v1.rs @@ -33,6 +33,25 @@ use serde::Deserialize; use serde::Serialize; pub use value_mesh::ValueMesh; +/// A mesh of per-rank lifecycle statuses. +/// +/// `StatusMesh` is `ValueMesh` and supports dense or +/// compressed encodings. Updates are applied via sparse overlays with +/// **last-writer-wins** semantics (see +/// [`ValueMesh::merge_from_overlay`]). The mesh's `Region` defines +/// the rank space; all updates must match that region. +pub type StatusMesh = ValueMesh; + +/// A sparse set of `(Range, Status)` updates for a +/// [`StatusMesh`]. 
+/// +/// `StatusOverlay` carries **normalized** runs (sorted, +/// non-overlapping, and coalesced). Applying an overlay to a +/// `StatusMesh` uses **right-wins** semantics on overlap and +/// preserves first-appearance order in the compressed table. +/// Construct via `ValueOverlay::try_from_runs` after normalizing. +pub type StatusOverlay = value_mesh::ValueOverlay; + use crate::resource; use crate::resource::RankedValues; use crate::resource::Status; diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index 187d800f2..493deb833 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -15,6 +15,7 @@ use hyperactor::config::CONFIG; use hyperactor::config::ConfigAttr; use hyperactor::declare_attrs; use tokio::time::timeout; + pub mod mesh_agent; use std::collections::HashSet; @@ -539,21 +540,45 @@ impl HostMeshRef { .concat(&per_host) .map_err(|err| v1::Error::ConfigurationError(err.into()))?; + let region: Region = extent.clone().into(); let mesh_name = Name::new(name); + + // Helper: legacy shim for error types that still require + // RankedValues. TODO(shayne-fletcher): Delete this + // shim once Error::ActorSpawnError carries a StatusMesh + // (ValueMesh) directly. At that point, use the mesh + // as-is and remove `mesh_to_rankedvalues_*` calls below. + fn mesh_to_rankedvalues_with_default( + mesh: &crate::v1::StatusMesh, + default_fill: Status, + len: usize, + ) -> RankedValues { + let mut out = RankedValues::from((0..len, default_fill)); + for (i, s) in mesh.values().enumerate() { + if !matches!(s, Status::NotExist) { + out.merge_from(RankedValues::from((i..i + 1, s.clone()))); + } + } + out + } + let mut procs = Vec::new(); - let num_ranks = self.region().num_ranks() * per_host.num_ranks(); + let num_ranks = region.num_ranks(); + // Accumulator outputs full StatusMesh snapshots; seed with + // NotExist. 
let (port, rx) = cx.mailbox().open_accum_port_opts( - RankedValues::default(), + crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist), Some(ReducerOpts { max_update_interval: Some(Duration::from_millis(50)), }), ); - // We CreateOrUpdate each proc, and then fence on getting statuses back. - // This is currently necessary because otherwise there is a race between - // the procs being created, and subsequent messages becoming unroutable - // (the agent actor manages the local muxer). We can solve this by allowing - // buffering in the host-level muxer. + // Create or update each proc, then fence on receiving status + // overlays. This prevents a race where procs become + // addressable before their local muxers are ready, which + // could make early messages unroutable. A future improvement + // would allow buffering in the host-level muxer to eliminate + // the need for this synchronization step. let mut proc_names = Vec::new(); for (host_rank, host) in self.ranks.iter().enumerate() { for per_host_rank in 0..per_host.num_ranks() { @@ -589,9 +614,24 @@ impl HostMeshRef { let start_time = RealClock.now(); - match GetRankStatus::wait(rx, num_ranks, config::global::get(PROC_SPAWN_MAX_IDLE)).await { + // Wait on accumulated StatusMesh snapshots until complete or + // timeout. + match GetRankStatus::wait( + rx, + num_ranks, + config::global::get(PROC_SPAWN_MAX_IDLE), + region.clone(), // fallback mesh if nothing arrives + ) + .await + { Ok(statuses) => { - if let Some((rank, status)) = statuses.first_terminating() { + // If any rank is terminating, surface a + // ProcCreationError pointing at that rank. 
+ if let Some((rank, status)) = statuses + .values() + .enumerate() + .find(|(_, s)| s.is_terminating()) + { let proc_name = &proc_names[rank]; let host_rank = rank / per_host.num_ranks(); let mesh_agent = self.ranks[host_rank].mesh_agent(); @@ -612,18 +652,20 @@ impl HostMeshRef { let proc_name = Name::new(format!("{}-{}", name, rank % per_host.num_ranks())); return Err(v1::Error::ProcCreationError { state, - mesh_agent, host_rank, + mesh_agent, }); } } Err(complete) => { - // Fill the remaining statuses with a timeout error. - let mut statuses = - RankedValues::from((0..num_ranks, Status::Timeout(start_time.elapsed()))); - statuses.merge_from(complete); - - return Err(v1::Error::ProcSpawnError { statuses }); + // Fill remaining ranks with a timeout status via the + // legacy shim. + let legacy = mesh_to_rankedvalues_with_default( + &complete, + Status::Timeout(start_time.elapsed()), + num_ranks, + ); + return Err(v1::Error::ProcSpawnError { statuses: legacy }); } } diff --git a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs index f626526ba..b213ea8e2 100644 --- a/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs +++ b/hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs @@ -155,20 +155,22 @@ impl Handler for HostMeshAgent { cx: &Context, get_rank_status: resource::GetRankStatus, ) -> anyhow::Result<()> { - let Some(created) = self.created.get(&get_rank_status.name) else { - // TODO: how can we get the host's rank here? we should model its absence explicitly. 
- get_rank_status - .reply - .send(cx, (usize::MAX, resource::Status::NotExist).into())?; - return Ok(()); - }; + use crate::resource::Status; + use crate::v1::StatusOverlay; - let rank_status = match created { - (rank, Ok(_)) => (*rank, resource::Status::Running), - (rank, Err(e)) => (*rank, resource::Status::Failed(e.to_string())), + let (rank, status) = match self.created.get(&get_rank_status.name) { + Some((rank, Ok(_))) => (*rank, Status::Running), + Some((rank, Err(e))) => (*rank, Status::Failed(e.to_string())), + None => (usize::MAX, Status::NotExist), }; - get_rank_status.reply.send(cx, rank_status.into())?; + let overlay = if rank == usize::MAX { + StatusOverlay::new() + } else { + StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)]) + .expect("valid single-run overlay") + }; + get_rank_status.reply.send(cx, overlay)?; Ok(()) } } diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index e3353d65d..e6ce061dd 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -674,7 +674,7 @@ impl ProcMeshRef { /// - `A: Actor` - the actor actually runs inside each proc. /// - `A: Referable` - so we can return typed `ActorRef`s /// inside the `ActorMesh`. - /// - `A::Params: RemoteMessage` — spawn parameters must be + /// - `A::Params: RemoteMessage` - spawn parameters must be /// serializable and routable. pub(crate) async fn spawn_with_name( &self, @@ -707,13 +707,27 @@ impl ProcMeshRef { }, )?; + let region = self.region().clone(); + // Open an accum port that *receives overlays* and *emits full + // meshes*. + // + // NOTE: Mailbox initializes the accumulator state via + // `Default`, which is an *empty* ValueMesh (0 ranks). Our + // Accumulator> implementation detects this on + // the first update and replaces it with the caller-supplied + // template (the `self` passed into open_accum_port), which we + // seed here as "full NotExist over the target region". 
let (port, rx) = cx.mailbox().open_accum_port_opts( - RankedValues::default(), + // Initial state for the accumulator: full mesh seeded to + // NotExist. + crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist), Some(ReducerOpts { max_update_interval: Some(Duration::from_millis(50)), }), ); + // Send a message to all ranks. They reply with overlays to + // `port`. self.agent_mesh().cast( cx, resource::GetRankStatus { @@ -722,33 +736,69 @@ impl ProcMeshRef { }, )?; + // Helper: legacy shim for error types that still require + // RankedValues. TODO(shayne-fletcher): Delete this + // shim once Error::ActorSpawnError carries a StatusMesh + // (ValueMesh) directly. At that point, use the mesh + // as-is and remove `mesh_to_rankedvalues_*` calls below. + fn mesh_to_rankedvalues_with_default( + mesh: &crate::v1::StatusMesh, + default: Status, + len: usize, + ) -> RankedValues { + let mut rv = RankedValues::from((0..len, default)); + for (i, s) in mesh.values().enumerate() { + if !matches!(s, Status::NotExist) { + rv.merge_from(RankedValues::from((i..i + 1, s.clone()))); + } + } + rv + } + let start_time = RealClock.now(); - // These will fail if there are any supervision events that occurred - // on the proc. + // Wait for all ranks to report a terminal or running status. + // If any proc reports a failure (via supervision) or the mesh + // times out, `wait()` returns Err with the final snapshot. + // + // `rx` is the accumulator output stream: each time reduced + // overlays are applied, it emits a new StatusMesh snapshot. + // `wait()` loops on it, deciding when the stream is + // "complete" (no more NotExist) or times out. match GetRankStatus::wait( rx, self.ranks.len(), config::global::get(ACTOR_SPAWN_MAX_IDLE), + region.clone(), // fallback ) .await { Ok(statuses) => { - if statuses.first_terminating().is_none() { + // Spawn succeeds only if no rank has reported a + // supervision/terminal state. 
This preserves the old + // `first_terminating().is_none()` semantics. + let has_terminating = statuses.values().any(|s| s.is_terminating()); + if !has_terminating { Ok(ActorMesh::new(self.clone(), name)) } else { - Err(Error::ActorSpawnError { statuses }) + let legacy = mesh_to_rankedvalues_with_default( + &statuses, + Status::NotExist, + self.ranks.len(), + ); + Err(Error::ActorSpawnError { statuses: legacy }) } } Err(complete) => { - // Fill the remaining statuses with a timeout error. - let mut statuses = RankedValues::from(( - 0..self.ranks.len(), - Status::Timeout(start_time.elapsed()), - )); - statuses.merge_from(complete); - - Err(Error::ActorSpawnError { statuses }) + // Fill remaining ranks with a timeout status, now + // handled via the legacy shim. + let elapsed = start_time.elapsed(); + let legacy = mesh_to_rankedvalues_with_default( + &complete, + Status::Timeout(elapsed), + self.ranks.len(), + ); + Err(Error::ActorSpawnError { statuses: legacy }) } } } @@ -759,8 +809,20 @@ impl ProcMeshRef { cx: &impl context::Actor, mesh_name: Name, ) -> v1::Result<()> { + let region = self.region().clone(); + // Open an accum port that *receives overlays* and *emits full + // meshes*. + // + // NOTE: Mailbox initializes the accumulator state via + // `Default`, which is an *empty* ValueMesh (0 ranks). Our + // Accumulator> implementation detects this on + // the first update and replaces it with the caller-supplied + // template (the `self` passed into open_accum_port), which we + // seed here as "full NotExist over the target region". let (port, rx) = cx.mailbox().open_accum_port_opts( - RankedValues::default(), + // Initial state for the accumulator: full mesh seeded to + // NotExist. 
+ crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist), Some(ReducerOpts { max_update_interval: Some(Duration::from_millis(50)), }), @@ -774,24 +836,59 @@ impl ProcMeshRef { )?; let start_time = RealClock.now(); + // Helper: legacy shim for error types that still require + // RankedValues. TODO(shayne-fletcher): Delete this + // shim once Error::ActorSpawnError carries a StatusMesh + // (ValueMesh) directly. At that point, use the mesh + // as-is and remove `mesh_to_rankedvalues_*` calls below. + fn mesh_to_rankedvalues_with_default( + mesh: &crate::v1::StatusMesh, + default_fill: Status, + len: usize, + ) -> RankedValues { + let mut out = RankedValues::from((0..len, default_fill)); + for (i, s) in mesh.values().enumerate() { + if !matches!(s, Status::NotExist) { + out.merge_from(RankedValues::from((i..i + 1, s.clone()))); + } + } + out + } + // Reuse actor spawn idle time. let max_idle_time = config::global::get(ACTOR_SPAWN_MAX_IDLE); - match GetRankStatus::wait(rx, self.ranks.len(), max_idle_time).await { + match GetRankStatus::wait( + rx, + self.ranks.len(), + max_idle_time, + region.clone(), // fallback mesh if nothing arrives + ) + .await + { Ok(statuses) => { - if statuses.first_failed().is_none() { + let has_failed = statuses + .values() + .any(|s| matches!(s, Status::Failed(_) | Status::Timeout(_))); + if !has_failed { Ok(()) } else { - Err(Error::ActorStopError { statuses }) + let legacy = mesh_to_rankedvalues_with_default( + &statuses, + Status::NotExist, + self.ranks.len(), + ); + Err(Error::ActorStopError { statuses: legacy }) } } Err(complete) => { - // Fill the remaining statuses with a timeout error. - let mut statuses = RankedValues::from(( - 0..self.ranks.len(), + // Fill remaining ranks with a timeout status via the + // legacy shim. 
+ let legacy = mesh_to_rankedvalues_with_default( + &complete, Status::Timeout(start_time.elapsed()), - )); - statuses.merge_from(complete); - Err(Error::ActorStopError { statuses }) + self.ranks.len(), + ); + Err(Error::ActorStopError { statuses: legacy }) } } } diff --git a/hyperactor_mesh/src/v1/value_mesh.rs b/hyperactor_mesh/src/v1/value_mesh.rs index 4c7b49a9c..fd20c3e11 100644 --- a/hyperactor_mesh/src/v1/value_mesh.rs +++ b/hyperactor_mesh/src/v1/value_mesh.rs @@ -6,35 +6,196 @@ * LICENSE file in the root directory of this source tree. */ +use std::cmp::Ordering; +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::hash::Hash; +use std::hash::Hasher; +use std::marker::PhantomData; use std::mem; use std::mem::MaybeUninit; +use std::ops::Range; use std::ptr; use std::ptr::NonNull; use futures::Future; +use hyperactor::Named; +use hyperactor::accum::Accumulator; +use hyperactor::accum::CommReducer; +use hyperactor::accum::ReducerFactory; +use hyperactor::accum::ReducerSpec; +use ndslice::extent; use ndslice::view; use ndslice::view::Ranked; use ndslice::view::Region; +use serde::Deserialize; +use serde::Serialize; -/// A mesh of values, where each value is associated with a rank. +mod rle; +mod value_overlay; +pub use value_overlay::BuildError; +pub use value_overlay::ValueOverlay; + +/// A mesh of values, one per rank in `region`. +/// +/// The internal representation (`rep`) may be dense or compressed, +/// but externally the mesh always behaves as a complete mapping from +/// rank index → value. /// -/// # Invariant -/// The mesh is *complete*: `ranks.len()` always equals -/// `region.num_ranks()`. Every rank in the region has exactly one -/// associated value. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] // only if T implements +/// # Invariants +/// - Complete: every rank in `region` has exactly one value. +/// - Order: iteration and indexing follow the region's linearization. 
+#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // only if T implements pub struct ValueMesh { + /// The logical multidimensional domain of the mesh. + /// + /// Determines the number of ranks (`region.num_ranks()`) and the + /// order in which they are traversed. region: Region, - ranks: Vec, + + /// The underlying storage representation. + /// + /// - `Rep::Dense` stores a `Vec` with one value per rank. + /// - `Rep::Compressed` stores a run-length encoded table of + /// unique values plus `(Range, u32)` pairs describing + /// contiguous runs of identical values. + /// + /// The representation is an internal optimization detail; all + /// public APIs (e.g. `values()`, `get()`, slicing) behave as if + /// the mesh were dense. + rep: Rep, +} + +/// A single run-length–encoded (RLE) segment within a [`ValueMesh`]. +/// +/// Each `Run` represents a contiguous range of ranks `[start, end)` +/// that all share the same value, referenced indirectly via a table +/// index `id`. This allows compact storage of large regions with +/// repeated values. +/// +/// Runs are serialized in a stable, portable format using `u64` for +/// range bounds (`start`, `end`) to avoid platform‐dependent `usize` +/// encoding differences. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +struct Run { + /// Inclusive start of the contiguous range of ranks (0-based). + start: u64, + /// Exclusive end of the contiguous range of ranks (0-based). + end: u64, + /// Index into the value table for this run's shared value. + id: u32, +} + +impl Run { + /// Creates a new `Run` covering ranks `[start, end)` that all + /// share the same table entry `id`. + /// + /// Converts `usize` bounds to `u64` for stable serialization. 
+ fn new(start: usize, end: usize, id: u32) -> Self { + Self { + start: start as u64, + end: end as u64, + id, + } + } +} + +impl TryFrom for (Range, u32) { + type Error = &'static str; + + /// Converts a serialized [`Run`] back into its in-memory form + /// `(Range, u32)`. + /// + /// Performs checked conversion of the 64-bit wire fields back + /// into `usize` indices, returning an error if either bound + /// exceeds the platform's addressable range. This ensures safe + /// round-tripping between the serialized wire format and native + /// representation. + fn try_from(r: Run) -> Result { + let start = usize::try_from(r.start).map_err(|_| "run.start too large")?; + let end = usize::try_from(r.end).map_err(|_| "run.end too large")?; + Ok((start..end, r.id)) + } +} + +/// Internal storage representation for a [`ValueMesh`]. +/// +/// This enum abstracts how the per-rank values are stored. +/// Externally, both variants behave identically — the difference is +/// purely in memory layout and access strategy. +/// +/// - [`Rep::Dense`] stores one value per rank, directly. +/// - [`Rep::Compressed`] stores a compact run-length-encoded form, +/// reusing identical values across contiguous ranks. +/// +/// Users of [`ValueMesh`] normally never interact with `Rep` +/// directly; all iteration and slicing APIs present a dense logical +/// view. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // only if T implements +#[serde(tag = "rep", rename_all = "snake_case")] +enum Rep { + /// Fully expanded representation: one element per rank. + /// + /// The length of `values` is always equal to + /// `region.num_ranks()`. This form is simple and fast for + /// iteration and mutation but uses more memory when large runs of + /// repeated values are present. + Dense { + /// Flat list of values, one per rank in the region's + /// linearization order. + values: Vec, + }, + + /// Run-length-encoded representation. 
+ /// + /// Each run `(Range, id)` indicates that the ranks within + /// `Range` (half-open `[start, end)`) share the same value at + /// `table[id]`. The `table` stores each distinct value once. + /// + /// # Invariants + /// - Runs are non-empty and contiguous (`r.start < r.end`). + /// - Runs collectively cover `0..region.num_ranks()` with no gaps + /// or overlaps. + /// - `id` indexes into `table` (`id < table.len()`). + Compressed { + /// The deduplicated set of unique values referenced by runs. + table: Vec, + + /// List of `(range, table_id)` pairs describing contiguous + /// runs of identical values in region order. + runs: Vec, + }, +} + +// At this time, Default is used primarily to satisfy the mailbox +// Accumulator bound. It constructs an empty (zero-rank) mesh. Other +// contexts may also use this as a generic "empty mesh" initializer +// when a concrete region is not yet known. +impl Default for ValueMesh { + fn default() -> Self { + Self::empty() + } } impl ValueMesh { + /// Returns an *empty* mesh: a 1-dimensional region of length 0. + /// + /// This differs from a *dimensionless* (0-D) region, which + /// represents a single scalar element. A zero-length 1-D region + /// has **no elements at all**, making it the natural `Default` + /// placeholder for accumulator state initialization. + pub fn empty() -> Self { + // zero-rank region; no constraints on T + let region = extent!(r = 0).into(); + Self::new_unchecked(region, Vec::::new()) + } + /// Creates a new `ValueMesh` for `region` with exactly one value /// per rank. /// /// # Invariants /// This constructor validates that the number of provided values - /// (`ranks.len()`) matches the region’s cardinality + /// (`ranks.len()`) matches the region's cardinality /// (`region.num_ranks()`). A value mesh must be complete: every /// rank in `region` has a corresponding `T`. 
/// @@ -47,7 +208,10 @@ impl ValueMesh { if actual != expected { return Err(crate::v1::Error::InvalidRankCardinality { expected, actual }); } - Ok(Self { region, ranks }) + Ok(Self { + region, + rep: Rep::Dense { values: ranks }, + }) } /// Constructs a `ValueMesh` without checking cardinality. Caller @@ -55,7 +219,201 @@ impl ValueMesh { #[inline] pub(crate) fn new_unchecked(region: Region, ranks: Vec) -> Self { debug_assert_eq!(region.num_ranks(), ranks.len()); - Self { region, ranks } + Self { + region, + rep: Rep::Dense { values: ranks }, + } + } +} + +impl ValueMesh { + /// Builds a `ValueMesh` that assigns the single value `s` to + /// every rank in `region`, without materializing a dense + /// `Vec`. The result is stored in compressed (RLE) form as a + /// single run `[0..N)`. + /// + /// If `region.num_ranks() == 0`, the mesh contains no runs (and + /// an empty table), regardless of `s`. + pub fn from_single(region: Region, s: T) -> Self { + let n = region.num_ranks(); + if n == 0 { + return Self { + region, + rep: Rep::Compressed { + table: Vec::new(), + runs: Vec::new(), + }, + }; + } + + let table = vec![s]; + let runs = vec![Run::new(0, n, 0)]; + Self { + region, + rep: Rep::Compressed { table, runs }, + } + } +} + +impl ValueMesh { + /// Builds a [`ValueMesh`] covering the region, filled with + /// `T::default()`. + /// + /// Equivalent to [`ValueMesh::from_single(region, + /// T::default())`]. + pub fn from_default(region: Region) -> Self { + ValueMesh::::from_single(region, T::default()) + } +} + +impl ValueMesh { + /// Builds a compressed mesh from a default value and a set of + /// disjoint ranges that override the default. + /// + /// - `ranges` may be in any order; they must be non-empty, + /// in-bounds, and non-overlapping. + /// - Unspecified ranks are filled with `default`. + /// - Result is stored in RLE form; no dense `Vec` is + /// materialized. 
+ pub fn from_ranges_with_default( + region: Region, + default: T, + mut ranges: Vec<(Range, T)>, + ) -> crate::v1::Result { + let n = region.num_ranks(); + + if n == 0 { + return Ok(Self { + region, + rep: Rep::Compressed { + table: Vec::new(), + runs: Vec::new(), + }, + }); + } + + // Validate: non-empty, in-bounds; then sort. + for (r, _) in &ranges { + if r.is_empty() { + return Err(crate::v1::Error::InvalidRankCardinality { + expected: n, + actual: 0, + }); // TODO: this surfaces the error but its not a great fit + } + if r.end > n { + return Err(crate::v1::Error::InvalidRankCardinality { + expected: n, + actual: r.end, + }); + } + } + ranges.sort_by_key(|(r, _)| (r.start, r.end)); + + // Validate: non-overlapping. + for w in ranges.windows(2) { + let (a, _) = &w[0]; + let (b, _) = &w[1]; + if a.end > b.start { + // Overlap + return Err(crate::v1::Error::InvalidRankCardinality { + expected: n, + actual: b.start, // TODO: this surfaces the error but is a bad fit + }); + } + } + + // Internal index: value -> table id (assigned once). + let mut index: HashMap = HashMap::with_capacity(1 + ranges.len()); + let mut next_id: u32 = 0; + + // Helper: assign or reuse an id by value, taking ownership of + // v. + let id_of = |v: T, index: &mut HashMap, next_id: &mut u32| -> u32 { + match index.entry(v) { + Entry::Occupied(o) => *o.get(), + Entry::Vacant(vac) => { + let id = *next_id; + *next_id += 1; + vac.insert(id); + id + } + } + }; + + let default_id = id_of(default, &mut index, &mut next_id); + + let mut runs: Vec = Vec::with_capacity(1 + 2 * ranges.len()); + let mut cursor = 0usize; + + for (r, v) in ranges.into_iter() { + // Fill default gap if any. + if cursor < r.start { + runs.push(Run::new(cursor, r.start, default_id)); + } + // Override block. + let id = id_of(v, &mut index, &mut next_id); + runs.push(Run::new(r.start, r.end, id)); + cursor = r.end; + } + + // Trailing default tail. 
+ if cursor < n { + runs.push(Run::new(cursor, n, default_id)); + } + + // Materialize table in id order without cloning: move keys + // out of the map. + let mut table_slots: Vec> = Vec::new(); + table_slots.resize_with(next_id as usize, || None); + + for (t, id) in index.into_iter() { + table_slots[id as usize] = Some(t); + } + + let table: Vec = table_slots + .into_iter() + .map(|o| o.expect("every id must be assigned")) + .collect(); + + Ok(Self { + region, + rep: Rep::Compressed { table, runs }, + }) + } + + /// Builds a [`ValueMesh`] from a fully materialized dense vector + /// of per-rank values, then compresses it into run-length–encoded + /// form if possible. + /// + /// This constructor is intended for callers that already have one + /// value per rank (e.g. computed or received data) but wish to + /// store it efficiently. + /// + /// # Parameters + /// - `region`: The logical region describing the mesh's shape and + /// rank order. + /// - `values`: A dense vector of values, one per rank in + /// `region`. + /// + /// # Returns + /// A [`ValueMesh`] whose internal representation is `Compressed` + /// if any adjacent elements are equal, or `Dense` if no + /// compression was possible. + /// + /// # Errors + /// Returns an error if the number of provided `values` does not + /// match the number of ranks in `region`. + /// + /// # Examples + /// ```ignore + /// let region: Region = extent!(n = 5).into(); + /// let mesh = ValueMesh::from_dense(region, vec![1, 1, 2, 2, 3]).unwrap(); + /// // Internally compressed to three runs: [1, 1], [2, 2], [3] + /// ``` + pub fn from_dense(region: Region, values: Vec) -> crate::v1::Result { + let mut vm = Self::new(region, values)?; + vm.compress_adjacent_in_place(); + Ok(vm) } } @@ -63,8 +421,17 @@ impl ValueMesh { /// Await all futures in the mesh, yielding a `ValueMesh` of their /// outputs. 
pub async fn join(self) -> ValueMesh { - let ValueMesh { region, ranks } = self; - ValueMesh::new_unchecked(region, futures::future::join_all(ranks).await) + let ValueMesh { region, rep } = self; + + match rep { + Rep::Dense { values } => { + let results = futures::future::join_all(values).await; + ValueMesh::new_unchecked(region, results) + } + Rep::Compressed { .. } => { + unreachable!("join() not implemented for compressed meshes") + } + } } } @@ -72,21 +439,74 @@ impl ValueMesh> { /// Transposes a `ValueMesh>` into a /// `Result, E>`. pub fn transpose(self) -> Result, E> { - let ValueMesh { region, ranks } = self; - let ranks = ranks.into_iter().collect::, E>>()?; - Ok(ValueMesh::new_unchecked(region, ranks)) + let ValueMesh { region, rep } = self; + + match rep { + Rep::Dense { values } => { + let values = values.into_iter().collect::, E>>()?; + Ok(ValueMesh::new_unchecked(region, values)) + } + Rep::Compressed { table, runs } => { + let table: Vec = table.into_iter().collect::, E>>()?; + Ok(ValueMesh { + region, + rep: Rep::Compressed { table, runs }, + }) + } + } } } impl view::Ranked for ValueMesh { type Item = T; + /// Returns the region that defines this mesh's shape and rank + /// order. fn region(&self) -> &Region { &self.region } + /// Looks up the value at the given linearized rank. + /// + /// Works transparently for both dense and compressed + /// representations: + /// - In the dense case, it simply indexes into the `values` + /// vector. + /// - In the compressed case, it performs a binary search over run + /// boundaries to find which run contains the given rank, then + /// returns the corresponding entry from `table`. + /// + /// Returns `None` if the rank is out of bounds. 
fn get(&self, rank: usize) -> Option<&Self::Item> { - self.ranks.get(rank) + if rank >= self.region.num_ranks() { + return None; + } + + match &self.rep { + Rep::Dense { values } => values.get(rank), + + Rep::Compressed { table, runs } => { + let rank = rank as u64; + + // Binary search over runs: find the one whose range + // contains `rank`. + let idx = runs + .binary_search_by(|run| { + if run.end <= rank { + Ordering::Less + } else if run.start > rank { + Ordering::Greater + } else { + Ordering::Equal + } + }) + .ok()?; + + // Map the run's table ID to its actual value. + let id = runs[idx].id as usize; + table.get(id) + } + } } } @@ -150,7 +570,7 @@ impl view::BuildFromRegionIndexed for ValueMesh { let mut filled = 0usize; // Drop guard: cleans up initialized elements on early exit. - // Stores raw, non-borrowed pointers (`NonNull`), so we don’t + // Stores raw, non-borrowed pointers (`NonNull`), so we don't // hold Rust references for the whole scope. This allows // mutating `buf`/`bits` inside the loop while still letting // the guard access them if dropped early. @@ -358,6 +778,250 @@ impl view::BuildFromRegionIndexed for ValueMesh { } } +impl ValueMesh { + /// Compresses the mesh in place using run-length encoding (RLE). + /// + /// This method scans the mesh's dense values, coalescing adjacent + /// runs of identical elements into a compact [`Rep::Compressed`] + /// representation. It replaces the internal storage (`rep`) with + /// the compressed form. + /// + /// # Behavior + /// - If the mesh is already compressed, this is a **no-op**. + /// - If the mesh is dense, it consumes the current `Vec` and + /// rebuilds the representation as a run table plus value table. + /// - Only *adjacent* equal values are merged; non-contiguous + /// duplicates remain distinct. + /// + /// # Requirements + /// - `T` must implement [`PartialEq`] (to detect equal values). 
+ /// + /// This operation is lossless: expanding the compressed mesh back + /// into a dense vector yields the same sequence of values. + pub fn compress_adjacent_in_place(&mut self) { + self.compress_adjacent_in_place_by(|a, b| a == b) + } +} + +impl ValueMesh { + /// Materializes this mesh into a vector of `(Range, T)` + /// runs. + /// + /// For a dense representation, this walks the value vector and + /// groups adjacent equal values into contiguous runs. The result + /// is equivalent to what would be stored in a compressed + /// representation, but the mesh itself is not mutated or + /// re-encoded. This is purely a read-only view. + /// + /// For a compressed representation, the stored runs are simply + /// cloned. + /// + /// This method is intended for inspection, testing, and + /// diff/merge operations that need a uniform view of value runs + /// without changing the underlying representation. + fn materialized_runs(&self) -> Vec<(Range, T)> { + match &self.rep { + Rep::Dense { values } => { + // Coalesce adjacent equals into runs. + let mut out = Vec::new(); + if values.is_empty() { + return out; + } + let mut start = 0usize; + for i in 1..values.len() { + if values[i] != values[i - 1] { + out.push((start..i, values[i - 1].clone())); + start = i; + } + } + out.push((start..values.len(), values.last().unwrap().clone())); + out + } + Rep::Compressed { table, runs } => runs + .iter() + .map(|r| { + let id = r.id as usize; + ((r.start as usize..r.end as usize), table[id].clone()) + }) + .collect(), + } + } +} + +impl ValueMesh { + /// Merge a sparse overlay into this mesh. + /// + /// Overlay segments are applied with **last-writer-wins** + /// precedence on overlap (identical to `RankedValues::merge_from` + /// behavior). The result is stored compressed. + pub fn merge_from_overlay(&mut self, overlay: ValueOverlay) -> Result<(), BuildError> { + let n = self.region.num_ranks(); + + // Bounds validation (structure already validated by + // ValueOverlay). 
+ for (r, _) in overlay.runs() { + if r.end > n { + return Err(BuildError::OutOfBounds { + range: r.clone(), + region_len: n, + }); + } + } + + // Left: current mesh as normalized value-bearing runs. + let left = self.materialized_runs(); + // Right: overlay runs (already sorted, non-overlapping, + // coalesced). + let right: Vec<(std::ops::Range, T)> = overlay.runs().cloned().collect(); + + // Merge with overlay precedence, reusing the same splitting + // strategy as RankedValues::merge_from. + let merged = rle::merge_value_runs(left, right); + + // Re-encode to compressed representation: + let (table, raw_runs) = rle::rle_from_value_runs(merged); + let runs = raw_runs + .into_iter() + .map(|(r, id)| Run::new(r.start, r.end, id)) + .collect(); + self.rep = Rep::Compressed { table, runs }; + + Ok(()) + } +} + +impl ValueMesh { + /// Compresses the mesh in place using a custom equivalence + /// predicate. + /// + /// This is a generalized form of [`compress_adjacent_in_place`] + /// that merges adjacent values according to an arbitrary + /// predicate `same(a, b)`, rather than relying on `PartialEq`. + /// + /// # Behavior + /// - If the mesh is already compressed, this is a **no-op**. + /// - Otherwise, consumes the dense `Vec` and replaces it with + /// a run-length encoded (`Rep::Compressed`) representation, + /// where consecutive elements satisfying `same(a, b)` are + /// coalesced into a single run. + /// + /// # Requirements + /// - The predicate must be reflexive and symmetric for + /// correctness. + /// + /// This operation is lossless: expanding the compressed mesh + /// reproduces the original sequence exactly under the same + /// equivalence. + pub fn compress_adjacent_in_place_by(&mut self, same: F) + where + F: FnMut(&T, &T) -> bool, + { + let values = match &mut self.rep { + Rep::Dense { values } => std::mem::take(values), + Rep::Compressed { .. 
} => return, + }; + let (table, raw_runs) = rle::rle_from_dense(values, same); + let runs = raw_runs + .into_iter() + .map(|(r, id)| Run::new(r.start, r.end, id)) + .collect(); + self.rep = Rep::Compressed { table, runs }; + } +} + +/// Accumulates sparse overlay updates into an authoritative mesh. +/// +/// Lifecycle: +/// - Mailbox initializes `State` via `Default` (empty mesh). +/// - On the first update, the accumulator clones `self` (the template +/// passed to `open_accum_port_opts`) into `state`. Callers pass a +/// template such as `StatusMesh::from_single(region, NotExist)`. +/// - Each overlay update is merged with right-wins semantics via +/// `merge_from_overlay`, and the accumulator emits a *full* +/// ValueMesh. +/// +/// The accumulator's state is a [`ValueMesh`] and its updates are +/// [`ValueOverlay`] instances. On each update, the overlay’s +/// normalized runs are merged into the mesh using +/// [`ValueMesh::merge_from_overlay`] with right-wins semantics. +/// +/// This enables incremental reduction of sparse updates across +/// distributed reducers without materializing dense data. +impl Accumulator for ValueMesh +where + T: Eq + Clone + Named, +{ + type State = Self; + type Update = ValueOverlay; + + fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> { + // Mailbox starts with A::State::default() (empty). On the + // first update, re-initialize to our template (self), which + // the caller constructed as "full NotExist over the target + // region". + if state.region().num_ranks() == 0 { + *state = self.clone(); + } + + // Apply sparse delta into the authoritative mesh + // (right-wins). + state.merge_from_overlay(update)?; + Ok(()) + } + + fn reducer_spec(&self) -> Option { + Some(ReducerSpec { + typehash: as Named>::typehash(), + builder_params: None, + }) + } +} + +/// Marker reducer type for [`ValueOverlay`]. 
+/// +/// This reducer carries no state; it exists only to bind a concrete +/// type parameter `T` to the [`CommReducer`] implementation below. +/// Reduction is purely functional and uses right-wins merge semantics +/// defined in [`merge_value_runs`]. +#[derive(Named)] +struct ValueOverlayReducer(std::marker::PhantomData); + +/// Reducer for sparse overlay updates. +/// +/// Combines two [`ValueOverlay`] updates using right-wins +/// semantics: overlapping runs from `right` overwrite those in +/// `left`. The merged runs are then normalized and validated via +/// [`ValueOverlay::try_from_runs`]. +/// +/// Used by the corresponding [`Accumulator`] to perform distributed +/// reduction of incremental mesh updates. +impl CommReducer for ValueOverlayReducer +where + T: Eq + Clone + Named, +{ + type Update = ValueOverlay; + + // Last-writer-wins merge of two sparse overlays. + fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result { + // 1) Merge runs with right precedence. + let merged = crate::v1::value_mesh::rle::merge_value_runs( + left.runs().cloned().collect(), + right.runs().cloned().collect(), + ); + // 2) Re-normalize to an overlay (validates, coalesces). + Ok(ValueOverlay::try_from_runs(merged)?) + } +} + +// register for concrete types: + +hyperactor::submit! 
{ + ReducerFactory { + typehash_f: as Named>::typehash, + builder_f: |_| Ok(Box::new(ValueOverlayReducer::(PhantomData))), + } +} + #[cfg(test)] mod tests { use std::convert::Infallible; @@ -378,9 +1042,11 @@ mod tests { use ndslice::view::CollectMeshExt; use ndslice::view::MapIntoExt; use ndslice::view::Ranked; + use ndslice::view::RankedSliceable; use ndslice::view::ViewExt; use proptest::prelude::*; use proptest::strategy::ValueTree; + use serde_json; use super::*; @@ -818,7 +1484,10 @@ mod tests { let doubled: ValueMesh<_> = vm.map_into(|x| x * 2); assert_eq!(doubled.region, region); - assert_eq!(doubled.ranks, vec![0, 2, 4, 6, 8, 10]); + assert_eq!( + doubled.values().collect::>(), + vec![0, 2, 4, 6, 8, 10] + ); } #[test] @@ -831,7 +1500,7 @@ mod tests { let lens: ValueMesh<_> = vm.map_into(|s| s.len()); assert_eq!(lens.region, region); - assert_eq!(lens.ranks, vec![1, 1, 1, 1]); + assert_eq!(lens.values().collect::>(), vec![1, 1, 1, 1]); } #[test] @@ -900,7 +1569,7 @@ mod tests { assert_eq!(pending.region, region); // Drive the ready futures without a runtime and collect results. - let results: Vec<_> = pending.ranks.into_iter().map(poll_now).collect(); + let results: Vec<_> = pending.values().map(|f| poll_now(f.clone())).collect(); assert_eq!(results, vec![11, 21, 31, 41]); } @@ -911,7 +1580,7 @@ mod tests { let out: ValueMesh<_> = vm.map_into(|x| x * x); assert_eq!(out.region, region); - assert_eq!(out.ranks, vec![49]); + assert_eq!(out.values().collect::>(), vec![49]); } #[test] @@ -929,4 +1598,336 @@ mod tests { assert_eq!(projected.values().collect::>(), vec![10, 20, 30]); assert_eq!(projected.region(), ®ion); } + + #[test] + fn rle_roundtrip_all_equal() { + let region: Region = extent!(n = 6).into(); + let mut vm = ValueMesh::new_unchecked(region.clone(), vec![42; 6]); + + // Compress and ensure logical equality preserved. 
+ vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, vec![42, 42, 42, 42, 42, 42]); + + // Random access still works. + for i in 0..region.num_ranks() { + assert_eq!(vm.get(i), Some(&42)); + } + assert_eq!(vm.get(region.num_ranks()), None); // out-of-bounds + } + + #[test] + fn rle_roundtrip_alternating() { + let region: Region = extent!(n = 6).into(); + let original = vec![1, 2, 1, 2, 1, 2]; + let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone()); + + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, original); + + // Spot-check random access after compression. + assert_eq!(vm.get(0), Some(&1)); + assert_eq!(vm.get(1), Some(&2)); + assert_eq!(vm.get(3), Some(&2)); + assert_eq!(vm.get(5), Some(&2)); + } + + #[test] + fn rle_roundtrip_blocky_and_slice() { + // Blocks: 0,0,0 | 1,1 | 2,2,2,2 | 3 + let region: Region = extent!(n = 10).into(); + let original = vec![0, 0, 0, 1, 1, 2, 2, 2, 2, 3]; + let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone()); + + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, original); + + // Slice a middle subregion [3..8) → [1,1,2,2,2] + let sub_region = region.range("n", 3..8).unwrap(); + let sliced = vm.sliced(sub_region); + let sliced_vec: Vec<_> = sliced.values().collect(); + assert_eq!(sliced_vec, vec![1, 1, 2, 2, 2]); + } + + #[test] + fn rle_idempotent_noop_on_second_call() { + let region: Region = extent!(n = 7).into(); + let original = vec![9, 9, 9, 8, 8, 9, 9]; + let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone()); + + vm.compress_adjacent_in_place(); + let once: Vec<_> = vm.values().collect(); + assert_eq!(once, original); + + // Calling again should be a no-op and still yield identical + // values. 
+ vm.compress_adjacent_in_place(); + let twice: Vec<_> = vm.values().collect(); + assert_eq!(twice, original); + } + + #[test] + fn rle_works_after_build_indexed() { + // Build with shuffled pairs, then compress and verify + // semantics. + let region: Region = extent!(x = 2, y = 3).into(); // 6 + let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)]; + let mut vm = pairs + .into_iter() + .collect_indexed::>(region.clone()) + .unwrap(); + + // Should compress to 6 runs of length 1; still must + // round-trip. + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, vec![0, 10, 20, 30, 40, 50]); + // Spot-check get() + assert_eq!(vm.get(4), Some(&40)); + } + + #[test] + fn rle_handles_singleton_mesh() { + let region: Region = extent!(n = 1).into(); + let mut vm = ValueMesh::new_unchecked(region.clone(), vec![123]); + + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, vec![123]); + assert_eq!(vm.get(0), Some(&123)); + assert_eq!(vm.get(1), None); + } + + #[test] + fn test_dense_round_trip() { + // Build a simple dense mesh of 5 integers. + let region: Region = extent!(x = 5).into(); + let dense = ValueMesh::new(region.clone(), vec![1, 2, 3, 4, 5]).unwrap(); + + let json = serde_json::to_string_pretty(&dense).unwrap(); + let restored: ValueMesh = serde_json::from_str(&json).unwrap(); + + assert_eq!(dense, restored); + + // Dense meshes should stay dense on the wire: check the + // tagged variant. + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + // enum tag is nested: {"rep": {"rep":"dense", ...}} + let tag = v + .get("rep") + .and_then(|o| o.get("rep")) + .and_then(|s| s.as_str()); + assert_eq!(tag, Some("dense")); + } + + #[test] + fn test_compressed_round_trip() { + // Build a dense mesh, compress it, and verify it stays + // compressed on the wire. 
+ let region: Region = extent!(x = 10).into(); + let mut mesh = ValueMesh::new(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3]).unwrap(); + mesh.compress_adjacent_in_place(); + + let json = serde_json::to_string_pretty(&mesh).unwrap(); + let restored: ValueMesh = serde_json::from_str(&json).unwrap(); + + // Logical equality preserved. + assert_eq!(mesh, restored); + + // Compressed meshes should stay compressed on the wire. + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + // enum tag is nested: {"rep": {"rep":"compressed", ...}} + let tag = v + .get("rep") + .and_then(|o| o.get("rep")) + .and_then(|s| s.as_str()); + assert_eq!(tag, Some("compressed")); + } + + #[test] + fn test_stable_run_encoding() { + let run = Run::new(0, 10, 42); + let json = serde_json::to_string(&run).unwrap(); + let decoded: Run = serde_json::from_str(&json).unwrap(); + + assert_eq!(run, decoded); + assert_eq!(run.start, 0); + assert_eq!(run.end, 10); + assert_eq!(run.id, 42); + + // Ensure conversion back to Range works. 
+ let (range, id): (Range, u32) = run.try_into().unwrap(); + assert_eq!(range, 0..10); + assert_eq!(id, 42); + } + + #[test] + fn from_single_builds_single_run() { + let region: Region = extent!(n = 6).into(); + let vm = ValueMesh::from_single(region.clone(), 7); + + assert_eq!(vm.region(), ®ion); + assert_eq!(vm.values().collect::>(), vec![7, 7, 7, 7, 7, 7]); + assert_eq!(vm.get(0), Some(&7)); + assert_eq!(vm.get(5), Some(&7)); + assert_eq!(vm.get(6), None); + } + + #[test] + fn from_default_builds_with_default_value() { + let region: Region = extent!(n = 6).into(); + let vm = ValueMesh::::from_default(region.clone()); + + assert_eq!(vm.region(), ®ion); + // i32::default() == 0 + assert_eq!(vm.values().collect::>(), vec![0, 0, 0, 0, 0, 0]); + assert_eq!(vm.get(0), Some(&0)); + assert_eq!(vm.get(5), Some(&0)); + } + + #[test] + fn test_default_vs_single_equivalence() { + let region: Region = extent!(x = 4).into(); + let d1 = ValueMesh::::from_default(region.clone()); + let d2 = ValueMesh::from_single(region.clone(), 0); + assert_eq!(d1, d2); + } + + #[test] + fn build_from_ranges_with_default_basic() { + let region: Region = extent!(n = 10).into(); + let vm = ValueMesh::from_ranges_with_default( + region.clone(), + 0, // default + vec![(2..4, 1), (6..9, 2)], + ) + .unwrap(); + + assert_eq!(vm.region(), ®ion); + assert_eq!( + vm.values().collect::>(), + vec![0, 0, 1, 1, 0, 0, 2, 2, 2, 0] + ); + + // Internal shape: [0..2)->0, [2..4)->1, [4..6)->0, [6..9)->2, + // [9..10)->0 + if let Rep::Compressed { table, runs } = &vm.rep { + // Table is small and de-duplicated. + assert!(table.len() <= 3); + assert_eq!(runs.len(), 5); + } else { + panic!("expected compressed"); + } + } + + #[test] + fn build_from_ranges_with_default_edge_cases() { + let region: Region = extent!(n = 5).into(); + + // Full override covers entire region. 
+ let vm = ValueMesh::from_ranges_with_default(region.clone(), 9, vec![(0..5, 3)]).unwrap(); + assert_eq!(vm.values().collect::>(), vec![3, 3, 3, 3, 3]); + + // Adjacent overrides and default gaps. + let vm = ValueMesh::from_ranges_with_default(region.clone(), 0, vec![(1..2, 7), (2..4, 7)]) + .unwrap(); + assert_eq!(vm.values().collect::>(), vec![0, 7, 7, 7, 0]); + + // Empty region. + let empty_region: Region = extent!(n = 0).into(); + let vm = ValueMesh::from_ranges_with_default(empty_region.clone(), 42, vec![]).unwrap(); + assert_eq!(vm.values().collect::>(), Vec::::new()); + } + + #[test] + fn from_dense_builds_and_compresses() { + let region: Region = extent!(n = 6).into(); + let mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 2, 2, 3, 3]).unwrap(); + + assert_eq!(mesh.region(), ®ion); + assert!(matches!(mesh.rep, Rep::Compressed { .. })); + assert_eq!(mesh.values().collect::>(), vec![1, 1, 2, 2, 3, 3]); + + // Spot-check indexing. + assert_eq!(mesh.get(0), Some(&1)); + assert_eq!(mesh.get(3), Some(&2)); + assert_eq!(mesh.get(5), Some(&3)); + } + + #[test] + fn merge_from_overlay_basic() { + // Base mesh with two contiguous runs. + let region: Region = extent!(n = 8).into(); + let mut mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 2, 3, 3]).unwrap(); + + // Overlay replaces middle segment [2..6) with 9s. + let overlay = ValueOverlay::try_from_runs(vec![(2..6, 9)]).unwrap(); + + mesh.merge_from_overlay(overlay).unwrap(); + + // Materialize back into ranges to inspect. + let out = mesh.materialized_runs(); + + // Expected: left prefix (0..2)=1, replaced middle (2..6)=9, tail (6..8)=3. + assert_eq!(out, vec![(0..2, 1), (2..6, 9), (6..8, 3)]); + } + + #[test] + fn merge_from_overlay_multiple_spans() { + // Build mesh with alternating runs. 
+ let region: Region = extent!(m = 12).into(); + let mut mesh = + ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]) + .unwrap(); + + // Overlay has a run that spans across the boundary of two + // left runs and another disjoint run later. + let overlay = ValueOverlay::try_from_runs(vec![(2..6, 9), (9..11, 8)]).unwrap(); + + mesh.merge_from_overlay(overlay).unwrap(); + let out = mesh.materialized_runs(); + + // Expected after merge and re-compression: + // (0..2,1) untouched + // (2..6,9) overwrite of part of [1,2] runs + // (6..9,3) left tail survives + // (9..11,8) overwrite inside [4] run + // (11..12,4) leftover tail + assert_eq!( + out, + vec![(0..2, 1), (2..6, 9), (6..9, 3), (9..11, 8), (11..12, 4)] + ); + } + + #[test] + fn merge_from_overlay_crosses_row_boundary() { + // 2 x 5 region -> 10 linear ranks in row-major order. + let region: Region = extent!(rows = 2, cols = 5).into(); + + // Dense values laid out row-major: + // row 0: [1, 1, 1, 2, 2] + // row 1: [3, 3, 4, 4, 4] + let mut mesh = + ValueMesh::from_dense(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 4, 4, 4]).unwrap(); + + // Overlay that crosses the row boundary: + // linear ranks [3..7) -> 9 + // - tail of row 0: indices 3,4 (the two 2s) + // - head of row 1: indices 5,6 (the two 3s) + let overlay = ValueOverlay::try_from_runs(vec![(3..7, 9)]).unwrap(); + + mesh.merge_from_overlay(overlay).unwrap(); + + // After merge, the dense view should be: + // [1,1,1, 9,9, 9,9, 4,4,4] + let flat: Vec<_> = mesh.values().collect(); + assert_eq!(flat, vec![1, 1, 1, 9, 9, 9, 9, 4, 4, 4]); + + // And the materialized runs should reflect that: + // (0..3,1) | (3..7,9) | (7..10,4) + let runs = mesh.materialized_runs(); + assert_eq!(runs, vec![(0..3, 1), (3..7, 9), (7..10, 4)]); + } } diff --git a/hyperactor_mesh/src/v1/value_mesh/rle.rs b/hyperactor_mesh/src/v1/value_mesh/rle.rs new file mode 100644 index 000000000..8574198dd --- /dev/null +++ b/hyperactor_mesh/src/v1/value_mesh/rle.rs @@ 
-0,0 +1,490 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +use std::mem::replace; +use std::ops::Range; + +/// Run-length encodes a dense sequence into a table of unique values +/// and a run list, using a caller-provided equivalence predicate. +/// +/// Consumes the dense `values` vector and produces: +/// - `table`: the deduplicated values in **first-occurrence order** +/// - `runs`: a list of `(start..end, id)` where `id` indexes into +/// `table` +/// +/// Two adjacent elements `a`, `b` belong to the same run iff `same(a, +/// b)` returns `true`. In practice `same` should behave like an +/// equivalence relation over adjacent elements (reflexive and +/// symmetric; transitive is not required globally because only +/// **adjacency** is consulted). +/// +/// # Parameters +/// - `values`: dense sequence; exactly one value per rank (consumed) +/// - `same`: predicate deciding adjacency-equality (`same(&a, &b)` => +/// merge) +/// +/// # Returns +/// `(table, runs)` where: +/// - `table.len() >= 0` +/// - `runs` is empty iff `values` is empty +/// +/// # Invariants on the output +/// - `runs` is sorted by `start` and **non-overlapping** +/// - `runs` **exactly covers** `0..values.len()` with contiguous, +/// non-empty half-open intervals +/// - Adjacent runs always refer to **different** `id`s under `same` +/// - For every `(r, id)` in `runs`, `id < table.len()` +/// - For every index `i` in `r`, `values[i]` is `same`-equal to +/// `table[id]` +/// - `table` preserves first-appearance order of distinct (under +/// `same`) values encountered while scanning left-to-right +/// +/// # Examples +/// ``` +/// // values: [A, A, B, B, B, A] +/// // table: [A, B, A] +/// // runs: [(0..2, 0), (2..5, 1), (5..6, 2)] +/// ``` +/// +/// Note: `table` preserves **first-appearance order of each 
distinct +/// contiguous run**, not global uniqueness - the same logical value +/// may appear multiple times in `table` if it occurs in disjoint +/// runs. +pub fn rle_from_dense(values: Vec, mut same: F) -> (Vec, Vec<(Range, u32)>) +where + F: FnMut(&T, &T) -> bool, +{ + if values.is_empty() { + return (Vec::new(), Vec::new()); + } + + // Consume `values` so we can move (not clone) elements into the table. + let mut iter = values.into_iter(); + let first = iter.next().unwrap(); + + let mut table: Vec = Vec::new(); + let mut runs: Vec<(Range, u32)> = Vec::new(); + + let mut start = 0usize; + table.push(first); // move, not clone + let mut cur_id: u32 = 0; + let mut len = 1usize; + + // Walk through all subsequent elements, closing and opening runs + // whenever the value changes. + for (i, v) in iter.enumerate() { + let idx = i + 1; + if !same(&v, &table[cur_id as usize]) { + runs.push((start..idx, cur_id)); + + // Start a new run + start = idx; + table.push(v); // move, not clone + cur_id = (table.len() - 1) as u32; + } + len = idx + 1; + } + + // Close the final run + runs.push((start..len, cur_id)); + + (table, runs) +} + +/// Converts **normalized** value-bearing runs into a deduplicated +/// table and a run list, coalescing adjacent runs that carry the same +/// value. +/// +/// # Input +/// +/// - `runs`: a vector of `(start..end, value)` pairs that is already +/// **normalized**: +/// - sorted by `(start, end)` in ascending order, +/// - non-empty (`start < end`), +/// - **non-overlapping** (touching is allowed). +/// +/// # Behavior +/// +/// - Builds `table` by **global** deduplication of values in +/// first-appearance order. The same logical value appearing in +/// disjoint places will share a single entry in `table`. +/// - Produces `out` as a list of `(start..end, id)`, where `id` +/// indexes into `table`. 
+/// - If two consecutive input runs are **touching** (`prev.end == +/// next.start`) and their values are `==`, they are **coalesced** +/// into a single output run referencing the same `id`. +/// +/// # Output invariants +/// - `out` is sorted by `start` and **non-overlapping**; touching +/// runs have different `id`s. +/// - Each `(r, id)` in `out` satisfies `id < table.len()`. +/// - `table` preserves **first-appearance order** of distinct values +/// across the entire input. +/// - The union of ranges in `out` equals the union of ranges in the +/// input `runs`. +/// +/// # Preconditions +/// This function assumes the input `runs` is normalized as described +/// above; it does **not** revalidate or resort the input. Supplying +/// unsorted or overlapping ranges results in unspecified behavior. +/// +/// # Example +/// ``` +/// // Input runs (already sorted & non-overlapping): +/// // [(0..2, A), (2..5, A), (5..6, B), (8..10, A)] +/// // After coalescing equal-adjacent: +/// // [(0..5, A), (5..6, B), (8..10, A)] +/// // table = [A, B] +/// // out = [(0..5, 0), (5..6, 1), (8..10, 0)] +/// ``` +pub fn rle_from_value_runs( + mut runs: Vec<(Range, T)>, +) -> (Vec, Vec<(Range, u32)>) { + if runs.is_empty() { + return (Vec::new(), Vec::new()); + } + // Runs are already normalized by caller (sorted, + // non-overlapping). + let mut table: Vec = Vec::new(); + let mut out: Vec<(Range, u32)> = Vec::new(); + + let mut push_run = |range: Range, v: &T| { + // De-dup table. + let id = if let Some(idx) = table.iter().position(|x| x == v) { + idx as u32 + } else { + table.push(v.clone()); + (table.len() - 1) as u32 + }; + // Coalesce equal-adjacent ids. + if let Some((last_r, last_id)) = out.last_mut() { + if last_r.end == range.start && *last_id == id { + last_r.end = range.end; + return; + } + } + out.push((range, id)); + }; + + for (r, v) in runs.drain(..) { + push_run(r, &v); + } + (table, out) +} + +/// True iff the two half-open ranges overlap. 
+#[inline] +pub(crate) fn ranges_overlap(a: &Range, b: &Range) -> bool { + a.start < b.end && b.start < a.end +} + +/// Merge two normalized `(Range, T)` run lists with "right wins" +/// overwrite semantics. +/// +/// - Inputs must each be sorted, non-overlapping, coalesced +/// (equal-adjacent merged). +/// - Output is sorted, non-overlapping, coalesced. +/// - On overlaps, `right_in` overwrites `left_in` (last-writer-wins). +pub(crate) fn merge_value_runs( + left_in: Vec<(Range, T)>, + right_in: Vec<(Range, T)>, +) -> Vec<(Range, T)> { + // `out` will hold the merged, coalesced result. + let mut out: Vec<(Range, T)> = Vec::new(); + + // Local helper that appends a run but coalesces equal-adjacent + // (like RankedValues::append) + let mut append = |range: Range, value: T| { + if let Some((last_r, last_v)) = out.last_mut() { + if last_r.end == range.start && *last_v == value { + last_r.end = range.end; + return; + } + } + out.push((range, value)); + }; + + // Turn each input into forward iterators. + let mut left_iter = left_in.into_iter(); + let mut right_iter = right_in.into_iter(); + + // `left` and `right` are the current cursor items + // (`Option<(Range, T)>`). + let mut left = left_iter.next(); + let mut right = right_iter.next(); + + // Main merge loop: runs as long as both sides have a current + // item. + // + // Invariant: all runs emitted so far are sorted, non-overlapping, + // and coalesced; `left` and `right` point to the next unprocessed + // run on each side. + while left.is_some() && right.is_some() { + // Mutable refs to the current (range, val) pairs so we can + // adjust their start as we carve off emitted pieces. + let (left_ranks, left_value) = left.as_mut().unwrap(); + let (right_ranks, right_value) = right.as_mut().unwrap(); + + if ranges_overlap(left_ranks, right_ranks) { + // Overlap case (half-open ranges): a.start < b.end && + // b.start < a.end. + if *left_value == *right_value { + // Equal-value overlap: merge into one unified run. 
+ // + // We extend the emitted range up to right.end — not + // max(left.end, right.end). This choice ensures: + // - The entire right run is consumed and the right + // iterator advances (guaranteeing progress). + // - If the left run extends further, we just shrink + // its start and handle its tail next iteration. + // This avoids overlap or lookahead while keeping + // output normalized. + let ranks = (left_ranks.start.min(right_ranks.start))..right_ranks.end; + // `replace` consumes the whole right run and advances + // to the next. + + // Consume the right run entirely and move right + // forward. + let (_, value) = replace(&mut right, right_iter.next()).unwrap(); + // Advance `left_ranks.start` to the end of what we + // just emitted; if left run is now empty, advance the + // left iterator. + left_ranks.start = ranks.end; + if left_ranks.is_empty() { + left = left_iter.next(); + } + // Append (coalescing if it touches the previous + // output with the same value). + append(ranks, value); + } else if left_ranks.start < right_ranks.start { + // Different values; left starts first. Emit the + // prefix of left up to right.start — this portion + // cannot be overwritten and can be finalized. + let ranks = left_ranks.start..right_ranks.start; + left_ranks.start = ranks.end; + append(ranks, left_value.clone()); + } else { + // Different values; right starts earlier or equal. + // "Right wins" — emit the right chunk as-is, consuming it + // fully and moving left forward as needed. Then make sure + // no leftover left segments still overlap that right range. + let (ranks, value) = replace(&mut right, right_iter.next()).unwrap(); + + // Clamp the current left start so it never extends into + // the just-emitted right region. + left_ranks.start = left_ranks.start.max(ranks.end); + if left_ranks.is_empty() { + left = left_iter.next(); + } + + // Trim or skip any following left runs that still overlap + // this right run's extent. 
+ while let Some((next_r, _)) = left.as_mut() { + if next_r.start < ranks.end { + next_r.start = next_r.start.max(ranks.end); + if next_r.is_empty() { + left = left_iter.next(); + continue; + } + } + break; + } + append(ranks, value); + } + } else if left_ranks.start < right_ranks.start { + // No overlap, left starts earlier. + let (ranks, value) = replace(&mut left, left_iter.next()).unwrap(); + append(ranks, value); + } else { + // No overlap, right starts earlier. + let (ranks, value) = replace(&mut right, right_iter.next()).unwrap(); + append(ranks, value); + } + } + + // Drain whichever side still has runs. They are guaranteed to + // follow all previously emitted ranges. + while let Some((r, v)) = left { + append(r, v); + left = left_iter.next(); + } + while let Some((r, v)) = right { + append(r, v); + right = right_iter.next(); + } + + // Postcondition: `out` is sorted, non-overlapping, coalesced. All + // overlaps resolved by "right-wins" semantics. + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn merge_disjoint_right_after_left() { + let left = vec![(0..5, 1)]; + let right = vec![(7..9, 2)]; + let out = merge_value_runs(left, right); + assert_eq!(out, vec![(0..5, 1), (7..9, 2)]); + } + + #[test] + fn merge_disjoint_right_before_left() { + let left = vec![(7..9, 1)]; + let right = vec![(0..5, 2)]; + let out = merge_value_runs(left, right); + assert_eq!(out, vec![(0..5, 2), (7..9, 1)]); + } + + #[test] + fn overlap_right_wins_simple() { + let left = vec![(0..10, 1)]; + let right = vec![(3..6, 2)]; + let out = merge_value_runs(left, right); + // left prefix, right overwrite, left suffix + assert_eq!(out, vec![(0..3, 1), (3..6, 2), (6..10, 1)]); + } + + #[test] + fn overlap_equal_values_union_to_right_end() { + let left = vec![(0..4, 5)]; + let right = vec![(2..6, 5)]; + let out = merge_value_runs(left, right); + // same value: union emits [0..6) as two pieces depending on + // algorithm's "extend to right.end" rule, but coalescing + // 
should produce a single run: + assert_eq!(out, vec![(0..6, 5)]); + } + + #[test] + fn overlap_equal_values_with_left_longer() { + let left = vec![(0..8, 5)]; + let right = vec![(2..6, 5)]; + let out = merge_value_runs(left, right); + // equal case extends to right.end first, then left tail + // remains and should coalesce to one; because they’re + // touching & equal: + assert_eq!(out, vec![(0..8, 5)]); + } + + #[test] + fn overlap_right_starts_earlier_right_wins() { + let left = vec![(4..10, 1)]; + let right = vec![(2..6, 2)]; + let out = merge_value_runs(left, right); + // Right chunk first, then left remainder: + assert_eq!(out, vec![(2..6, 2), (6..10, 1)]); + } + + #[test] + fn touching_coalesces_equal() { + let left = vec![(0..3, 1), (3..6, 1)]; + let right = vec![]; + let out = merge_value_runs(left, right); + assert_eq!(out, vec![(0..6, 1)]); + } + + #[test] + fn multi_overlap_mixed_values() { + let left = vec![(0..5, 1), (5..10, 1), (10..15, 3)]; + let right = vec![(3..7, 2), (12..20, 4)]; + let out = merge_value_runs(left, right); + assert_eq!( + out, + vec![(0..3, 1), (3..7, 2), (7..10, 1), (10..12, 3), (12..20, 4)] + ); + } + + #[test] + fn overlap_mixed_values_right_inside_left() { + // Right run sits strictly within a left run of a different + // value. + let left = vec![(0..10, 1)]; + let right = vec![(3..7, 2)]; + let out = merge_value_runs(left, right); + assert_eq!( + out, + vec![(0..3, 1), (3..7, 2), (7..10, 1)], + "right overwrites interior portion of left run" + ); + } + + #[test] + fn overlap_mixed_values_right_spans_multiple_left_runs() { + // Right spans two left runs, overwriting parts of both. 
+ let left = vec![(0..5, 1), (5..10, 2)]; + let right = vec![(3..7, 9)]; + let out = merge_value_runs(left, right); + assert_eq!( + out, + vec![(0..3, 1), (3..7, 9), (7..10, 2)], + "right spans two left runs; left prefix/tail preserved" + ); + } + + #[test] + fn overlap_mixed_values_right_starts_before_left() { + // Right begins before left and overlaps into it. + let left = vec![(5..10, 1)]; + let right = vec![(0..7, 2)]; + let out = merge_value_runs(left, right); + assert_eq!( + out, + vec![(0..7, 2), (7..10, 1)], + "right overwrites head and extends beyond left start" + ); + } + + #[test] + fn overlap_mixed_values_right_ends_after_left() { + // Right starts inside left but extends beyond its end. + let left = vec![(0..5, 1)]; + let right = vec![(3..8, 2)]; + let out = merge_value_runs(left, right); + assert_eq!( + out, + vec![(0..3, 1), (3..8, 2)], + "right overwrites tail and extends past left" + ); + } + + #[test] + fn overlap_mixed_values_multiple_cascading_overlaps() { + // Stress: multiple right runs each cutting through several + // lefts. 
+ let left = vec![(0..4, 1), (4..8, 1), (8..12, 2), (12..16, 3)]; + let right = vec![(2..6, 9), (10..14, 9)]; + let out = merge_value_runs(left, right); + assert_eq!( + out, + vec![ + (0..2, 1), + (2..6, 9), + (6..8, 1), + (8..10, 2), + (10..14, 9), + (14..16, 3) + ], + "multiple right runs spanning multiple left runs, non-overlapping output" + ); + } + + #[test] + fn empty_inputs() { + let out = merge_value_runs::(vec![], vec![]); + assert!(out.is_empty()); + + let out = merge_value_runs(vec![(0..2, 9)], vec![]); + assert_eq!(out, vec![(0..2, 9)]); + + let out = merge_value_runs(vec![], vec![(5..7, 4)]); + assert_eq!(out, vec![(5..7, 4)]); + } +} diff --git a/hyperactor_mesh/src/v1/value_mesh/value_overlay.rs b/hyperactor_mesh/src/v1/value_mesh/value_overlay.rs new file mode 100644 index 000000000..e778ae232 --- /dev/null +++ b/hyperactor_mesh/src/v1/value_mesh/value_overlay.rs @@ -0,0 +1,340 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +use std::fmt; +use std::ops::Range; + +use hyperactor::Named; +use serde::Deserialize; +use serde::Serialize; + +/// Builder error for overlays (structure only; region bounds are +/// checked at merge time). +/// +/// Note: serialization assumes identical pointer width between sender +/// and receiver, as `Range` is not portable across +/// architectures. TODO: introduce a wire‐stable run type. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum BuildError { + /// A run with an empty range (`start == end`) was provided. + EmptyRange, + + /// Two runs overlap or are unsorted: `prev.end > next.start`. The + /// offending ranges are returned for debugging. + OverlappingRanges { + prev: Range, + next: Range, + }, + + /// A run exceeds the region bounds when applying an overlay + /// merge. 
+ OutOfBounds { + range: Range, + region_len: usize, + }, +} + +impl fmt::Display for BuildError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BuildError::EmptyRange => { + write!(f, "a run with an empty range (start == end) was provided") + } + BuildError::OverlappingRanges { prev, next } => write!( + f, + "overlapping or unsorted runs: prev={:?}, next={:?}", + prev, next + ), + BuildError::OutOfBounds { range, region_len } => write!( + f, + "range {:?} exceeds region bounds (len={})", + range, region_len + ), + } + } +} + +impl std::error::Error for BuildError {} + +/// A sparse overlay of rank ranges and values, used to assemble or +/// patch a [`ValueMesh`] without materializing per-rank data. +/// +/// Unlike `ValueMesh`, which always represents a complete, gap-free +/// mapping over a [`Region`], a `ValueOverlay` is intentionally +/// partial: it may describe only the ranks that have changed. This +/// allows callers to build and merge small, incremental updates +/// efficiently, while preserving the `ValueMesh` invariants after +/// merge. +/// +/// Invariants: +/// - Runs are sorted by `(start, end)`. +/// - Runs are non-empty and non-overlapping. +/// - Adjacent runs with equal values are coalesced. +/// - Region bounds are validated when the overlay is merged, not on +/// insert. +/// +/// Note: serialization assumes identical pointer width between sender +/// and receiver, as `Range` is not portable across +/// architectures. TODO: introduce a wire‐stable run type. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Named, Default)] +pub struct ValueOverlay { + runs: Vec<(Range, T)>, +} + +impl ValueOverlay { + /// Creates an empty overlay. + pub fn new() -> Self { + Self { runs: Vec::new() } + } + + /// Returns an iterator over the internal runs. + pub fn runs(&self) -> impl Iterator, T)> { + self.runs.iter() + } + + /// Current number of runs. 
+ pub fn len(&self) -> usize { + self.runs.len() + } + + /// Returns `true` if the overlay contains no runs. This indicates + /// that no ranges have been added — i.e., the overlay represents + /// an empty or no-op patch. + pub fn is_empty(&self) -> bool { + self.runs.is_empty() + } +} + +impl ValueOverlay { + /// Adds a `(range, value)` run while maintaining the overlay + /// invariants. + /// + /// Fast path: + /// - If `range` is **after** the last run (`last.end <= + /// range.start`), this is an O(1) append. If it **touches** the + /// last run and `value` is equal, the two runs are **coalesced** + /// by extending `end`. + /// + /// Slow path: + /// - If the input is **unsorted** or **overlaps** the last run, + /// the method falls back to a full **normalize** (sort + + /// coalesce + overlap check), making this call **O(n log n)** + /// in the number of runs. + /// + /// Errors: + /// - Returns `BuildError::EmptyRange` if `range.is_empty()`; `BuildError::OverlappingRanges` if it overlaps an existing run. + /// + /// Notes & guidance: + /// - Use this for **already-sorted, non-overlapping** appends to + /// get the cheap fast path. + /// - For **bulk or unsorted** inserts, prefer + /// `ValueOverlay::try_from_runs` (collect → sort → coalesce) to + /// avoid repeated re-normalization. + /// - Adjacent equal-value runs are coalesced automatically. + /// + /// # Examples + /// ```ignore + /// let mut ov = ValueOverlay::new(); + /// ov.push_run(10..13, 1).unwrap(); // append + /// ov.push_run(13..15, 1).unwrap(); // coalesces to (10..15, 1) + /// + /// // Unsorted input triggers normalize (O(n log n)): + /// ov.push_run(1..2, 2).unwrap(); // re-sorts, checks overlaps, coalesces + /// ``` + pub fn push_run(&mut self, range: Range, value: T) -> Result<(), BuildError> { + // Reject empty ranges. + if range.is_empty() { + return Err(BuildError::EmptyRange); + } + + // Look at the last run. + match self.runs.last_mut() { + // The common case is appending in sorted order. 
Fast-path + // append if new run is after the last and + // non-overlapping. + Some((last_r, last_v)) if last_r.end <= range.start => { + if last_r.end == range.start && *last_v == value { + // Coalesce equal-adjacent. + last_r.end = range.end; + return Ok(()); + } + self.runs.push((range, value)); + Ok(()) + } + // The overlay was previously empty, or the caller + // inserted out of order or overlapping the last run. + _ => { + // Slow path. Re-sort, merge and validate the full + // runs vector. + self.runs.push((range, value)); + Self::normalize(&mut self.runs) + } + } + } + + /// Sorts, checks for overlaps, and coalesces equal-adjacent runs + /// in-place. + fn normalize(v: &mut Vec<(Range, T)>) -> Result<(), BuildError> { + // Early exit for empty overlays. + if v.is_empty() { + return Ok(()); + } + + // After this, every later range has start >= prev.start. If + // any later start < prev.end it's an overlap. + v.sort_by_key(|(r, _)| (r.start, r.end)); + + // Move runs out of `v` with drain(..) (no clone). NOTE(review): an + // early `return Err` below drops the drain, leaving `v` empty. + let mut out: Vec<(Range, T)> = Vec::with_capacity(v.len()); + for (r, val) in v.drain(..) { + if let Some((prev_r, prev_v)) = out.last_mut() { + // If the next run's start is before the previous run's + // end we have an overlapping interval. + if r.start < prev_r.end { + return Err(BuildError::OverlappingRanges { + prev: prev_r.clone(), + next: r, + }); + } + // If the previous run touches the new run and has the + // same value, merge them by extending the end + // boundary. + if prev_r.end == r.start && *prev_v == val { + // Coalesce equal-adjacent. + prev_r.end = r.end; + continue; + } + } + // Otherwise, push as a new independent run. + out.push((r, val)); + } + + // Replace the old vector. + *v = out; + + // Invariant: Runs is sorted, non-overlapping and coalesced. + Ok(()) + } + + /// Builds an overlay from arbitrary runs, validating structure + /// and coalescing equal-adjacent. 
+ pub fn try_from_runs(runs: I) -> Result + where + I: IntoIterator, T)>, + { + // We need a modifiable buffer to sort and normalize so we + // eagerly collect the iterator. + let mut v: Vec<(Range, T)> = runs.into_iter().collect(); + + // Reject empties up-front. Empty intervals are structurally + // invalid for an overlay. Fail fast. + for (r, _) in &v { + if r.is_empty() { + return Err(BuildError::EmptyRange); + } + } + + // Sort by (start, end). + v.sort_by_key(|(r, _)| (r.start, r.end)); + + // Normalize (validate + coalesce). + Self::normalize(&mut v)?; + + // Invariant: Runs is sorted, non-overlapping and coalesced. + Ok(Self { runs: v }) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn push_run_appends_and_coalesces() { + let mut ov = ValueOverlay::new(); + + // First insert. + ov.push_run(0..3, 1).unwrap(); + assert_eq!(ov.runs, vec![(0..3, 1)]); + + // Non-overlapping append. + ov.push_run(5..7, 2).unwrap(); + assert_eq!(ov.runs, vec![(0..3, 1), (5..7, 2)]); + + // Coalesce equal-adjacent (touching with same value). + ov.push_run(7..10, 2).unwrap(); + assert_eq!(ov.runs, vec![(0..3, 1), (5..10, 2)]); + } + + #[test] + fn push_run_detects_overlap() { + let mut ov = ValueOverlay::new(); + ov.push_run(0..3, 1).unwrap(); + + // Overlaps 2..4 with existing 0..3. + let err = ov.push_run(2..4, 9).unwrap_err(); + assert!(matches!(err, BuildError::OverlappingRanges { .. })); + } + + #[test] + fn push_run_handles_unsorted_inserts() { + let mut ov = ValueOverlay::new(); + // Insert out of order; normalize should sort and coalesce. + ov.push_run(10..12, 3).unwrap(); + ov.push_run(5..8, 2).unwrap(); // Unsorted relative to last. + ov.push_run(8..10, 2).unwrap(); // Coalesce with previous. + + assert_eq!(ov.runs, vec![(5..10, 2), (10..12, 3)]); + } + + #[test] + fn try_from_runs_builds_and_coalesces() { + use super::ValueOverlay; + + // Unsorted, with adjacent equal-value ranges that should + // coalesce. 
+ let ov = ValueOverlay::try_from_runs(vec![(8..10, 2), (5..8, 2), (12..14, 3)]).unwrap(); + + assert_eq!(ov.runs, vec![(5..10, 2), (12..14, 3)]); + } + + #[test] + fn try_from_runs_rejects_overlap_and_empty() { + // Overlap should error. + let err = ValueOverlay::try_from_runs(vec![(0..3, 1), (2..5, 2)]).unwrap_err(); + assert!(matches!(err, BuildError::OverlappingRanges { .. })); + + // Empty range should error. + let err = ValueOverlay::try_from_runs(vec![(0..0, 1)]).unwrap_err(); + assert!(matches!(err, BuildError::EmptyRange)); + } + + #[test] + fn is_empty_reflects_state() { + let mut ov = ValueOverlay::::new(); + assert!(ov.is_empty()); + + ov.push_run(0..1, 7).unwrap(); + assert!(!ov.is_empty()); + } + + #[test] + fn normalize_sorts_coalesces_and_detects_overlap() { + // 1) Sort + coalesce equal-adjacent. + let mut v = vec![(5..7, 2), (3..5, 2), (7..9, 2)]; // unsorted, all value=2 + ValueOverlay::::normalize(&mut v).unwrap(); + assert_eq!(v, vec![(3..9, 2)]); + + // 2) Overlap triggers error. + let mut v = vec![(3..6, 1), (5..8, 2)]; + let err = ValueOverlay::::normalize(&mut v).unwrap_err(); + assert!(matches!(err, BuildError::OverlappingRanges { .. })); + } +} diff --git a/monarch_hyperactor/src/value_mesh.rs b/monarch_hyperactor/src/value_mesh.rs index d518064b1..64d9fd9ac 100644 --- a/monarch_hyperactor/src/value_mesh.rs +++ b/monarch_hyperactor/src/value_mesh.rs @@ -39,8 +39,17 @@ impl PyValueMesh { let vals: Vec> = values.extract()?; // Build & validate cardinality against region. - let inner = > as BuildFromRegion>>::build_dense(region, vals) - .map_err(|e| PyValueError::new_err(e.to_string()))?; + let mut inner = + > as BuildFromRegion>>::build_dense(region, vals) + .map_err(|e| PyValueError::new_err(e.to_string()))?; + + // Coalesce adjacent identical Python objects (same pointer + // identity). For Py, we treat equality as object + // identity: consecutive references to the *same* object + // pointer are merged into RLE runs. 
This tends to compress + // sentinel/categorical/boolean data, but not freshly + // allocated numerics/strings. + inner.compress_adjacent_in_place_by(|a, b| a.as_ptr() == b.as_ptr()); Ok(Self { inner }) } @@ -71,6 +80,7 @@ impl PyValueMesh { // Py. `unwrap` is safe because the bounds have been // checked. let v: Py = self.inner.get(rank).unwrap().clone(); + Ok(v) } @@ -84,9 +94,19 @@ impl PyValueMesh { // Preserve the shape's original Slice (offset/strides). let s = shape.get_inner(); let region = Region::new(s.labels().to_vec(), s.slice().clone()); - let inner = > as ndslice::view::BuildFromRegionIndexed>> - ::build_indexed(region, pairs) - .map_err(|e| PyValueError::new_err(e.to_string()))?; + let mut inner = > as ndslice::view::BuildFromRegionIndexed< + Py, + >>::build_indexed(region, pairs) + .map_err(|e| PyValueError::new_err(e.to_string()))?; + + // Coalesce adjacent identical Python objects (same pointer + // identity). For Py, we treat equality as object + // identity: consecutive references to the *same* object + // pointer are merged into RLE runs. This tends to compress + // sentinel/categorical/boolean data, but not freshly + // allocated numerics/strings. + inner.compress_adjacent_in_place_by(|a, b| a.as_ptr() == b.as_ptr()); + Ok(Self { inner }) } }