Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions hyperactor_mesh/src/proc_mesh/mesh_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,9 @@ impl Handler<resource::GetRankStatus> for ProcMeshAgent {
cx: &Context<Self>,
get_rank_status: resource::GetRankStatus,
) -> anyhow::Result<()> {
use crate::resource::Status;
use crate::v1::StatusOverlay;

let (rank, status) = match self.actor_states.get(&get_rank_status.name) {
Some(ActorInstanceState {
spawn: Ok(actor_id),
Expand All @@ -560,9 +563,9 @@ impl Handler<resource::GetRankStatus> for ProcMeshAgent {
(
*create_rank,
if supervision_events.is_empty() {
resource::Status::Running
Status::Running
} else {
resource::Status::Failed(format!(
Status::Failed(format!(
"because of supervision events: {:?}",
supervision_events
))
Expand All @@ -572,12 +575,21 @@ impl Handler<resource::GetRankStatus> for ProcMeshAgent {
Some(ActorInstanceState {
spawn: Err(e),
create_rank,
}) => (*create_rank, resource::Status::Failed(e.to_string())),
}) => (*create_rank, Status::Failed(e.to_string())),
// TODO: represent unknown rank
None => (usize::MAX, resource::Status::NotExist),
None => (usize::MAX, Status::NotExist),
};

// Send a sparse overlay update. If rank is unknown, emit an
// empty overlay.
let overlay = if rank == usize::MAX {
StatusOverlay::new()
} else {
StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
.expect("valid single-run overlay")
};
get_rank_status.reply.send(cx, overlay)?;

get_rank_status.reply.send(cx, (rank, status).into())?;
Ok(())
}
}
Expand Down
96 changes: 38 additions & 58 deletions hyperactor_mesh/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::mem::replace;
use std::mem::take;
use std::ops::Deref;
Expand All @@ -31,18 +30,17 @@ use hyperactor::PortRef;
use hyperactor::RefClient;
use hyperactor::RemoteMessage;
use hyperactor::Unbind;
use hyperactor::accum::Accumulator;
use hyperactor::accum::CommReducer;
use hyperactor::accum::ReducerFactory;
use hyperactor::accum::ReducerSpec;
use hyperactor::mailbox::PortReceiver;
use hyperactor::message::Bind;
use hyperactor::message::Bindings;
use hyperactor::message::Unbind;
use ndslice::Region;
use ndslice::ViewExt;
use serde::Deserialize;
use serde::Serialize;

use crate::v1::Name;
use crate::v1::StatusOverlay;

/// The current lifecycle status of a resource.
#[derive(
Expand Down Expand Up @@ -120,8 +118,12 @@ impl Bind for Rank {
}
}

/// Get the status of a resource at a rank. This message is designed to be
/// cast and efficiently accumulated.
/// Get the status of a resource across the mesh.
///
/// This message is cast to all ranks; each rank replies with a sparse
/// status **overlay**. The comm reducer merges overlays (right-wins)
/// and the accumulator applies them to produce **full StatusMesh
/// snapshots** on the receiver side.
#[derive(
Clone,
Debug,
Expand All @@ -137,34 +139,50 @@ impl Bind for Rank {
pub struct GetRankStatus {
/// The name of the resource.
pub name: Name,
/// The status of the rank.
/// Sparse status updates (overlays) from a rank.
#[binding(include)]
pub reply: PortRef<RankedValues<Status>>,
pub reply: PortRef<StatusOverlay>,
}

impl GetRankStatus {
pub async fn wait(
mut rx: PortReceiver<RankedValues<Status>>,
mut rx: PortReceiver<crate::v1::StatusMesh>,
num_ranks: usize,
max_idle_time: Duration,
) -> Result<RankedValues<Status>, RankedValues<Status>> {
region: Region, // used only for fallback
) -> Result<crate::v1::StatusMesh, crate::v1::StatusMesh> {
debug_assert_eq!(region.num_ranks(), num_ranks, "region/num_ranks mismatch");

let mut alarm = hyperactor::time::Alarm::new();
alarm.arm(max_idle_time);
let mut statuses = RankedValues::default();

// Fallback snapshot if we time out before receiving anything.
let mut snapshot =
crate::v1::StatusMesh::from_single(region, crate::resource::Status::NotExist);

loop {
let mut sleeper = alarm.sleeper();
tokio::select! {
_ = sleeper.sleep() => return Err(statuses),
new_statuses = rx.recv() => {
match new_statuses {
Ok(new_statuses) => statuses = new_statuses,
Err(_) => return Err(statuses),
_ = sleeper.sleep() => return Err(snapshot),
next = rx.recv() => {
match next {
Ok(mesh) => { snapshot = mesh; } // latest-wins snapshot
Err(_) => return Err(snapshot),
}
}
}

alarm.arm(max_idle_time);
if statuses.rank(num_ranks) == num_ranks {
break Ok(statuses);

// Completion: once every rank (among the first
// `num_ranks`) has reported at least something (i.e.
// moved off NotExist).
if snapshot
.values()
.take(num_ranks)
.all(|s| !matches!(s, crate::resource::Status::NotExist))
{
break Ok(snapshot);
}
}
}
Expand Down Expand Up @@ -302,7 +320,7 @@ impl<T: Clone> RankedValues<T> {
pub fn materialized_iter(&self, until: usize) -> impl Iterator<Item = &T> + '_ {
assert_eq!(self.rank(until), until, "insufficient rank");
self.iter()
.flat_map(|(range, value)| std::iter::repeat(value).take(range.end - range.start))
.flat_map(|(range, value)| std::iter::repeat_n(value, range.end - range.start))
}
}

Expand Down Expand Up @@ -486,44 +504,6 @@ impl<T> FromIterator<(Range<usize>, T)> for RankedValues<T> {
}
}

/// Accumulates `RankedValues<T>` updates into a `RankedValues<T>` state;
/// both the state and the update are the same type.
impl<T: Eq + Clone + Named> Accumulator for RankedValues<T> {
    type State = Self;
    type Update = Self;

    /// Folds `incoming` into `acc` with `merge_from`. Always returns `Ok`.
    fn accumulate(&self, acc: &mut Self::State, incoming: Self::Update) -> anyhow::Result<()> {
        acc.merge_from(incoming);
        Ok(())
    }

    /// Names the matching comm reducer (`RankedValuesReducer<T>`) by its
    /// typehash; no builder parameters are supplied.
    fn reducer_spec(&self) -> Option<ReducerSpec> {
        let typehash = <RankedValuesReducer<T> as Named>::typehash();
        Some(ReducerSpec {
            typehash,
            builder_params: None,
        })
    }
}

/// Comm reducer for `RankedValues<T>` updates. The struct carries no data;
/// `PhantomData` only ties it to the element type `T`.
#[derive(Named)]
struct RankedValuesReducer<T>(std::marker::PhantomData<T>);

impl<T: Hash + Eq + Ord + Clone> CommReducer for RankedValuesReducer<T> {
    type Update = RankedValues<T>;

    /// Combines two updates by merging `right` into `left` with
    /// `merge_from` and returning the result.
    fn reduce(&self, left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
        let mut merged = left;
        merged.merge_from(right);
        Ok(merged)
    }
}

// Register the reducer for concrete types. Only
// `RankedValuesReducer<Status>` is registered here: the factory is keyed
// by that type's typehash, and its builder ignores its argument and
// returns a boxed `RankedValuesReducer::<Status>`.

hyperactor::submit! {
ReducerFactory {
typehash_f: <RankedValuesReducer<Status> as Named>::typehash,
builder_f: |_| Ok(Box::new(RankedValuesReducer::<Status>(PhantomData))),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
19 changes: 19 additions & 0 deletions hyperactor_mesh/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ use serde::Deserialize;
use serde::Serialize;
pub use value_mesh::ValueMesh;

/// A mesh of per-rank lifecycle statuses.
///
/// `StatusMesh` is `ValueMesh<Status>` and supports dense or
/// compressed encodings. Updates are applied via sparse overlays with
/// **last-writer-wins** semantics (see
/// [`ValueMesh::merge_from_overlay`]). The mesh's `Region` defines
/// the rank space; all updates must match that region.
///
/// A uniform mesh is built with `StatusMesh::from_single(region,
/// status)`, e.g. to seed every rank with `Status::NotExist` before
/// any rank has reported.
pub type StatusMesh = ValueMesh<Status>;

/// A sparse set of `(Range<usize>, Status)` updates for a
/// [`StatusMesh`].
///
/// `StatusOverlay` carries **normalized** runs (sorted,
/// non-overlapping, and coalesced). Applying an overlay to a
/// `StatusMesh` uses **right-wins** semantics on overlap and
/// preserves first-appearance order in the compressed table.
/// Construct via `ValueOverlay::try_from_runs` after normalizing.
///
/// `StatusOverlay::new()` gives an empty overlay; a single-rank
/// update is
/// `StatusOverlay::try_from_runs(vec![(rank..rank + 1, status)])`.
pub type StatusOverlay = value_mesh::ValueOverlay<Status>;

use crate::resource;
use crate::resource::RankedValues;
use crate::resource::Status;
Expand Down
74 changes: 58 additions & 16 deletions hyperactor_mesh/src/v1/host_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use hyperactor::config::CONFIG;
use hyperactor::config::ConfigAttr;
use hyperactor::declare_attrs;
use tokio::time::timeout;

pub mod mesh_agent;

use std::collections::HashSet;
Expand Down Expand Up @@ -539,21 +540,45 @@ impl HostMeshRef {
.concat(&per_host)
.map_err(|err| v1::Error::ConfigurationError(err.into()))?;

let region: Region = extent.clone().into();
let mesh_name = Name::new(name);

// Legacy shim: flattens a `StatusMesh` into the `RankedValues<Status>`
// shape that some error types still require. Ranks `0..len` start out
// as `default_fill`; every mesh entry whose status is not `NotExist`
// is then merged over the default at its rank.
//
// TODO(shayne-fletcher): Delete this shim once Error::ActorSpawnError
// carries a StatusMesh (ValueMesh<Status>) directly. At that point,
// use the mesh as-is and remove `mesh_to_rankedvalues_*` calls below.
fn mesh_to_rankedvalues_with_default(
    mesh: &crate::v1::StatusMesh,
    default_fill: Status,
    len: usize,
) -> RankedValues<Status> {
    // Baseline: one run covering every rank with the default status.
    let mut ranked = RankedValues::from((0..len, default_fill));

    // Only ranks that have moved off `NotExist` override the baseline.
    let reported = mesh
        .values()
        .enumerate()
        .filter(|(_, status)| !matches!(status, Status::NotExist));
    for (rank, status) in reported {
        ranked.merge_from(RankedValues::from((rank..rank + 1, status.clone())));
    }

    ranked
}

let mut procs = Vec::new();
let num_ranks = self.region().num_ranks() * per_host.num_ranks();
let num_ranks = region.num_ranks();
// Accumulator outputs full StatusMesh snapshots; seed with
// NotExist.
let (port, rx) = cx.mailbox().open_accum_port_opts(
RankedValues::default(),
crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
Some(ReducerOpts {
max_update_interval: Some(Duration::from_millis(50)),
}),
);

// We CreateOrUpdate each proc, and then fence on getting statuses back.
// This is currently necessary because otherwise there is a race between
// the procs being created, and subsequent messages becoming unroutable
// (the agent actor manages the local muxer). We can solve this by allowing
// buffering in the host-level muxer.
// Create or update each proc, then fence on receiving status
// overlays. This prevents a race where procs become
// addressable before their local muxers are ready, which
// could make early messages unroutable. A future improvement
// would allow buffering in the host-level muxer to eliminate
// the need for this synchronization step.
let mut proc_names = Vec::new();
for (host_rank, host) in self.ranks.iter().enumerate() {
for per_host_rank in 0..per_host.num_ranks() {
Expand Down Expand Up @@ -589,9 +614,24 @@ impl HostMeshRef {

let start_time = RealClock.now();

match GetRankStatus::wait(rx, num_ranks, config::global::get(PROC_SPAWN_MAX_IDLE)).await {
// Wait on accumulated StatusMesh snapshots until complete or
// timeout.
match GetRankStatus::wait(
rx,
num_ranks,
config::global::get(PROC_SPAWN_MAX_IDLE),
region.clone(), // fallback mesh if nothing arrives
)
.await
{
Ok(statuses) => {
if let Some((rank, status)) = statuses.first_terminating() {
// If any rank is terminating, surface a
// ProcCreationError pointing at that rank.
if let Some((rank, status)) = statuses
.values()
.enumerate()
.find(|(_, s)| s.is_terminating())
{
let proc_name = &proc_names[rank];
let host_rank = rank / per_host.num_ranks();
let mesh_agent = self.ranks[host_rank].mesh_agent();
Expand All @@ -612,18 +652,20 @@ impl HostMeshRef {
let proc_name = Name::new(format!("{}-{}", name, rank % per_host.num_ranks()));
return Err(v1::Error::ProcCreationError {
state,
mesh_agent,
host_rank,
mesh_agent,
});
}
}
Err(complete) => {
// Fill the remaining statuses with a timeout error.
let mut statuses =
RankedValues::from((0..num_ranks, Status::Timeout(start_time.elapsed())));
statuses.merge_from(complete);

return Err(v1::Error::ProcSpawnError { statuses });
// Fill remaining ranks with a timeout status via the
// legacy shim.
let legacy = mesh_to_rankedvalues_with_default(
&complete,
Status::Timeout(start_time.elapsed()),
num_ranks,
);
return Err(v1::Error::ProcSpawnError { statuses: legacy });
}
}

Expand Down
24 changes: 13 additions & 11 deletions hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,22 @@ impl Handler<resource::GetRankStatus> for HostMeshAgent {
cx: &Context<Self>,
get_rank_status: resource::GetRankStatus,
) -> anyhow::Result<()> {
let Some(created) = self.created.get(&get_rank_status.name) else {
// TODO: how can we get the host's rank here? we should model its absence explicitly.
get_rank_status
.reply
.send(cx, (usize::MAX, resource::Status::NotExist).into())?;
return Ok(());
};
use crate::resource::Status;
use crate::v1::StatusOverlay;

let rank_status = match created {
(rank, Ok(_)) => (*rank, resource::Status::Running),
(rank, Err(e)) => (*rank, resource::Status::Failed(e.to_string())),
let (rank, status) = match self.created.get(&get_rank_status.name) {
Some((rank, Ok(_))) => (*rank, Status::Running),
Some((rank, Err(e))) => (*rank, Status::Failed(e.to_string())),
None => (usize::MAX, Status::NotExist),
};
get_rank_status.reply.send(cx, rank_status.into())?;

let overlay = if rank == usize::MAX {
StatusOverlay::new()
} else {
StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
.expect("valid single-run overlay")
};
get_rank_status.reply.send(cx, overlay)?;
Ok(())
}
}
Expand Down
Loading