Skip to content

Commit 216826b

Browse files
shayne-fletcher authored and meta-codesync[bot] committed
v1: migrate to ValueMesh accumulator (meta-pytorch#1486)
Summary: Pull Request resolved: meta-pytorch#1486. This change replaces the old `RankedValues<Status>` path with `ValueOverlay<Status>` updates reduced into a `ValueMesh<Status>` snapshot. `GetRankStatus.reply` now carries overlays; the comm reducer merges them (right-wins) and the accumulator applies them into a full `StatusMesh`, seeded to `NotExist` via the accumulator template (`StatusMesh::from_single(region, NotExist)`) and `ValueMesh::default()` + first-update install. Agents emit a single-rank overlay (or an empty overlay when the rank is unknown). `wait()` consumes snapshots, declares completion when no `NotExist` entries remain for the target ranks, and on timeout returns the last snapshot (falling back to the seed if nothing arrived). Termination detection now checks `values().any(is_terminating)`. Legacy shims convert `StatusMesh` back to `RankedValues` only where error types still require it. Removed the `RankedValues` accumulator/reducer and its registration. Differential Revision: D84318425
1 parent 1f021c9 commit 216826b

File tree

6 files changed

+236
-115
lines changed

6 files changed

+236
-115
lines changed

hyperactor_mesh/src/proc_mesh/mesh_agent.rs

Lines changed: 17 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -548,6 +548,9 @@ impl Handler<resource::GetRankStatus> for ProcMeshAgent {
548548
cx: &Context<Self>,
549549
get_rank_status: resource::GetRankStatus,
550550
) -> anyhow::Result<()> {
551+
use crate::resource::Status;
552+
use crate::v1::StatusOverlay;
553+
551554
let (rank, status) = match self.actor_states.get(&get_rank_status.name) {
552555
Some(ActorInstanceState {
553556
spawn: Ok(actor_id),
@@ -560,9 +563,9 @@ impl Handler<resource::GetRankStatus> for ProcMeshAgent {
560563
(
561564
*create_rank,
562565
if supervision_events.is_empty() {
563-
resource::Status::Running
566+
Status::Running
564567
} else {
565-
resource::Status::Failed(format!(
568+
Status::Failed(format!(
566569
"because of supervision events: {:?}",
567570
supervision_events
568571
))
@@ -572,12 +575,21 @@ impl Handler<resource::GetRankStatus> for ProcMeshAgent {
572575
Some(ActorInstanceState {
573576
spawn: Err(e),
574577
create_rank,
575-
}) => (*create_rank, resource::Status::Failed(e.to_string())),
578+
}) => (*create_rank, Status::Failed(e.to_string())),
576579
// TODO: represent unknown rank
577-
None => (usize::MAX, resource::Status::NotExist),
580+
None => (usize::MAX, Status::NotExist),
581+
};
582+
583+
// Send a sparse overlay update. If rank is unknown, emit an
584+
// empty overlay.
585+
let overlay = if rank == usize::MAX {
586+
StatusOverlay::new()
587+
} else {
588+
StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
589+
.expect("valid single-run overlay")
578590
};
591+
get_rank_status.reply.send(cx, overlay)?;
579592

580-
get_rank_status.reply.send(cx, (rank, status).into())?;
581593
Ok(())
582594
}
583595
}

hyperactor_mesh/src/resource.rs

Lines changed: 38 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,6 @@ use std::collections::HashMap;
1414
use std::fmt;
1515
use std::fmt::Debug;
1616
use std::hash::Hash;
17-
use std::marker::PhantomData;
1817
use std::mem::replace;
1918
use std::mem::take;
2019
use std::ops::Deref;
@@ -31,19 +30,17 @@ use hyperactor::PortRef;
3130
use hyperactor::RefClient;
3231
use hyperactor::RemoteMessage;
3332
use hyperactor::Unbind;
34-
use hyperactor::accum::Accumulator;
35-
use hyperactor::accum::CommReducer;
36-
use hyperactor::accum::ReducerFactory;
37-
use hyperactor::accum::ReducerSpec;
38-
use hyperactor::mailbox::MailboxError;
3933
use hyperactor::mailbox::PortReceiver;
4034
use hyperactor::message::Bind;
4135
use hyperactor::message::Bindings;
4236
use hyperactor::message::Unbind;
37+
use ndslice::Region;
38+
use ndslice::ViewExt;
4339
use serde::Deserialize;
4440
use serde::Serialize;
4541

4642
use crate::v1::Name;
43+
use crate::v1::StatusOverlay;
4744

4845
/// The current lifecycle status of a resource.
4946
#[derive(
@@ -119,8 +116,12 @@ impl Bind for Rank {
119116
}
120117
}
121118

122-
/// Get the status of a resource at a rank. This message is designed to be
123-
/// cast and efficiently accumulated.
119+
/// Get the status of a resource across the mesh.
120+
///
121+
/// This message is cast to all ranks; each rank replies with a sparse
122+
/// status **overlay**. The comm reducer merges overlays (right-wins)
123+
/// and the accumulator applies them to produce **full StatusMesh
124+
/// snapshots** on the receiver side.
124125
#[derive(
125126
Clone,
126127
Debug,
@@ -136,34 +137,50 @@ impl Bind for Rank {
136137
pub struct GetRankStatus {
137138
/// The name of the resource.
138139
pub name: Name,
139-
/// The status of the rank.
140+
/// Sparse status updates (overlays) from a rank.
140141
#[binding(include)]
141-
pub reply: PortRef<RankedValues<Status>>,
142+
pub reply: PortRef<StatusOverlay>,
142143
}
143144

144145
impl GetRankStatus {
145146
pub async fn wait(
146-
mut rx: PortReceiver<RankedValues<Status>>,
147+
mut rx: PortReceiver<crate::v1::StatusMesh>,
147148
num_ranks: usize,
148149
max_idle_time: Duration,
149-
) -> Result<RankedValues<Status>, RankedValues<Status>> {
150+
region: Region, // used only for fallback
151+
) -> Result<crate::v1::StatusMesh, crate::v1::StatusMesh> {
152+
debug_assert_eq!(region.num_ranks(), num_ranks, "region/num_ranks mismatch");
153+
150154
let mut alarm = hyperactor::time::Alarm::new();
151155
alarm.arm(max_idle_time);
152-
let mut statuses = RankedValues::default();
156+
157+
// Fallback snapshot if we time out before receiving anything.
158+
let mut snapshot =
159+
crate::v1::StatusMesh::from_single(region, crate::resource::Status::NotExist);
160+
153161
loop {
154162
let mut sleeper = alarm.sleeper();
155163
tokio::select! {
156-
_ = sleeper.sleep() => return Err(statuses),
157-
new_statuses = rx.recv() => {
158-
match new_statuses {
159-
Ok(new_statuses) => statuses = new_statuses,
160-
Err(_) => return Err(statuses),
164+
_ = sleeper.sleep() => return Err(snapshot),
165+
next = rx.recv() => {
166+
match next {
167+
Ok(mesh) => { snapshot = mesh; } // latest-wins snapshot
168+
Err(_) => return Err(snapshot),
161169
}
162170
}
163171
}
172+
164173
alarm.arm(max_idle_time);
165-
if statuses.rank(num_ranks) == num_ranks {
166-
break Ok(statuses);
174+
175+
// Completion: once every rank (among the first
176+
// `num_ranks`) has reported at least something (i.e.
177+
// moved off NotExist).
178+
if snapshot
179+
.values()
180+
.take(num_ranks)
181+
.all(|s| !matches!(s, crate::resource::Status::NotExist))
182+
{
183+
break Ok(snapshot);
167184
}
168185
}
169186
}
@@ -291,7 +308,7 @@ impl<T: Clone> RankedValues<T> {
291308
pub fn materialized_iter(&self, until: usize) -> impl Iterator<Item = &T> + '_ {
292309
assert_eq!(self.rank(until), until, "insufficient rank");
293310
self.iter()
294-
.flat_map(|(range, value)| std::iter::repeat(value).take(range.end - range.start))
311+
.flat_map(|(range, value)| std::iter::repeat_n(value, range.end - range.start))
295312
}
296313
}
297314

@@ -475,44 +492,6 @@ impl<T> FromIterator<(Range<usize>, T)> for RankedValues<T> {
475492
}
476493
}
477494

478-
impl<T: Eq + Clone + Named> Accumulator for RankedValues<T> {
479-
type State = Self;
480-
type Update = Self;
481-
482-
fn accumulate(&self, state: &mut Self::State, update: Self::Update) -> anyhow::Result<()> {
483-
state.merge_from(update);
484-
Ok(())
485-
}
486-
487-
fn reducer_spec(&self) -> Option<ReducerSpec> {
488-
Some(ReducerSpec {
489-
typehash: <RankedValuesReducer<T> as Named>::typehash(),
490-
builder_params: None,
491-
})
492-
}
493-
}
494-
495-
#[derive(Named)]
496-
struct RankedValuesReducer<T>(std::marker::PhantomData<T>);
497-
498-
impl<T: Hash + Eq + Ord + Clone> CommReducer for RankedValuesReducer<T> {
499-
type Update = RankedValues<T>;
500-
501-
fn reduce(&self, mut left: Self::Update, right: Self::Update) -> anyhow::Result<Self::Update> {
502-
left.merge_from(right);
503-
Ok(left)
504-
}
505-
}
506-
507-
// register for concrete types:
508-
509-
hyperactor::submit! {
510-
ReducerFactory {
511-
typehash_f: <RankedValuesReducer<Status> as Named>::typehash,
512-
builder_f: |_| Ok(Box::new(RankedValuesReducer::<Status>(PhantomData))),
513-
}
514-
}
515-
516495
#[cfg(test)]
517496
mod tests {
518497
use super::*;

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 60 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,8 @@ use hyperactor::config;
1414
use hyperactor::config::CONFIG;
1515
use hyperactor::config::ConfigAttr;
1616
use hyperactor::declare_attrs;
17+
18+
use crate::resource::RankedValues;
1719
pub mod mesh_agent;
1820

1921
use std::collections::HashSet;
@@ -43,7 +45,6 @@ use crate::resource;
4345
use crate::resource::CreateOrUpdateClient;
4446
use crate::resource::GetRankStatus;
4547
use crate::resource::GetRankStatusClient;
46-
use crate::resource::RankedValues;
4748
use crate::resource::Status;
4849
use crate::v1;
4950
use crate::v1::Name;
@@ -537,21 +538,45 @@ impl HostMeshRef {
537538
.concat(&per_host)
538539
.map_err(|err| v1::Error::ConfigurationError(err.into()))?;
539540

541+
let region: Region = extent.clone().into();
540542
let mesh_name = Name::new(name);
543+
544+
// Helper: legacy shim for error types that still require
545+
// RankedValues<Status>. TODO(shayne-fletcher): Delete this
546+
// shim once Error::ActorSpawnError carries a StatusMesh
547+
// (ValueMesh<Status>) directly. At that point, use the mesh
548+
// as-is and remove `mesh_to_rankedvalues_*` calls below.
549+
fn mesh_to_rankedvalues_with_default(
550+
mesh: &crate::v1::StatusMesh,
551+
default_fill: Status,
552+
len: usize,
553+
) -> RankedValues<Status> {
554+
let mut out = RankedValues::from((0..len, default_fill));
555+
for (i, s) in mesh.values().enumerate() {
556+
if !matches!(s, Status::NotExist) {
557+
out.merge_from(RankedValues::from((i..i + 1, s.clone())));
558+
}
559+
}
560+
out
561+
}
562+
541563
let mut procs = Vec::new();
542-
let num_ranks = self.region().num_ranks() * per_host.num_ranks();
564+
let num_ranks = region.num_ranks();
565+
// Accumulator outputs full StatusMesh snapshots; seed with
566+
// NotExist.
543567
let (port, rx) = cx.mailbox().open_accum_port_opts(
544-
RankedValues::default(),
568+
crate::v1::StatusMesh::from_single(region.clone(), Status::NotExist),
545569
Some(ReducerOpts {
546570
max_update_interval: Some(Duration::from_millis(50)),
547571
}),
548572
);
549573

550-
// We CreateOrUpdate each proc, and then fence on getting statuses back.
551-
// This is currently necessary because otherwise there is a race between
552-
// the procs being created, and subsequent messages becoming unroutable
553-
// (the agent actor manages the local muxer). We can solve this by allowing
554-
// buffering in the host-level muxer.
574+
// Create or update each proc, then fence on receiving status
575+
// overlays. This prevents a race where procs become
576+
// addressable before their local muxers are ready, which
577+
// could make early messages unroutable. A future improvement
578+
// would allow buffering in the host-level muxer to eliminate
579+
// the need for this synchronization step.
555580
for (host_rank, host) in self.ranks.iter().enumerate() {
556581
for per_host_rank in 0..per_host.num_ranks() {
557582
let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
@@ -585,25 +610,42 @@ impl HostMeshRef {
585610

586611
let start_time = RealClock.now();
587612

588-
match GetRankStatus::wait(rx, num_ranks, config::global::get(PROC_SPAWN_MAX_IDLE)).await {
613+
// Wait on accumulated StatusMesh snapshots until complete or
614+
// timeout.
615+
match GetRankStatus::wait(
616+
rx,
617+
num_ranks,
618+
config::global::get(PROC_SPAWN_MAX_IDLE),
619+
region.clone(), // fallback mesh if nothing arrives
620+
)
621+
.await
622+
{
589623
Ok(statuses) => {
590-
if let Some((rank, status)) = statuses.first_terminating() {
624+
// If any rank is terminating, surface a
625+
// ProcCreationError pointing at that rank.
626+
if let Some((rank, status)) = statuses
627+
.values()
628+
.enumerate()
629+
.find(|(_, s)| s.is_terminating())
630+
{
591631
let proc_name = Name::new(format!("{}-{}", name, rank % per_host.num_ranks()));
592632
return Err(v1::Error::ProcCreationError {
593633
proc_name,
594-
mesh_agent: self.ranks[rank].mesh_agent(),
634+
mesh_agent: self.ranks[rank / per_host.num_ranks()].mesh_agent(),
595635
host_rank: rank / per_host.num_ranks(),
596-
status: status.clone(),
636+
status,
597637
});
598638
}
599639
}
600640
Err(complete) => {
601-
// Fill the remaining statuses with a timeout error.
602-
let mut statuses =
603-
RankedValues::from((0..num_ranks, Status::Timeout(start_time.elapsed())));
604-
statuses.merge_from(complete);
605-
606-
return Err(v1::Error::ProcSpawnError { statuses });
641+
// Fill remaining ranks with a timeout status via the
642+
// legacy shim.
643+
let legacy = mesh_to_rankedvalues_with_default(
644+
&complete,
645+
Status::Timeout(start_time.elapsed()),
646+
num_ranks,
647+
);
648+
return Err(v1::Error::ProcSpawnError { statuses: legacy });
607649
}
608650
}
609651

hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs

Lines changed: 13 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -147,20 +147,22 @@ impl Handler<resource::GetRankStatus> for HostMeshAgent {
147147
cx: &Context<Self>,
148148
get_rank_status: resource::GetRankStatus,
149149
) -> anyhow::Result<()> {
150-
let Some(created) = self.created.get(&get_rank_status.name) else {
151-
// TODO: how can we get the host's rank here? we should model its absence explicitly.
152-
get_rank_status
153-
.reply
154-
.send(cx, (usize::MAX, resource::Status::NotExist).into())?;
155-
return Ok(());
156-
};
150+
use crate::resource::Status;
151+
use crate::v1::StatusOverlay;
157152

158-
let rank_status = match created {
159-
(rank, Ok(_)) => (*rank, resource::Status::Running),
160-
(rank, Err(e)) => (*rank, resource::Status::Failed(e.to_string())),
153+
let (rank, status) = match self.created.get(&get_rank_status.name) {
154+
Some((rank, Ok(_))) => (*rank, Status::Running),
155+
Some((rank, Err(e))) => (*rank, Status::Failed(e.to_string())),
156+
None => (usize::MAX, Status::NotExist),
161157
};
162-
get_rank_status.reply.send(cx, rank_status.into())?;
163158

159+
let overlay = if rank == usize::MAX {
160+
StatusOverlay::new()
161+
} else {
162+
StatusOverlay::try_from_runs(vec![(rank..(rank + 1), status)])
163+
.expect("valid single-run overlay")
164+
};
165+
get_rank_status.reply.send(cx, overlay)?;
164166
Ok(())
165167
}
166168
}

0 commit comments

Comments
 (0)