-
Notifications
You must be signed in to change notification settings - Fork 359
Upgrade to tokio 1.0 #276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Upgrade to tokio 1.0 #276
Conversation
244063b
to
110d9ea
Compare
For the people looking for Tokio 1.0 support of aws-lambda-rust-runtime: Netlify's fork already has Tokio 1.0 support. FYI References |
@coolreader18 I can confirm that it is working 👍 solved many hours debugging why it was not working with Hoping to see this merged soon. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall this is a great contribution and we are very appreciative for it! Looking forward to hearing back.
lambda/src/simulated.rs
Outdated
let bytes_to_read = min(to_buf.remaining(), self.buffer.len()); | ||
let (buf_l, buf_r) = self.buffer.as_mut_slices(); | ||
let (len_l, len_r) = (buf_l.len(), buf_r.len()); | ||
let (buf_l, buf_r): (&mut [u8], &mut [u8]) = if len_r >= bytes_to_read { | ||
(&mut [], &mut buf_r[len_r - bytes_to_read..]) | ||
} else { | ||
let bytes_from_l = bytes_to_read - buf_r.len(); | ||
(&mut buf_l[len_l - bytes_from_l..], buf_r) | ||
}; | ||
|
||
buf_r.reverse(); | ||
to_buf.put_slice(buf_r); | ||
buf_l.reverse(); | ||
to_buf.put_slice(buf_l); | ||
|
||
let remaining_buffer_size = self.buffer.len() - bytes_to_read; | ||
self.buffer.truncate(remaining_buffer_size); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm conflicted on how I feel about this change specifically. It seems like quite a bit of extra code to achieve necessarily the same thing, unless I'm totally mistaken.
Just to confirm my understanding - you split the buffer into two slices left and right, check if bytes_to_read
is less than or equal to the length of the right buffer, at which point you take what you need from the right buffer to place into the to_buf
. If bytes_to_read
is greater than the right, then you take what you need from the left and return both all of the right buffer and what you've taken from the left. And then finally you truncate the buffer before returning.
It just seems like a lot of extra code that makes this somewhat difficult to follow. Is it necessary? If so, is there a way we could simplify this or at least comment a bit on the intention here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, so self.buffer
is a VecDeque, i.e. not contiguous. However, that doesn't mean that you can't get contiguous slices from it, and the repeated pop_back().unwrap()
seemed inefficient, especially when the equivalent for tokio::io::ReadBuf
would be repeated .put_slice(&[i])
. I can definitely add comments to the code, but it's essentially taking bytes_to_read
bytes from the end of the buffer, than feeding them backwards into the read buf. The truncation is to formally "take" the slice we've read out of the buffer. It's more complicated in code than it might sound since there's 2 slices that make up the VecDeque, but that's all it's doing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though, I think this can be a lot simpler, since it doesn't need to be reversed at all if we append instead of prepend in BufferState::write()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing these. I'll take a look at the PR you split off from this soon.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since Tokio now has https://docs.rs/tokio/1.1.0/tokio/io/struct.DuplexStream.html, I think this code can be removed wholesale—I wrote this code initially to mimic this. I think it's largely compatible?
lambda/src/simulated.rs
Outdated
fn read(&mut self, to_buf: &mut [u8]) -> usize { | ||
fn read(&mut self, to_buf: &mut ReadBuf<'_>) { | ||
// Read no more bytes than we have available, and no more bytes than we were asked for | ||
let bytes_to_read = min(to_buf.len(), self.buffer.len()); | ||
for i in 0..bytes_to_read { | ||
to_buf[i] = self.buffer.pop_back().unwrap(); | ||
} | ||
let bytes_to_read = min(to_buf.remaining(), self.buffer.len()); | ||
// a VecDeque isn't contiguous, so we have 2 slices we need to account for | ||
let (buf_l, buf_r) = self.buffer.as_slices(); | ||
if let Some(buf) = buf_l.get(..bytes_to_read) { | ||
// if buf_l has enough to satisfy bytes_to_read, just take from it | ||
to_buf.put_slice(buf); | ||
} else { | ||
// otherwise, use up all of buf_l and take the remaining from buf_r | ||
to_buf.put_slice(buf_l); | ||
to_buf.put_slice(&buf_r[..bytes_to_read - buf_l.len()]); | ||
}; | ||
|
||
bytes_to_read | ||
// cut off what we've read from the start of the buffer | ||
self.buffer.drain(..bytes_to_read); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your PR! We really want to get the new tokio update out as soon as we can. I'm a little skeptical about if this is really necessary though especially since simulated.rs
is used for testing only. Do you think we can separate this out to a separate issue for more discussion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, I've removed that change from the PR.
To me, everything looks good. I played around with this on my side and things seem to be functioning as expected as well. Going to ask the others to take a look just for posterity, but hopefully we can get this merged in soon. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved with a few nits.
serde = { version = "1", features = ["derive"] } | ||
serde_json = "1.0.39" | ||
tower-service = "0.3" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: technically speaking, this dependency is not being removed. Hyper already depends on tower-service = "0.3"
—they are the same type. I'm weakly in favor of keeping this and using the crate that defines it directly, but I dunno.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like it makes sense to use the re-export from hyper - it is part of the public API, and I think it makes it more clear that hyper uses that trait as well, rather than us integrating with something else from the tower stack.
lambda/Cargo.toml
Outdated
tokio = { version = "0.2.4", features = ["full"] } | ||
hyper = "0.13" | ||
tokio = { version = "1.0", features = ["macros", "io-util", "sync", "rt-multi-thread"] } | ||
hyper = { version = "0.14", features = ["full"] } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I'm not sure that we want the "full" dependency here—I think we should be using everything except for the http2
feature. See details here: https://docs.rs/hyper/0.14.2/hyper/#optional-features
impl AsyncRead for SimStream { | ||
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<IoResult<usize>> { | ||
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<IoResult<()>> { | ||
Pin::new(&mut self.read).poll_read(cx, buf) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mentioned this in a comment elsewhere, but I think that SimStream can be replaced with tokio::io::Duplex
. This isn't a blocker to merging, however, but it would reduce how much code needs to be in this crate.
(I think some usages in tests would require a std::sync::Arc
, but I'm not sure.)
@coolreader18 Thoughts on the comments left by @davidbarsky? Don't worry about removing the SimStream in this PR, I'm mainly talking about changing the dependency changes slightly. I think once that's done I'll merge this PR in. |
Alright, we're merged in. Thanks for the PR and addressing the feedback so quickly! |
Description of changes: Change to depend on tokio 1.0 instead of tokio 0.2.
By submitting this pull request