Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 39 additions & 30 deletions hyperactor/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use crate::actor::Referable;
use crate::channel;
use crate::channel::ChannelAddr;
use crate::channel::ChannelError;
use crate::channel::ChannelRx;
use crate::channel::ChannelTransport;
use crate::channel::Rx;
use crate::channel::Tx;
Expand Down Expand Up @@ -125,17 +126,15 @@ pub struct Host<M> {
router: DialMailboxRouter,
manager: M,
service_proc: Proc,
frontend_rx: Option<ChannelRx<MessageEnvelope>>,
}

impl<M: ProcManager> Host<M> {
/// Serve a host using the provided ProcManager, on the provided `addr`.
/// On success, the host will multiplex messages for procs on the host
/// on the address of the host.
#[tracing::instrument(skip(manager))]
pub async fn serve(
manager: M,
addr: ChannelAddr,
) -> Result<(Self, MailboxServerHandle), HostError> {
pub async fn new(manager: M, addr: ChannelAddr) -> Result<Self, HostError> {
let (frontend_addr, frontend_rx) = channel::serve(addr)?;

// We set up a cascade of routers: first, the outer router supports
Expand All @@ -162,21 +161,22 @@ impl<M: ProcManager> Host<M> {
procs: HashMap::new(),
frontend_addr,
backend_addr,
router: router.clone(),
router,
manager,
service_proc: service_proc.clone(),
service_proc,
frontend_rx: Some(frontend_rx),
};

let router = ProcOrDial {
proc: service_proc,
router,
};
// We serve the same router on both frontend and backend addresses.
let _backend_handle = host.forwarder().serve(backend_rx);

// Serve the same router on both frontend and backend addresses.
let _backend_handle = router.clone().serve(backend_rx);
let frontend_handle = router.serve(frontend_rx);
Ok(host)
}

Ok((host, frontend_handle))
/// Begin serving this host's mailbox on its frontend address.
///
/// The frontend receiver is consumed by the first call, which returns the
/// resulting server handle; every later call returns `None`.
pub fn serve(&mut self) -> Option<MailboxServerHandle> {
    // `take` leaves `None` behind, so a repeated call exits here.
    let frontend_rx = self.frontend_rx.take()?;
    let handle = self.forwarder().serve(frontend_rx);
    Some(handle)
}

/// The underlying proc manager.
Expand Down Expand Up @@ -253,14 +253,21 @@ impl<M: ProcManager> Host<M> {

Ok((proc_id, agent_ref))
}

/// Build the sender that is served on this host's addresses: a
/// `ProcOrDial` holding clones of the service proc and the dial router.
fn forwarder(&self) -> ProcOrDial {
    let proc = self.service_proc.clone();
    let dialer = self.router.clone();
    ProcOrDial { proc, dialer }
}
}

/// A router used to route to the system proc, or else fall back to
/// the dial mailbox router.
#[derive(Debug, Clone)]
struct ProcOrDial {
proc: Proc,
router: DialMailboxRouter,
dialer: DialMailboxRouter,
}

impl MailboxSender for ProcOrDial {
Expand All @@ -272,7 +279,7 @@ impl MailboxSender for ProcOrDial {
if envelope.dest().actor_id().proc_id() == self.proc.proc_id() {
self.proc.post_unchecked(envelope, return_handle);
} else {
self.router.post_unchecked(envelope, return_handle)
self.dialer.post_unchecked(envelope, return_handle)
}
}
}
Expand Down Expand Up @@ -1219,10 +1226,9 @@ mod tests {
let proc_manager =
LocalProcManager::new(|proc: Proc| async move { proc.spawn::<()>("agent", ()).await });
let procs = Arc::clone(&proc_manager.procs);
let (mut host, _handle) =
Host::serve(proc_manager, ChannelAddr::any(ChannelTransport::Local))
.await
.unwrap();
let mut host = Host::new(proc_manager, ChannelAddr::any(ChannelTransport::Local))
.await
.unwrap();

let (proc_id1, _ref) = host.spawn("proc1".to_string(), ()).await.unwrap();
assert_eq!(
Expand Down Expand Up @@ -1288,10 +1294,13 @@ mod tests {
let process_manager = ProcessProcManager::<EchoActor>::new(
buck_resources::get("monarch/hyperactor/bootstrap").unwrap(),
);
let (mut host, _handle) =
Host::serve(process_manager, ChannelAddr::any(ChannelTransport::Unix))
.await
.unwrap();
let mut host = Host::new(process_manager, ChannelAddr::any(ChannelTransport::Unix))
.await
.unwrap();

// Manually serve this: the agent isn't actually doing anything in this case,
// but we are testing connectivity.
host.serve();

// (1) Spawn and check invariants.
assert!(matches!(host.addr().transport(), ChannelTransport::Unix));
Expand Down Expand Up @@ -1514,7 +1523,7 @@ mod tests {
Duration::from_millis(10),
);

let (mut host, _h) = Host::serve(
let mut host = Host::new(
TestManager::local(ReadyMode::OkAfter(Duration::from_millis(50))),
ChannelAddr::any(ChannelTransport::Local),
)
Expand All @@ -1533,7 +1542,7 @@ mod tests {
Duration::from_secs(0),
);

let (mut host, _h) = Host::serve(
let mut host = Host::new(
TestManager::local(ReadyMode::OkAfter(Duration::from_millis(20))),
ChannelAddr::any(ChannelTransport::Local),
)
Expand All @@ -1547,7 +1556,7 @@ mod tests {

#[tokio::test]
async fn host_spawn_maps_channel_closed_ready_error_to_config_failure() {
let (mut host, _h) = Host::serve(
let mut host = Host::new(
TestManager::local(ReadyMode::ErrChannelClosed),
ChannelAddr::any(ChannelTransport::Local),
)
Expand All @@ -1560,7 +1569,7 @@ mod tests {

#[tokio::test]
async fn host_spawn_maps_terminal_ready_error_to_config_failure() {
let (mut host, _h) = Host::serve(
let mut host = Host::new(
TestManager::local(ReadyMode::ErrTerminal),
ChannelAddr::any(ChannelTransport::Local),
)
Expand All @@ -1573,7 +1582,7 @@ mod tests {

#[tokio::test]
async fn host_spawn_fails_if_ready_but_missing_addr() {
let (mut host, _h) = Host::serve(
let mut host = Host::new(
TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(true, false),
ChannelAddr::any(ChannelTransport::Local),
)
Expand All @@ -1589,7 +1598,7 @@ mod tests {

#[tokio::test]
async fn host_spawn_fails_if_ready_but_missing_agent() {
let (mut host, _h) = Host::serve(
let mut host = Host::new(
TestManager::local(ReadyMode::OkAfter(Duration::ZERO)).with_omissions(false, true),
ChannelAddr::any(ChannelTransport::Local),
)
Expand Down
2 changes: 1 addition & 1 deletion hyperactor_mesh/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ impl Bootstrap {
};
let manager = BootstrapProcManager::new(command).unwrap();

let (host, _handle) = ok!(Host::serve(manager, addr).await);
let host = ok!(Host::new(manager, addr).await);
let addr = host.addr().clone();
let host_mesh_agent = ok!(host
.system_proc()
Expand Down
4 changes: 2 additions & 2 deletions hyperactor_mesh/src/v1/host_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl HostMesh {
let addr = hyperactor_config::global::get_cloned(DEFAULT_TRANSPORT).any();

let manager = BootstrapProcManager::new(bootstrap_cmd)?;
let (host, _handle) = Host::serve(manager, addr).await?;
let host = Host::new(manager, addr).await?;
let addr = host.addr().clone();
let host_mesh_agent = host
.system_proc()
Expand Down Expand Up @@ -1521,7 +1521,7 @@ mod tests {
#[cfg(fbcode_build)]
async fn test_halting_proc_allocation() {
let config = hyperactor_config::global::lock();
let _guard1 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(5));
let _guard1 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(10));

let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap");

Expand Down
23 changes: 15 additions & 8 deletions hyperactor_mesh/src/v1/host_mesh/mesh_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,28 @@ impl Actor for HostMeshAgent {
)
.unwrap();
eprintln!(
"Monarch internal logs are being written to {}/{}.log; execution id {}{}",
"Monarch internal logs are being written to {}/{}.log; execution id {}",
directory,
file,
hyperactor_telemetry::env::execution_id(),
hyperactor_telemetry::env::execution_url()
.await
.unwrap_or_else(|e| Some(format!(": <error generating URL: {}>", e)))
.map_or_else(|| "".to_string(), |url| format!(": {}", url))
);
}
Ok(Self {
host: Some(host),
created: HashMap::new(),
})
}

/// Bind this agent's port, then begin serving the wrapped host.
///
/// # Panics
///
/// Panics if `self.host` is `None`.
async fn init(&mut self, this: &Instance<Self>) -> Result<(), anyhow::Error> {
    // Bind first so that our port is bound before the host starts serving.
    this.bind::<Self>();

    // Both modes wrap a host that exposes the same `serve` method; the
    // handle it returns is not kept.
    let mode = self.host.as_mut().unwrap();
    let _ = match mode {
        HostAgentMode::Process(host) => host.serve(),
        HostAgentMode::Local(host) => host.serve(),
    };

    Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -455,7 +462,7 @@ impl Actor for HostMeshAgentProcMeshTrampoline {
let host = if local {
let spawn: ProcManagerSpawnFn = Box::new(|proc| Box::pin(ProcMeshAgent::boot_v1(proc)));
let manager = LocalProcManager::new(spawn);
let (host, _) = Host::serve(manager, transport.any()).await?;
let host = Host::new(manager, transport.any()).await?;
HostAgentMode::Local(host)
} else {
let command = match command {
Expand All @@ -464,7 +471,7 @@ impl Actor for HostMeshAgentProcMeshTrampoline {
};
tracing::info!("booting host with proc command {:?}", command);
let manager = BootstrapProcManager::new(command).unwrap();
let (host, _) = Host::serve(manager, transport.any()).await?;
let host = Host::new(manager, transport.any()).await?;
HostAgentMode::Process(host)
};

Expand Down Expand Up @@ -521,7 +528,7 @@ mod tests {
#[tokio::test]
#[cfg(fbcode_build)]
async fn test_basic() {
let (host, _handle) = Host::serve(
let host = Host::new(
BootstrapProcManager::new(BootstrapCommand::test()).unwrap(),
ChannelTransport::Unix.any(),
)
Expand Down
Loading