From 063fb7a6db4acb4f0720f68300677833ef948196 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Fri, 26 Aug 2022 10:28:28 -0400 Subject: [PATCH 1/6] Refine MGS -> SP serial console messages 1. Explicit attach/detach messages 2. Include offsets in both send and recv These allow us to address a couple bugs/TODOs in the SP implementation: * We no longer duplicate data on the uart if we receive a resent packet * We can now accept a partial packet if we only have room to buffer a portion (instead of requiring MGS to wait and periodically resend all the data until we have room to accept it all) --- gateway-messages/src/lib.rs | 23 +++- gateway-messages/src/sp_impl.rs | 38 ++++-- gateway-sp-comms/src/communicator.rs | 46 ++++++- gateway-sp-comms/src/error.rs | 12 +- gateway-sp-comms/src/single_sp.rs | 171 ++++++++++++++++++++------- gateway/faux-mgs/src/main.rs | 7 ++ gateway/faux-mgs/src/usart.rs | 12 +- 7 files changed, 235 insertions(+), 74 deletions(-) diff --git a/gateway-messages/src/lib.rs b/gateway-messages/src/lib.rs index f2d2445b973..f4758fe722c 100644 --- a/gateway-messages/src/lib.rs +++ b/gateway-messages/src/lib.rs @@ -51,8 +51,14 @@ pub enum RequestKind { command: IgnitionCommand, }, SpState, + SerialConsoleAttach(SpComponent), /// `SerialConsoleWrite` always includes trailing raw data. - SerialConsoleWrite(SpComponent), + SerialConsoleWrite { + /// Offset of the first byte of this packet, starting from 0 when this + /// serial console session was attached. + offset: u64, + }, + SerialConsoleDetach, UpdateStart(UpdateStart), /// `UpdateChunk` always includes trailing raw data. UpdateChunk(UpdateChunk), @@ -87,7 +93,9 @@ pub enum ResponseKind { SpState(SpState), UpdateStartAck, UpdateChunkAck, - SerialConsoleWriteAck, + SerialConsoleAttachAck, + SerialConsoleWriteAck { furthest_ingested_offset: u64 }, + SerialConsoleDetachAck, SysResetPrepareAck, // There is intentionally no `SysResetTriggerAck` response; the expected // "resposne" to `SysResetTrigger` is an SP reset, which won't allow for @@ -125,6 +133,11 @@ pub enum ResponseError { RequestUnsupportedForComponent, /// The specified ignition target does not exist. IgnitionTargetDoesNotExist(u8), + /// Cannot write to the serial console because it is not attached. + SerialConsoleNotAttached, + /// Cannot attach to the serial console because another MGS instance is + /// already attached. + SerialConsoleAlreadyAttached, /// An update is already in progress with the specified amount of data /// already provided. MGS should resume the update at that offset. UpdateInProgress { bytes_received: u32 }, @@ -154,6 +167,12 @@ impl fmt::Display for ResponseError { ResponseError::IgnitionTargetDoesNotExist(target) => { write!(f, "nonexistent ignition target {}", target) } + ResponseError::SerialConsoleNotAttached => { + write!(f, "serial console is not attached") + } + ResponseError::SerialConsoleAlreadyAttached => { + write!(f, "serial console already attached") + } ResponseError::UpdateInProgress { bytes_received } => { write!(f, "update still in progress ({bytes_received} bytes received so far)") } diff --git a/gateway-messages/src/sp_impl.rs b/gateway-messages/src/sp_impl.rs index c113ecf54bb..e2ed12e45ea 100644 --- a/gateway-messages/src/sp_impl.rs +++ b/gateway-messages/src/sp_impl.rs @@ -82,15 +82,28 @@ pub trait SpHandler { data: &[u8], ) -> Result<(), ResponseError>; - // TODO Should we return "number of bytes written" here, or is it sufficient - // to say "all or none"? Would be nice for the caller to not have to resend - // UDP chunks; can SP ensure it writes all data locally? - fn serial_console_write( + fn serial_console_attach( &mut self, sender: SocketAddrV6, port: SpPort, component: SpComponent, + ) -> Result<(), ResponseError>; + + /// The returned u64 should be the offset we want to receive in the next + /// call to `serial_console_write()`; i.e., the furthest offset we've + /// ingested (either by writing to the console or by buffering to write it). + fn serial_console_write( + &mut self, + sender: SocketAddrV6, + port: SpPort, + offset: u64, data: &[u8], + ) -> Result; + + fn serial_console_detach( + &mut self, + sender: SocketAddrV6, + port: SpPort, ) -> Result<(), ResponseError>; fn reset_prepare( @@ -169,7 +182,8 @@ pub fn handle_message( // Do we expect any trailing raw data? Only for specific kinds of messages; // if we get any for other messages, bail out. let trailing_data = match &request.kind { - RequestKind::UpdateChunk(_) | RequestKind::SerialConsoleWrite(_) => { + RequestKind::UpdateChunk(_) + | RequestKind::SerialConsoleWrite { .. } => { unpack_trailing_data(leftover)? } _ => { @@ -203,9 +217,17 @@ pub fn handle_message( RequestKind::UpdateChunk(chunk) => handler .update_chunk(sender, port, chunk, trailing_data) .map(|()| ResponseKind::UpdateChunkAck), - RequestKind::SerialConsoleWrite(packet) => handler - .serial_console_write(sender, port, packet, trailing_data) - .map(|()| ResponseKind::SerialConsoleWriteAck), + RequestKind::SerialConsoleAttach(component) => handler + .serial_console_attach(sender, port, component) + .map(|()| ResponseKind::SerialConsoleAttachAck), + RequestKind::SerialConsoleWrite { offset } => handler + .serial_console_write(sender, port, offset, trailing_data) + .map(|n| ResponseKind::SerialConsoleWriteAck { + furthest_ingested_offset: n, + }), + RequestKind::SerialConsoleDetach => handler + .serial_console_detach(sender, port) + .map(|()| ResponseKind::SerialConsoleDetachAck), RequestKind::SysResetPrepare => handler .reset_prepare(sender, port) .map(|()| ResponseKind::SysResetPrepareAck), diff --git a/gateway-sp-comms/src/communicator.rs b/gateway-sp-comms/src/communicator.rs index 94efe7e0522..e39815abebe 100644 --- a/gateway-sp-comms/src/communicator.rs +++ b/gateway-sp-comms/src/communicator.rs @@ -198,7 +198,7 @@ impl Communicator { ) -> Result<(), Error> { let port = self.id_to_port(sp)?; let sp = self.switch.sp(port).ok_or(Error::SpAddressUnknown(sp))?; - sp.serial_console_detach().await; + sp.serial_console_detach().await?; Ok(()) } @@ -291,7 +291,11 @@ pub(crate) trait ResponseKindExt { fn expect_sp_state(self) -> Result; - fn expect_serial_console_write_ack(self) -> Result<(), BadResponseType>; + fn expect_serial_console_attach_ack(self) -> Result<(), BadResponseType>; + + fn expect_serial_console_write_ack(self) -> Result; + + fn expect_serial_console_detach_ack(self) -> Result<(), BadResponseType>; fn expect_update_start_ack(self) -> Result<(), BadResponseType>; @@ -314,9 +318,15 @@ impl ResponseKindExt for ResponseKind { response_kind_names::IGNITION_COMMAND_ACK } ResponseKind::SpState(_) => response_kind_names::SP_STATE, - ResponseKind::SerialConsoleWriteAck => { + ResponseKind::SerialConsoleAttachAck => { + response_kind_names::SERIAL_CONSOLE_ATTACH_ACK + } + ResponseKind::SerialConsoleWriteAck { .. } => { response_kind_names::SERIAL_CONSOLE_WRITE_ACK } + ResponseKind::SerialConsoleDetachAck => { + response_kind_names::SERIAL_CONSOLE_DETACH_ACK + } ResponseKind::UpdateStartAck => { response_kind_names::UPDATE_START_ACK } @@ -381,9 +391,31 @@ impl ResponseKindExt for ResponseKind { } } - fn expect_serial_console_write_ack(self) -> Result<(), BadResponseType> { + fn expect_serial_console_attach_ack(self) -> Result<(), BadResponseType> { + match self { + ResponseKind::SerialConsoleAttachAck => Ok(()), + other => Err(BadResponseType { + expected: response_kind_names::SP_STATE, + got: other.name(), + }), + } + } + + fn expect_serial_console_write_ack(self) -> Result { + match self { + ResponseKind::SerialConsoleWriteAck { + furthest_ingested_offset, + } => Ok(furthest_ingested_offset), + other => Err(BadResponseType { + expected: response_kind_names::SP_STATE, + got: other.name(), + }), + } + } + + fn expect_serial_console_detach_ack(self) -> Result<(), BadResponseType> { match self { - ResponseKind::SerialConsoleWriteAck => Ok(()), + ResponseKind::SerialConsoleDetachAck => Ok(()), other => Err(BadResponseType { expected: response_kind_names::SP_STATE, got: other.name(), @@ -428,8 +460,12 @@ mod response_kind_names { pub(super) const BULK_IGNITION_STATE: &str = "bulk_ignition_state"; pub(super) const IGNITION_COMMAND_ACK: &str = "ignition_command_ack"; pub(super) const SP_STATE: &str = "sp_state"; + pub(super) const SERIAL_CONSOLE_ATTACH_ACK: &str = + "serial_console_attach_ack"; pub(super) const SERIAL_CONSOLE_WRITE_ACK: &str = "serial_console_write_ack"; + pub(super) const SERIAL_CONSOLE_DETACH_ACK: &str = + "serial_console_detach_ack"; pub(super) const UPDATE_START_ACK: &str = "update_start_ack"; pub(super) const UPDATE_CHUNK_ACK: &str = "update_chunk_ack"; pub(super) const SYS_RESET_PREPARE_ACK: &str = "sys_reset_prepare_ack"; diff --git a/gateway-sp-comms/src/error.rs b/gateway-sp-comms/src/error.rs index 3e14228640f..ba31d6550a9 100644 --- a/gateway-sp-comms/src/error.rs +++ b/gateway-sp-comms/src/error.rs @@ -25,6 +25,8 @@ pub enum SpCommunicationError { BadResponseType(#[from] BadResponseType), #[error("Error response from SP: {0}")] SpError(#[from] ResponseError), + #[error("Bogus serial console state; detach and reattach")] + BogusSerialConsoleState, } #[derive(Debug, Error)] @@ -37,10 +39,6 @@ pub enum UpdateError { Chunk { offset: u32, err: SpCommunicationError }, } -#[derive(Debug, Error)] -#[error("serial console already attached")] -pub struct SerialConsoleAlreadyAttached; - #[derive(Debug, Error)] pub enum StartupError { #[error("error binding to UDP address {addr}: {err}")] @@ -79,12 +77,6 @@ pub enum Error { SerialConsoleAttached, } -impl From for Error { - fn from(_: SerialConsoleAlreadyAttached) -> Self { - Self::SerialConsoleAttached - } -} - #[derive(Debug, Error)] #[error("bogus SP response type: expected {expected:?} but got {got:?}")] pub struct BadResponseType { diff --git a/gateway-sp-comms/src/single_sp.rs b/gateway-sp-comms/src/single_sp.rs index a512a581c55..fd441d25332 100644 --- a/gateway-sp-comms/src/single_sp.rs +++ b/gateway-sp-comms/src/single_sp.rs @@ -8,7 +8,6 @@ use crate::communicator::ResponseKindExt; use crate::error::BadResponseType; -use crate::error::SerialConsoleAlreadyAttached; use crate::error::SpCommunicationError; use crate::error::UpdateError; use gateway_messages::sp_impl; @@ -284,31 +283,37 @@ impl SingleSp { pub async fn serial_console_attach( &self, component: SpComponent, - ) -> Result { + ) -> Result { let (tx, rx) = oneshot::channel(); // `Inner::run()` doesn't exit until we are dropped, so unwrapping here // only panics if it itself panicked. - self.cmds_tx.send(InnerCommand::SerialConsoleAttach(tx)).await.unwrap(); + self.cmds_tx + .send(InnerCommand::SerialConsoleAttach(component, tx)) + .await + .unwrap(); let attachment = rx.await.unwrap()?; Ok(AttachedSerialConsole { key: attachment.key, rx: attachment.incoming, - component, inner_tx: self.cmds_tx.clone(), }) } /// Detach any existing attached serial console connection. - pub async fn serial_console_detach(&self) { + pub async fn serial_console_detach(&self) -> Result<()> { + let (tx, rx) = oneshot::channel(); + // `Inner::run()` doesn't exit until we are dropped, so unwrapping here // only panics if it itself panicked. self.cmds_tx - .send(InnerCommand::SerialConsoleDetach(None)) + .send(InnerCommand::SerialConsoleDetach(None, tx)) .await .unwrap(); + + rx.await.unwrap() } pub(crate) async fn rpc( @@ -364,7 +369,6 @@ async fn rpc( #[derive(Debug)] pub struct AttachedSerialConsole { key: u64, - component: SpComponent, rx: mpsc::Receiver>, inner_tx: mpsc::Sender, } @@ -376,7 +380,7 @@ impl AttachedSerialConsole { ( AttachedSerialConsoleSend { key: self.key, - component: self.component, + tx_offset: 0, inner_tx: self.inner_tx, }, AttachedSerialConsoleRecv { rx: self.rx }, @@ -387,37 +391,66 @@ impl AttachedSerialConsole { #[derive(Debug)] pub struct AttachedSerialConsoleSend { key: u64, + tx_offset: u64, inner_tx: mpsc::Sender, - component: SpComponent, } impl AttachedSerialConsoleSend { /// Write `data` to the serial console of the SP. - pub async fn write(&self, data: Vec) -> Result<()> { + pub async fn write(&mut self, data: Vec) -> Result<()> { let mut data = Cursor::new(data); - while !CursorExt::is_empty(&data) { + let mut remaining_data = CursorExt::remaining_slice(&data).len(); + while remaining_data > 0 { let (result, new_data) = rpc_with_trailing_data( &self.inner_tx, - RequestKind::SerialConsoleWrite(self.component), + RequestKind::SerialConsoleWrite { offset: self.tx_offset }, data, ) .await; - result.and_then(|(_peer, response)| { + let data_sent = (remaining_data + - CursorExt::remaining_slice(&new_data).len()) + as u64; + + let n = result.and_then(|(_peer, response)| { response.expect_serial_console_write_ack().map_err(Into::into) })?; + // Confirm the ack we got back makes sense; its `n` should be in the + // range `[self.tx_offset..self.tx_offset + data_sent]`. + if n < self.tx_offset { + return Err(SpCommunicationError::BogusSerialConsoleState); + } + let bytes_accepted = n - self.tx_offset; + if bytes_accepted > data_sent { + return Err(SpCommunicationError::BogusSerialConsoleState); + } + data = new_data; + + // If the SP only accepted part of the data we sent, we need to + // rewind our cursor and resend what it couldn't accept. + if bytes_accepted < data_sent { + let rewind = data_sent - bytes_accepted; + data.seek(SeekFrom::Current(-(rewind as i64))).unwrap(); + } + + self.tx_offset += bytes_accepted; + remaining_data = CursorExt::remaining_slice(&data).len(); } Ok(()) } /// Detach this serial console connection. - pub async fn detach(&self) { + pub async fn detach(&self) -> Result<()> { + let (tx, rx) = oneshot::channel(); + self.inner_tx - .send(InnerCommand::SerialConsoleDetach(Some(self.key))) + .send(InnerCommand::SerialConsoleDetach(Some(self.key), tx)) .await .unwrap(); + + rx.await.unwrap() } } @@ -473,16 +506,15 @@ struct SerialConsoleAttachment { enum InnerCommand { Rpc(RpcRequest), SerialConsoleAttach( - oneshot::Sender< - Result, - >, + SpComponent, + oneshot::Sender>, ), // The associated value is the connection key; if `Some(_)`, only detach if // the currently-attached key number matches. If `None`, detach any current // connection. These correspond to "detach the current session" (performed // automatically when a connection is closed) and "force-detach any session" // (performed by a user). - SerialConsoleDetach(Option), + SerialConsoleDetach(Option, oneshot::Sender>), } struct Inner { @@ -626,17 +658,6 @@ impl Inner { command: InnerCommand, incoming_buf: &mut [u8; gateway_messages::MAX_SERIALIZED_SIZE], ) { - // When a caller attaches to the SP's serial console, we return an - // `mpsc::Receiver<_>` on which we send any packets received from the - // SP. We have to pick a depth for that channel, and given we're not - // able to apply backpressure to the SP / host sending the data, we - // choose to drop data if the channel fills. We want something large - // enough that hiccups in the receiver doesn't cause data loss, but - // small enough that if the receiver stops handling messages we don't - // eat a bunch of memory buffering up console data. We'll take a WAG and - // pick a depth of 32 for now. - const SERIAL_CONSOLE_CHANNEL_DEPTH: usize = 32; - match command { InnerCommand::Rpc(mut rpc) => { let result = self @@ -657,26 +678,21 @@ impl Inner { ); } } - InnerCommand::SerialConsoleAttach(response_tx) => { - let resp = if self.serial_console_tx.is_some() { - Err(SerialConsoleAlreadyAttached) - } else { - let (tx, rx) = mpsc::channel(SERIAL_CONSOLE_CHANNEL_DEPTH); - self.serial_console_tx = Some(tx); - self.serial_console_connection_key += 1; - Ok(SerialConsoleAttachment { - key: self.serial_console_connection_key, - incoming: rx, - }) - }; + InnerCommand::SerialConsoleAttach(component, response_tx) => { + let resp = self + .attach_serial_console(sp_addr, component, incoming_buf) + .await; response_tx.send(resp).unwrap(); } - InnerCommand::SerialConsoleDetach(key) => { - if key.is_none() + InnerCommand::SerialConsoleDetach(key, response_tx) => { + let resp = if key.is_none() || key == Some(self.serial_console_connection_key) { - self.serial_console_tx = None; - } + self.detach_serial_console(sp_addr, incoming_buf).await + } else { + Ok(()) + }; + response_tx.send(resp).unwrap(); } } } @@ -883,6 +899,69 @@ impl Inner { } warn!(self.log, "discarding SP serial console data (no receiver)"); } + + async fn attach_serial_console( + &mut self, + sp_addr: SocketAddrV6, + component: SpComponent, + incoming_buf: &mut [u8; gateway_messages::MAX_SERIALIZED_SIZE], + ) -> Result { + // When a caller attaches to the SP's serial console, we return an + // `mpsc::Receiver<_>` on which we send any packets received from the + // SP. We have to pick a depth for that channel, and given we're not + // able to apply backpressure to the SP / host sending the data, we + // choose to drop data if the channel fills. We want something large + // enough that hiccups in the receiver doesn't cause data loss, but + // small enough that if the receiver stops handling messages we don't + // eat a bunch of memory buffering up console data. We'll take a WAG and + // pick a depth of 32 for now. + const SERIAL_CONSOLE_CHANNEL_DEPTH: usize = 32; + + if self.serial_console_tx.is_some() { + // Returning an `SpError` here is a little suspect since we didn't + // actually talk to an SP, but we already know we're attached to it. + // If we asked it to attach again, it would send back this error. + return Err(SpCommunicationError::SpError( + ResponseError::SerialConsoleAlreadyAttached, + )); + } + + let (_peer, response) = self + .rpc_call( + sp_addr, + RequestKind::SerialConsoleAttach(component), + None, + incoming_buf, + ) + .await?; + response.expect_serial_console_attach_ack()?; + + let (tx, rx) = mpsc::channel(SERIAL_CONSOLE_CHANNEL_DEPTH); + self.serial_console_tx = Some(tx); + self.serial_console_connection_key += 1; + Ok(SerialConsoleAttachment { + key: self.serial_console_connection_key, + incoming: rx, + }) + } + + async fn detach_serial_console( + &mut self, + sp_addr: SocketAddrV6, + incoming_buf: &mut [u8; gateway_messages::MAX_SERIALIZED_SIZE], + ) -> Result<()> { + let (_peer, response) = self + .rpc_call( + sp_addr, + RequestKind::SerialConsoleDetach, + None, + incoming_buf, + ) + .await?; + response.expect_serial_console_detach_ack()?; + self.serial_console_tx = None; + Ok(()) + } } async fn send( diff --git a/gateway/faux-mgs/src/main.rs b/gateway/faux-mgs/src/main.rs index 568e3aeff27..d2c9dec2d59 100644 --- a/gateway/faux-mgs/src/main.rs +++ b/gateway/faux-mgs/src/main.rs @@ -104,6 +104,9 @@ enum SpCommand { stdin_buffer_time_millis: u64, }, + /// Detach any other attached USART connection. + UsartDetach, + /// Upload a new image to the SP and have it swap banks (requires reset) Update { hubris_archive: PathBuf }, @@ -189,6 +192,10 @@ async fn main() -> Result<()> { ) .await?; } + Some(SpCommand::UsartDetach) => { + sp.serial_console_detach().await?; + info!(log, "SP serial console detached"); + } Some(SpCommand::Update { hubris_archive }) => { let mut archive = HubrisArchive::open(&hubris_archive)?; let data = archive.final_bin()?; diff --git a/gateway/faux-mgs/src/usart.rs b/gateway/faux-mgs/src/usart.rs index f537d8cdd86..94084caf652 100644 --- a/gateway/faux-mgs/src/usart.rs +++ b/gateway/faux-mgs/src/usart.rs @@ -44,7 +44,7 @@ pub(crate) async fn run( let (console_tx, mut console_rx) = console.split(); let (send_tx, send_rx) = mpsc::channel(8); - tokio::spawn(async move { + let tx_to_sp_handle = tokio::spawn(async move { relay_data_to_sp(console_tx, send_rx).await.unwrap(); }); @@ -53,6 +53,8 @@ pub(crate) async fn run( result = stdin.read_buf(&mut stdin_buf) => { let n = result.with_context(|| "failed to read from stdin")?; if n == 0 { + mem::drop(send_tx); + tx_to_sp_handle.await.unwrap(); return Ok(()); } @@ -83,16 +85,20 @@ pub(crate) async fn run( } async fn relay_data_to_sp( - console_tx: AttachedSerialConsoleSend, + mut console_tx: AttachedSerialConsoleSend, mut data_rx: mpsc::Receiver>, ) -> Result<()> { loop { let data = match data_rx.recv().await { Some(data) => data, - None => return Ok(()), + None => break, }; console_tx.write(data).await?; } + + console_tx.detach().await?; + + Ok(()) } struct UnrawTermiosGuard { From a18fe0bcac0e12e0701196cd4d5c76bcf46f3bf2 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Mon, 29 Aug 2022 10:00:26 -0400 Subject: [PATCH 2/6] Refine SP -> MGS serial console messages (Re)add an offset to detect lost data in between packets. --- gateway-messages/src/lib.rs | 12 +++++++- gateway-sp-comms/src/single_sp.rs | 49 +++++++++++++++++++++++-------- 2 files changed, 48 insertions(+), 13 deletions(-) diff --git a/gateway-messages/src/lib.rs b/gateway-messages/src/lib.rs index f4758fe722c..3369124ea80 100644 --- a/gateway-messages/src/lib.rs +++ b/gateway-messages/src/lib.rs @@ -209,7 +209,17 @@ pub enum SpMessageKind { /// Data traveling from an SP-attached component (in practice, a CPU) on the /// component's serial console. - SerialConsole(SpComponent), + /// + /// Note that SP -> MGS serial console messages are currently _not_ + /// acknowledged or retried; they are purely "fire and forget" from the SP's + /// point of view. Once it sends data in a packet, it discards it from its + /// local buffer. + SerialConsole { + component: SpComponent, + /// Offset of the first byte in this packet's data starting from 0 when + /// the serial console session was attached. + offset: u64, + }, } #[derive( diff --git a/gateway-sp-comms/src/single_sp.rs b/gateway-sp-comms/src/single_sp.rs index fd441d25332..02a1fe3fb09 100644 --- a/gateway-sp-comms/src/single_sp.rs +++ b/gateway-sp-comms/src/single_sp.rs @@ -299,6 +299,7 @@ impl SingleSp { key: attachment.key, rx: attachment.incoming, inner_tx: self.cmds_tx.clone(), + log: self.log.clone(), }) } @@ -369,8 +370,9 @@ async fn rpc( #[derive(Debug)] pub struct AttachedSerialConsole { key: u64, - rx: mpsc::Receiver>, + rx: mpsc::Receiver<(u64, Vec)>, inner_tx: mpsc::Sender, + log: Logger, } impl AttachedSerialConsole { @@ -383,7 +385,11 @@ impl AttachedSerialConsole { tx_offset: 0, inner_tx: self.inner_tx, }, - AttachedSerialConsoleRecv { rx: self.rx }, + AttachedSerialConsoleRecv { + rx_offset: 0, + rx: self.rx, + log: self.log, + }, ) } } @@ -456,7 +462,9 @@ impl AttachedSerialConsoleSend { #[derive(Debug)] pub struct AttachedSerialConsoleRecv { - rx: mpsc::Receiver>, + rx_offset: u64, + rx: mpsc::Receiver<(u64, Vec)>, + log: Logger, } impl AttachedSerialConsoleRecv { @@ -465,7 +473,15 @@ impl AttachedSerialConsoleRecv { /// Returns `None` if the underlying channel has been closed (e.g., if the /// serial console has been detached). pub async fn recv(&mut self) -> Option> { - self.rx.recv().await + let (offset, data) = self.rx.recv().await?; + if offset != self.rx_offset { + warn!( + self.log, + "gap in serial console data (dropped packet or buffer overrun)", + ); + } + self.rx_offset = offset + data.len() as u64; + Some(data) } } @@ -496,7 +512,7 @@ struct RpcResponse { #[derive(Debug)] struct SerialConsoleAttachment { key: u64, - incoming: mpsc::Receiver>, + incoming: mpsc::Receiver<(u64, Vec)>, } #[derive(Debug)] @@ -524,7 +540,7 @@ struct Inner { discovery_addr: SocketAddrV6, max_attempts: usize, per_attempt_timeout: Duration, - serial_console_tx: Option>>, + serial_console_tx: Option)>>, cmds_rx: mpsc::Receiver, request_id: u32, serial_console_connection_key: u64, @@ -739,8 +755,8 @@ impl Inner { "result" => ?result, ); } - SpMessageKind::SerialConsole(component) => { - self.forward_serial_console(component, trailing_data); + SpMessageKind::SerialConsole { component, offset } => { + self.forward_serial_console(component, offset, trailing_data); } } } @@ -858,8 +874,12 @@ impl Inner { return Ok(None); } } - SpMessageKind::SerialConsole(serial_console) => { - self.forward_serial_console(serial_console, trailing_data); + SpMessageKind::SerialConsole { component, offset } => { + self.forward_serial_console( + component, + offset, + trailing_data, + ); continue; } }; @@ -876,14 +896,19 @@ impl Inner { } } - fn forward_serial_console(&mut self, _component: SpComponent, data: &[u8]) { + fn forward_serial_console( + &mut self, + _component: SpComponent, + offset: u64, + data: &[u8], + ) { // TODO-cleanup component support for serial console is half baked; // should we check here that it matches the attached serial console? For // the foreseeable future we only support one component, so we skip that // for now. if let Some(tx) = self.serial_console_tx.as_ref() { - match tx.try_send(data.to_vec()) { + match tx.try_send((offset, data.to_vec())) { Ok(()) => return, Err(mpsc::error::TrySendError::Closed(_)) => { self.serial_console_tx = None; From 879fedae4594459d32ada94da7036a5efa0c98b5 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Mon, 29 Aug 2022 10:44:39 -0400 Subject: [PATCH 3/6] Update simulated SPs for serial console changes --- gateway/faux-mgs/src/usart.rs | 7 +- gateway/src/serial_console.rs | 11 ++- sp-sim/src/gimlet.rs | 154 ++++++++++++++++++++-------------- sp-sim/src/sidecar.rs | 33 +++++++- 4 files changed, 134 insertions(+), 71 deletions(-) diff --git a/gateway/faux-mgs/src/usart.rs b/gateway/faux-mgs/src/usart.rs index 94084caf652..aa867c63c75 100644 --- a/gateway/faux-mgs/src/usart.rs +++ b/gateway/faux-mgs/src/usart.rs @@ -88,14 +88,9 @@ async fn relay_data_to_sp( mut console_tx: AttachedSerialConsoleSend, mut data_rx: mpsc::Receiver>, ) -> Result<()> { - loop { - let data = match data_rx.recv().await { - Some(data) => data, - None => break, - }; + while let Some(data) = data_rx.recv().await { console_tx.write(data).await?; } - console_tx.detach().await?; Ok(()) diff --git a/gateway/src/serial_console.rs b/gateway/src/serial_console.rs index 9d39e9bc1c7..d7aa41434d3 100644 --- a/gateway/src/serial_console.rs +++ b/gateway/src/serial_console.rs @@ -24,6 +24,7 @@ use slog::info; use slog::Logger; use std::borrow::Cow; use std::ops::Deref; +use std::ops::DerefMut; use tokio::sync::mpsc; use tokio_tungstenite::tungstenite::handshake; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; @@ -222,7 +223,7 @@ impl SerialConsoleTask { async fn ws_recv_task( mut ws_stream: SplitStream>, - console_tx: DetachOnDrop, + mut console_tx: DetachOnDrop, log: Logger, ) -> Result<(), SerialTaskError> { while let Some(message) = ws_stream.next().await { @@ -281,3 +282,11 @@ impl Deref for DetachOnDrop { self.0.as_ref().unwrap() } } + +impl DerefMut for DetachOnDrop { + fn deref_mut(&mut self) -> &mut Self::Target { + // We know from `new()` that we're created with `Some(console)`, and we + // don't remove it until our `Drop` impl + self.0.as_mut().unwrap() + } +} diff --git a/sp-sim/src/gimlet.rs b/sp-sim/src/gimlet.rs index e32e3454013..17508868671 100644 --- a/sp-sim/src/gimlet.rs +++ b/sp-sim/src/gimlet.rs @@ -94,16 +94,7 @@ impl Gimlet { pub async fn spawn(gimlet: &GimletConfig, log: Logger) -> Result { info!(log, "setting up simualted gimlet"); - // We want to be able to start without knowing the gateways' socket - // addresses, but we're spawning both the primary UDP task (which - // receives messages from the gateway) and a helper TCP task (which - // emulates a serial console and sends messages to the gateway - // unprompted). We'll share a locked `Option` between the - // tasks, and have the UDP task populate it. If the TCP task receives - // data but doesn't know either of the gateways addresses, it will just - // discard it. - let gateway_addresses: Arc; 2]>> = - Arc::default(); + let attached_mgs = Arc::new(Mutex::new(None)); let mut incoming_console_tx = HashMap::new(); let mut serial_console_addrs = HashMap::new(); @@ -173,7 +164,7 @@ impl Gimlet { Arc::clone(servers[0].socket()), Arc::clone(servers[1].socket()), ], - Arc::clone(&gateway_addresses), + Arc::clone(&attached_mgs), log.new(slog::o!("serial-console" => name.to_string())), ); inner_tasks.push(task::spawn(async move { @@ -185,7 +176,7 @@ impl Gimlet { [servers[0].local_addr(), servers[1].local_addr()]; let inner = UdpTask::new( servers, - gateway_addresses, + attached_mgs, gimlet.common.serial_number, incoming_console_tx, commands_rx, @@ -221,7 +212,8 @@ struct SerialConsoleTcpTask { listener: TcpListener, incoming_serial_console: UnboundedReceiver>, socks: [Arc; 2], - gateway_addresses: Arc; 2]>>, + attached_mgs: Arc>>, + serial_console_tx_offset: u64, component: SpComponent, log: Logger, } @@ -232,53 +224,61 @@ impl SerialConsoleTcpTask { listener: TcpListener, incoming_serial_console: UnboundedReceiver>, socks: [Arc; 2], - gateway_addresses: Arc; 2]>>, + attached_mgs: Arc>>, log: Logger, ) -> Self { Self { listener, incoming_serial_console, socks, - gateway_addresses, + attached_mgs, + serial_console_tx_offset: 0, component, log, } } async fn send_serial_console(&mut self, data: &[u8]) -> Result<()> { - let gateway_addrs = *self.gateway_addresses.lock().unwrap(); - for (i, (sock, &gateway_addr)) in - self.socks.iter().zip(&gateway_addrs).enumerate() - { - let gateway_addr = match gateway_addr { - Some(addr) => addr, + let (component, sp_port, mgs_addr) = + match *self.attached_mgs.lock().unwrap() { + Some((component, sp_port, mgs_addr)) => { + (component, sp_port, mgs_addr) + } None => { info!( self.log, - concat!( - "MGS address on port {} not known - ", - "not sending it serial console data", - ), - i, + "No attached MGS; discarding serial console data" ); - continue; + return Ok(()); } }; - let mut out = [0; gateway_messages::MAX_SERIALIZED_SIZE]; - let mut remaining = data; - while !remaining.is_empty() { - let message = SpMessage { - version: version::V1, - kind: SpMessageKind::SerialConsole(self.component), - }; - let (n, written) = - gateway_messages::serialize_with_trailing_data( - &mut out, &message, remaining, - ); - sock.send_to(&out[..n], gateway_addr).await?; - remaining = &remaining[written..]; - } + if component != self.component { + info!(self.log, "MGS is attached to a different component; discarding serial console data"); + return Ok(()); + } + + let sock = match sp_port { + SpPort::One => &self.socks[0], + SpPort::Two => &self.socks[1], + }; + + let mut out = [0; gateway_messages::MAX_SERIALIZED_SIZE]; + let mut remaining = data; + while !remaining.is_empty() { + let message = SpMessage { + version: version::V1, + kind: SpMessageKind::SerialConsole { + component: self.component, + offset: self.serial_console_tx_offset, + }, + }; + let (n, written) = gateway_messages::serialize_with_trailing_data( + &mut out, &message, remaining, + ); + sock.send_to(&out[..n], mgs_addr).await?; + remaining = &remaining[written..]; + self.serial_console_tx_offset += written as u64; } Ok(()) @@ -381,7 +381,7 @@ struct UdpTask { impl UdpTask { fn new( servers: [UdpServer; 2], - gateway_addresses: Arc; 2]>>, + attached_mgs: Arc>>, serial_number: SerialNumber, incoming_serial_console: HashMap>>, commands: mpsc::UnboundedReceiver<( @@ -396,7 +396,7 @@ impl UdpTask { udp1, handler: Handler { log, - gateway_addresses, + attached_mgs, serial_number, incoming_serial_console, }, @@ -456,20 +456,10 @@ impl UdpTask { struct Handler { log: Logger, serial_number: SerialNumber, - gateway_addresses: Arc; 2]>>, + attached_mgs: Arc>>, incoming_serial_console: HashMap>>, } -impl Handler { - fn update_gateway_address(&self, addr: SocketAddrV6, port: SpPort) { - let i = match port { - SpPort::One => 0, - SpPort::Two => 1, - }; - self.gateway_addresses.lock().unwrap()[i] = Some(addr); - } -} - impl SpHandler for Handler { fn discover( &mut self, @@ -491,7 +481,6 @@ impl SpHandler for Handler { port: SpPort, target: u8, ) -> Result { - self.update_gateway_address(sender, port); warn!( &self.log, "received ignition state request; not supported by gimlet"; @@ -507,7 +496,6 @@ impl SpHandler for Handler { sender: SocketAddrV6, port: SpPort, ) -> Result { - self.update_gateway_address(sender, port); warn!( &self.log, "received bulk ignition state request; not supported by gimlet"; @@ -524,7 +512,6 @@ impl SpHandler for Handler { target: u8, command: gateway_messages::IgnitionCommand, ) -> Result<(), ResponseError> { - self.update_gateway_address(sender, port); warn!( &self.log, "received ignition command; not supported by gimlet"; @@ -536,23 +523,52 @@ impl SpHandler for Handler { Err(ResponseError::RequestUnsupportedForSp) } - fn serial_console_write( + fn serial_console_attach( &mut self, sender: SocketAddrV6, port: SpPort, component: SpComponent, - data: &[u8], ) -> Result<(), ResponseError> { - self.update_gateway_address(sender, port); + debug!( + &self.log, + "received serial console attach request"; + "sender" => %sender, + "port" => ?port, + "component" => ?component, + ); + + let mut attached_mgs = self.attached_mgs.lock().unwrap(); + if attached_mgs.is_some() { + return Err(ResponseError::SerialConsoleAlreadyAttached); + } + + *attached_mgs = Some((component, port, sender)); + Ok(()) + } + + fn serial_console_write( + &mut self, + sender: SocketAddrV6, + port: SpPort, + offset: u64, + data: &[u8], + ) -> Result { debug!( &self.log, "received serial console packet"; "sender" => %sender, "port" => ?port, "len" => data.len(), - "component" => ?component, + "offset" => offset, ); + let component = self + .attached_mgs + .lock() + .unwrap() + .map(|(component, _port, _addr)| component) + .ok_or(ResponseError::SerialConsoleNotAttached)?; + let incoming_serial_console = self .incoming_serial_console .get(&component) @@ -565,6 +581,21 @@ impl SpHandler for Handler { // ignore errors here let _ = incoming_serial_console.send(data.to_vec()); + Ok(offset + data.len() as u64) + } + + fn serial_console_detach( + &mut self, + sender: SocketAddrV6, + port: SpPort, + ) -> Result<(), ResponseError> { + debug!( + &self.log, + "received serial console detach request"; + "sender" => %sender, + "port" => ?port, + ); + *self.attached_mgs.lock().unwrap() = None; Ok(()) } @@ -573,7 +604,6 @@ impl SpHandler for Handler { sender: SocketAddrV6, port: SpPort, ) -> Result { - self.update_gateway_address(sender, port); let state = SpState { serial_number: self.serial_number, version: SIM_GIMLET_VERSION, diff --git a/sp-sim/src/sidecar.rs b/sp-sim/src/sidecar.rs index 23c54c342b9..46bff880814 100644 --- a/sp-sim/src/sidecar.rs +++ b/sp-sim/src/sidecar.rs @@ -388,13 +388,28 @@ impl SpHandler for Handler { Ok(()) } - fn serial_console_write( + fn serial_console_attach( &mut self, sender: SocketAddrV6, port: SpPort, _component: SpComponent, + ) -> Result<(), ResponseError> + { + warn!( + &self.log, "received serial console attach; unsupported by sidecar"; + "sender" => %sender, + "port" => ?port, + ); + Err(ResponseError::RequestUnsupportedForSp) + } + + fn serial_console_write( + &mut self, + sender: SocketAddrV6, + port: SpPort, + _offset: u64, _data: &[u8], - ) -> Result<(), ResponseError> { + ) -> Result { warn!( &self.log, "received serial console write; unsupported by sidecar"; "sender" => %sender, @@ -403,6 +418,20 @@ impl SpHandler for Handler { Err(ResponseError::RequestUnsupportedForSp) } + fn serial_console_detach( + &mut self, + sender: SocketAddrV6, + port: SpPort, + ) -> Result<(), ResponseError> + { + warn!( + &self.log, "received serial console detach; unsupported by sidecar"; + "sender" => %sender, + "port" => ?port, + ); + Err(ResponseError::RequestUnsupportedForSp) + } + fn sp_state( &mut self, sender: SocketAddrV6, From d47fb95bbb0078fd44da71abc582387086b60201 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Mon, 29 Aug 2022 10:46:41 -0400 Subject: [PATCH 4/6] cargo fmt --- sp-sim/src/sidecar.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sp-sim/src/sidecar.rs b/sp-sim/src/sidecar.rs index 46bff880814..da841f77870 100644 --- a/sp-sim/src/sidecar.rs +++ b/sp-sim/src/sidecar.rs @@ -393,8 +393,7 @@ impl SpHandler for Handler { sender: SocketAddrV6, port: SpPort, _component: SpComponent, - ) -> Result<(), ResponseError> - { + ) -> Result<(), ResponseError> { warn!( &self.log, "received serial console attach; unsupported by sidecar"; "sender" => %sender, @@ -422,8 +421,7 @@ impl SpHandler for Handler { &mut self, sender: SocketAddrV6, port: SpPort, - ) -> Result<(), ResponseError> - { + ) -> Result<(), ResponseError> { warn!( &self.log, "received serial console detach; unsupported by sidecar"; "sender" => %sender, From 60f704008e8d4a8545195f03384cb8232184391e Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Mon, 29 Aug 2022 10:46:50 -0400 Subject: [PATCH 5/6] Make example config match sp-sim config --- gateway/examples/config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway/examples/config.toml b/gateway/examples/config.toml index ff53ef02308..4ef963295ea 100644 --- a/gateway/examples/config.toml +++ b/gateway/examples/config.toml @@ -61,7 +61,7 @@ switch1 = ["switch", 1] [switch.port.1] data_link_addr = "[::]:33201" -multicast_addr = "[ff15:0:1de::1%2]:11111" +multicast_addr = "[ff15:0:1de::1]:33310" [switch.port.1.location] switch0 = ["sled", 0] switch1 = ["sled", 0] From 644aab4c1c9c405cd92f04cefe69ece0bfed9294 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Mon, 29 Aug 2022 14:00:40 -0400 Subject: [PATCH 6/6] Fix failing test --- gateway-sp-comms/src/error.rs | 2 -- gateway/src/error.rs | 6 +++++- gateway/src/http_entrypoints.rs | 1 - 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/gateway-sp-comms/src/error.rs b/gateway-sp-comms/src/error.rs index ba31d6550a9..403f88e684e 100644 --- a/gateway-sp-comms/src/error.rs +++ b/gateway-sp-comms/src/error.rs @@ -73,8 +73,6 @@ pub enum Error { SpCommunicationFailed(#[from] SpCommunicationError), #[error("updating SP failed: {0}")] UpdateFailed(#[from] UpdateError), - #[error("serial console is already attached")] - SerialConsoleAttached, } #[derive(Debug, Error)] diff --git a/gateway/src/error.rs b/gateway/src/error.rs index 6816f98c118..f8b01d1a166 100644 --- a/gateway/src/error.rs +++ b/gateway/src/error.rs @@ -7,7 +7,9 @@ use std::borrow::Borrow; use dropshot::HttpError; +use gateway_messages::ResponseError; use gateway_sp_comms::error::Error as SpCommsError; +use gateway_sp_comms::error::SpCommunicationError; #[derive(Debug, thiserror::Error)] pub(crate) enum Error { @@ -53,7 +55,9 @@ where Some("InvalidSp".to_string()), err.to_string(), ), - SpCommsError::SerialConsoleAttached => HttpError::for_bad_request( + SpCommsError::SpCommunicationFailed(SpCommunicationError::SpError( + ResponseError::SerialConsoleAlreadyAttached, + )) => HttpError::for_bad_request( Some("SerialConsoleAttached".to_string()), err.to_string(), ), diff --git a/gateway/src/http_entrypoints.rs b/gateway/src/http_entrypoints.rs index 5afcbf4d114..a2229de22d3 100644 --- a/gateway/src/http_entrypoints.rs +++ b/gateway/src/http_entrypoints.rs @@ -326,7 +326,6 @@ async fn sp_list( // These errors should not be possible for the request we // made. SpCommsError::SpDoesNotExist(_) - | SpCommsError::SerialConsoleAttached | SpCommsError::UpdateFailed(_) => { unreachable!("impossible error {}", err) }