Refactor futures executor #168
Thanks a lot for the PR! @Pauan, since you're the one who's most familiar with this code, it'd be great if you could take a look at this and review it.
Yes! I would very much like to migrate to it as soon as possible. (Hopefully
Also, since I've merged @Pauan's other PR this started to have merge conflicts, so you'll have to resolve them. Sorry. (:
There's no need for tasks to carry around a reference to their executor, since there is a single global executor. [...] The "inner" state of the executor does not need to be reference counted.
When wasm gets multiple threads, your implementation will break. It will compile correctly, and it (probably) won't have any runtime errors, but it will silently have different behavior.
I would rather have an Executor that works correctly across multiple threads. That makes it more useful when wasm gets multiple threads, and it's also useful for porting our Executor implementation to other hosts (the only JS/wasm specific thing is the waker function).
Since the Rc is only created once, the overhead for making it thread-safe is extremely small.
The executor does not need to keep track of whether it is currently running.
That's a good change.
The names of various things (PromiseExecutor, Notifier, etc.) have become out of date. (eg. this is no longer an executor purely based around promises.)
I agree.
src/webcore/executor.rs
// A proxy for the javascript event loop.
struct EventLoop;

// There's only one thread, but this lets us tell the compiler that we
This note doesn't seem correct. thread_local has nothing to do with "telling the compiler that we don't need a Sync bound". Instead, it doesn't need a Sync bound because it will actually create a new value on each thread, so if there's 5 threads it might create 5 EventLoopInners.
In the absence of multiple threads, a "thread-local" is a way of telling the compiler that globals do not need to be thread-safe (ie. not Sync) whilst also achieving lazy initialisation, so I think the comment is pretty accurate.
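Both halves of this point can be sketched in plain Rust (a minimal, hypothetical COUNTER example, not code from this PR): a thread_local! sidesteps the Sync requirement precisely because each thread lazily initialises its own independent copy.

```rust
use std::cell::Cell;
use std::thread;

// `Cell<u32>` is not `Sync`, so a plain `static` of this type would not
// compile. `thread_local!` avoids the `Sync` bound because each thread
// lazily creates its own independent copy of the value.
thread_local! {
    static COUNTER: Cell<u32> = Cell::new(0);
}

fn bump() -> u32 {
    COUNTER.with(|c| {
        c.set(c.get() + 1);
        c.get()
    })
}

fn main() {
    assert_eq!(bump(), 1);
    assert_eq!(bump(), 2);

    // A spawned thread starts from a fresh copy: the main thread's
    // increments are not visible here.
    let from_other_thread = thread::spawn(|| bump()).join().unwrap();
    assert_eq!(from_other_thread, 1);
}
```

On a single-threaded wasm target only one copy ever exists, which is why both readings of the comment ("tells the compiler it need not be Sync" vs. "one value per thread") describe the same behavior there.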
src/webcore/executor.rs
}
}

// State relating to the javascript event loop. Only one instance ever exists.
When wasm gets multiple threads this will no longer be true.
When wasm gets multiple threads we can re-evaluate the implementation.
}

impl EventLoopInner {
// Initializes the event loop. Only called once.
Also not true when wasm gets multiple threads.
var node = document.createTextNode( "0" );
var state = false;

new MutationObserver( wrapper ).observe( node, { characterData: true } );
I know the original code doesn't clean this up, but it really should clean up the MutationObserver when the EventLoopInner is dropped.
This will happen automatically - a MutationObserver is garbage collected when its target node is collected. The target node will be garbage collected when the waker is dropped. The waker will be dropped when the EventLoopInner is dropped.
src/webcore/executor.rs
} else {
var promise = Promise.resolve( null );

function nextTick( value ) {
This shouldn't take a value argument.
Yeah
src/webcore/executor.rs
self.microtask_queue.borrow_mut().pop_front()
}
// Poll the queue until it is empty
fn drain(&self) {
This should clean up the memory for the queue. Otherwise it will waste memory if a large number of Tasks are queued (since it will grow the memory, but it will never deallocate the memory).
I had an idea of only cleaning up the queue memory if a certain amount of time has passed (e.g. 5 seconds).
Interesting, but I think that can wait for a second PR. I'd prefer not to tie it to wall-clock time, perhaps "shrinking the queue if the maximum fill ratio from the last N iterations drops below 50%" would be a good metric.
In that case why not just add back in my implementation of always cleaning the queue? That's simple to add and we can always refine it later.
src/webcore/executor.rs
var promise = Promise.resolve( null );

function nextTick( value ) {
promise.then( wrapper );
When the EventLoopInner is dropped while the Promise is pending, it will cause a runtime panic. So in that situation it shouldn't call the callback.
Right now the EventLoopInner is never dropped (since the main thread never exits) but I will adjust this anyway.
static EVENT_LOOP_INNER: EventLoopInner = EventLoopInner::new();
}

impl EventLoop {
These functions should probably be marked as #[inline]
As above
}
}

fn notify( task: Rc< SpawnedTask > ) {
This should probably be marked #[inline]
Until we find a performance issue due to poor codegen, I'd rather let the compiler decide whether to inline it.
src/webcore/executor.rs
}
}

// A proxy for the javascript event loop.
Nit: javascript -> JavaScript
@Pauan most of your complaints seem to be around multiple threads: wasm doesn't have multiple threads, and it's not clear yet that it will ever get anything beyond web-workers (this PR is compatible with web-workers) so I don't see any reason to take that into consideration - the implementation it replaces was not compatible with threads anyway.
@Diggsey That's not true. With WebWorkers it's possible to share memory between WebWorkers, so the same multi-thread data race issues happen (and thus the usual Rust thread-safe abstractions need to be used). In fact wasm already had multi-thread support until SharedArrayBuffer was temporarily disabled. Let's suppose that WebAssembly does get threads. If your implementation caused compile-time or run-time errors, then I would be perfectly fine with assuming a single-threaded host. But that's not the case. Because your implementation will cause silent and subtle breakage, it can be quite easy for us to not notice and forget to add in proper thread-safe support later. Because we will need to support multi-threading at some point in the future, why not implement it now and avoid the future problems? The performance cost is incredibly negligible.
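As a rough sketch of what the thread-safe variant would cost (a hypothetical Inner type, not the PR's actual code): swapping Rc for Arc and Cell<bool> for AtomicBool means one atomically-refcounted allocation up front, plus cheap atomic operations per wake-up.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical executor state: the thread-safe counterpart of
// `Rc<Inner>` + `Cell<bool>` is `Arc<Inner>` + `AtomicBool`.
struct Inner {
    is_queued: AtomicBool,
}

fn main() {
    // The Arc is created exactly once, so the atomic refcount is a
    // one-time cost; clones are just an atomic increment.
    let inner = Arc::new(Inner {
        is_queued: AtomicBool::new(false),
    });

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let inner = Arc::clone(&inner);
            // `swap` returns the previous value, so exactly one thread
            // observes `false` and gets to enqueue the task.
            thread::spawn(move || !inner.is_queued.swap(true, Ordering::SeqCst))
        })
        .collect();

    let winners: usize = handles
        .into_iter()
        .map(|h| h.join().unwrap() as usize)
        .sum();

    // No matter how the threads interleave, the task is queued once.
    assert_eq!(winners, 1);
}
```

The same `swap`-based check also works single-threaded, which is the crux of the "negligible overhead" argument above.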
Then how can it be made thread-safe? By changing SpawnedTask to use Arc? Or something deeper than that?
It can't be, the rust stdlib is inherently single-threaded right now - come back to this when rust supports multiple threads.
There's no way to do that - the current rust (and possibly llvm) implementation will already break horribly and silently if you try to use
@Diggsey Yes... I know that the wasm implementation of Rust is single-threaded right now. I'm saying that will change in the future, and probably sooner than you think. So I want to make sure that our implementation is up to snuff, rather than having silent breakage later.
And I'm saying doing that now is a bad idea when there's no way to actually test our implementation, and we still don't know for sure what the constraints of that multi-threading model will be. When we have a nightly rust that supports multi-threaded wasm and an environment to run it in, then we can start thinking about it.
You're right that there's no way to test the implementation right now, but we can base the implementation on existing things like the
The javascript executor is only superficially similar: we have to rely on thread-local storage, because a new javascript executor is automatically created for every thread, and we have no way to create our own executors. We have no control over the wake-up mechanism; it must go through an existing javascript mechanism.

The wake-up mechanism is particularly difficult: if we have a handle to a task running on another thread, we cannot use promises or DOM observers to wake up that thread. We must use an API such as Worker.postMessage - but we only want to fall back to that mechanism if it really is running on a different thread. Also, this means attaching a message listener on every thread, which must have some means to filter out messages meant for the executor vs those in use by the application. We also have to consider that the rust standard library will also have installed its own custom listeners and we must not conflict with that.

Now can you see why I have no desire to get into this complexity until we have a working foundation, and why even attempting to do so now is a futile effort?
Fine, at least put in a big huge
I added a file-level comment describing the executor and its limitations, and also implemented queue shrinking if the queue is under 50% utilised for 25 consecutive iterations.
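The shrink policy described here can be sketched as a standalone heuristic (hypothetical Queue type and the constant values from the diff below; the real code lives in src/webcore/executor.rs): only reclaim capacity after the queue has been under half utilised for a run of consecutive drain iterations.

```rust
use std::collections::VecDeque;

const INITIAL_QUEUE_CAPACITY: usize = 10;
const QUEUE_SHRINK_DELAY: usize = 25;

// Hypothetical standalone version of the queue-shrinking heuristic:
// shrink only after `QUEUE_SHRINK_DELAY` consecutive iterations in
// which the queue stayed under 50% of its capacity.
struct Queue {
    items: VecDeque<u32>,
    shrink_counter: usize,
}

impl Queue {
    fn new() -> Self {
        Queue {
            items: VecDeque::with_capacity(INITIAL_QUEUE_CAPACITY),
            shrink_counter: 0,
        }
    }

    // Called once per drain iteration with the peak length the queue
    // reached during that iteration.
    fn after_drain(&mut self, used: usize) {
        let cap = self.items.capacity();
        if cap > used * 2 && cap > INITIAL_QUEUE_CAPACITY {
            if self.shrink_counter < QUEUE_SHRINK_DELAY {
                self.shrink_counter += 1;
            } else {
                self.items.shrink_to_fit();
                self.shrink_counter = 0;
            }
        } else {
            // Any well-utilised iteration resets the countdown.
            self.shrink_counter = 0;
        }
    }
}

fn main() {
    let mut q = Queue::new();

    // A burst grows the queue well past its initial capacity...
    for i in 0..1000 {
        q.items.push_back(i);
    }
    let grown = q.items.capacity();
    assert!(grown >= 1000);
    q.items.clear();

    // ...and the capacity survives `QUEUE_SHRINK_DELAY` quiet
    // iterations before finally being reclaimed.
    for _ in 0..QUEUE_SHRINK_DELAY {
        q.after_drain(1);
        assert_eq!(q.items.capacity(), grown);
    }
    q.after_drain(1);
    assert!(q.items.capacity() < grown);
}
```

The reset-on-utilisation branch is what keeps a steadily busy queue from ever paying the reallocation cost.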
@@ -0,0 +1,226 @@
// This file implements a futures-compatible executor which schedules futures |
Add in TODO for easier greppability.
Rc::new( Self {
is_queued: Cell::new( false ),
spawn: RefCell::new( Some( executor::spawn(
Box::new( future ) as BoxedFuture
I haven't tested it, but is it actually needed to convert this into a Box? Looking at the docs for spawn, it doesn't seem to require a Box
spawn<T>(future: T) returns a Spawn<T> - ie. it does not perform type-erasure. The box is needed to erase the type.
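A minimal illustration of why the generic wrapper forces the box (hypothetical Spawn/spawn stand-ins and a Task trait, not the real futures API): since `spawn` preserves `T`, differently-typed tasks can only share one queue if `T` is erased behind a trait object first.

```rust
// Hypothetical stand-ins for `futures::executor::spawn` / `Spawn<T>`:
// the wrapper is generic, so it preserves the concrete type it wraps.
struct Spawn<T> {
    inner: T,
}

fn spawn<T>(task: T) -> Spawn<T> {
    Spawn { inner: task }
}

trait Task {
    fn poll(&mut self) -> u32;
}

struct Ready(u32);

impl Task for Ready {
    fn poll(&mut self) -> u32 {
        self.0
    }
}

struct Doubling(u32);

impl Task for Doubling {
    fn poll(&mut self) -> u32 {
        self.0 * 2
    }
}

fn main() {
    // `spawn(Ready(1))` is a `Spawn<Ready>` and `spawn(Doubling(2))` is
    // a `Spawn<Doubling>` - distinct types that cannot live in one Vec.
    // Boxing to `Box<dyn Task>` *before* spawning erases the type, so
    // every task becomes the same `Spawn<Box<dyn Task>>`.
    let mut queue: Vec<Spawn<Box<dyn Task>>> = vec![
        spawn(Box::new(Ready(1)) as Box<dyn Task>),
        spawn(Box::new(Doubling(2)) as Box<dyn Task>),
    ];

    let results: Vec<u32> = queue.iter_mut().map(|s| s.inner.poll()).collect();
    assert_eq!(results, vec![1, 4]);
}
```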
src/webcore/executor.rs
new MutationObserver( wrapper ).observe( node, { characterData: true } );

function nextTick() {
Function declarations are block scoped, so this won't work. So either it needs to move the return nextTick; inside of the branches, or it needs to use var nextTick = ...
Actually they're not - function declarations are hoisted to the enclosing function/global scope. However, it looks like Edge does not implement this correctly so I will change this.
That is incorrect. Function declarations in blocks have always been invalid according to the ECMAScript spec. Browsers implemented it anyways, but with inconsistent behavior. Function declarations are always block scoped in strict mode, in all browsers.
src/webcore/executor.rs
fn pop_task(&self) -> Option< Rc< SpawnedTask > > {
self.microtask_queue.borrow_mut().pop_front()
}
fn shrink_if_necessary(&self) {
It should have a blank line above and below this function.
src/webcore/executor.rs
// `QUEUE_SHRINK_DELAY` iterations.
let shrink_counter = self.shrink_counter.get();
if shrink_counter < QUEUE_SHRINK_DELAY {
self.shrink_counter.set(shrink_counter+1);
It should have spaces around +
src/webcore/executor.rs
if shrink_counter < QUEUE_SHRINK_DELAY {
self.shrink_counter.set(shrink_counter+1);
} else {
queue.shrink_to_fit();
It shouldn't use shrink_to_fit, it should replace the queue with a new VecDeque with INITIAL_QUEUE_CAPACITY
No it shouldn't, that will result in unnecessary allocations.
shrink_to_fit also deallocates + allocates, but less efficiently than recreating the VecDeque. In addition, shrink_to_fit will cause it to have 0 capacity, but we want it to have INITIAL_QUEUE_CAPACITY capacity.
No, we want it to have a capacity equal to the last queue size, since that is a good estimate for future iterations. We could avoid some overhead by waiting to resize it until after draining the items, but this is a very cold code-path (it's only hit 25 iterations after sudden drops in the queue size).
Ahh, I see, you moved the call to shrink_if_necessary. I still don't like that it might have a capacity lower than INITIAL_QUEUE_CAPACITY
There's actually not a good way to control the capacity this way. I've implemented the closest approximation instead, and opened rust-lang/rust#49385.
Sure there is... have the shrink_if_necessary method return an Option<usize>, and then after draining it will check if it is Some, and if so it will replace the old VecDeque with a new VecDeque with the appropriate capacity.
Yeah I didn't really want to leak this complexity into the drain function, but I've implemented that now.
src/webcore/executor.rs
let mut queue = self.microtask_queue.borrow_mut();
// We consider shrinking the queue if it is less than
// half full...
if queue.len() <= queue.capacity() / 2 {
This is wrong. The len will always be 0, because it's called after draining. It should instead shrink it if the capacity is larger than INITIAL_QUEUE_CAPACITY
You're right about it reading the length at the wrong point, however using INITIAL_QUEUE_CAPACITY here will shrink the queue even if it is being fully utilised, which is wrong.
src/webcore/executor.rs
// Initial capacity of the event queue
const INITIAL_QUEUE_CAPACITY: usize = 10;
// Iterations to wait before allowing the queue to shrink
const QUEUE_SHRINK_DELAY: usize = 25;
There should be a TODO which says that we should test for the optimal sizes for these two constants.
e0e2044 to c7aa2c3
c7aa2c3 to 0ce735a
src/webcore/executor.rs
@@ -178,6 +178,10 @@ impl EventLoopInner {
// Reclaim space from the queue if it's going to waste
fn shrink_if_necessary(&self) {
let mut queue = self.microtask_queue.borrow_mut();
if queue.capacity() <= INITIAL_QUEUE_CAPACITY {
This doesn't seem right. Let's say the queue has a capacity of 50 but a length of 1. This check will return false, but then later it calls shrink_to_fit which resizes it to a capacity of 1.
Okay, great, it looks good. I just need to test that it works correctly and doesn't regress performance, and then I'll approve it.
src/webcore/executor.rs
let cap = queue.capacity();
// We consider shrinking the queue if it is less than
// half full...
if cap > queue.len()*2 && cap > INITIAL_QUEUE_CAPACITY {
This doesn't block my approval, however it is possible for the capacity of the VecDeque to be higher than INITIAL_QUEUE_CAPACITY. That means it will reallocate every 25 iterations, even if the queue never goes above INITIAL_QUEUE_CAPACITY elements. Either this should be fixed, or a TODO note should be added.
And... this is why more control is needed over the capacity 😛 With a shrink_to method it wouldn't matter.
I fixed the issue by allowing it up to 2*INITIAL_QUEUE_CAPACITY without trying to shrink (VecDeque uses powers of two).
More control might not help. The if branch conditional will still be taken, with or without a shrink_to method. So you're hoping that the VecDeque::shrink_to implementation always gives exactly the same result as VecDeque::with_capacity, which isn't guaranteed.
It doesn't matter what shrink_to gives - as long as it doesn't reallocate if the new capacity is no different from the old one.
Also, if it were to produce 16, then nothing would happen since the capacity would already be smaller than that.
I do not want performance to mysteriously change based upon the implementation details of VecDeque changing.
The implementation details of VecDeque are not going to change to waste more than a factor of 2 in space, and this implementation is compatible with any other change.
so let's use that instead.
No, there's no reason to introduce another variable and code to track that to handle a situation that's not going to happen.
Source please? My understanding is that the capacity is not guaranteed in the slightest, and it could change to use any strategy it wishes, including choosing the capacity dynamically.
Yes it's unlikely that it will change, but that's not the same as it never changing.
My understanding is that the capacity is not guaranteed in the slightest, and it could change to use any strategy it wishes, including choosing the capacity dynamically.
In which case your solution is just as broken.
As far as re-allocation strategies go, it's well known that sub-power-of-2 allocation ratios are preferred, as they reduce fragmentation when re-allocating. The main reason power of 2 is used here is because it uses bit-masking to wrap indexes, and computers use base 2.
Nothing is certain, but given those facts, I can be more confident that the allocation ratio is not going to increase than I can be in any of the other myriad of assumptions underpinning the very existence of this crate. Even if it does increase, it's not going to break anything, so the impact/probability ratio is so low that this entire conversation is irrelevant.
You simply cannot performance tune without relying on implementation details, and attempting to do so is a waste of time, and is the definition of premature optimisation.
No, my solution isn't broken. If you create a VecDeque with capacity 10, but it's actually capacity 15, then we know if it hasn't gone above capacity 15 then it doesn't need to be reallocated, because Rust does guarantee that it will only reallocate if the number of elements goes above the current capacity. So it doesn't rely upon implementation details at all.
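That guarantee is easy to check in a small sketch (exact capacities are implementation-defined, but "no reallocation until the length exceeds the reported capacity" is documented behavior):

```rust
use std::collections::VecDeque;

fn main() {
    // Ask for capacity 10; the implementation may round this up
    // (the "capacity 15" scenario discussed above).
    let mut queue: VecDeque<u32> = VecDeque::with_capacity(10);
    let cap = queue.capacity();
    assert!(cap >= 10);

    // While `len() <= capacity()` no reallocation happens, so filling
    // up to the *actual* capacity leaves it unchanged.
    for i in 0..cap as u32 {
        queue.push_back(i);
    }
    assert_eq!(queue.capacity(), cap);

    // One more element exceeds the capacity and forces growth.
    queue.push_back(0);
    assert!(queue.capacity() > cap);
}
```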
Okay, I tested it, the performance seems on par, so this is okay with me.
So can this be considered ready?
Yes
Looks good to me! Thanks a lot!
(View individual commits for useful diff)
With the recent performance improvements, some unnecessary complexity was introduced:

- The names of various things (PromiseExecutor, Notifier, etc.) have become out of date. (eg. this is no longer an executor purely based around promises.)

With this complexity removed, I was also able to remove some redundant clones, and added some comments (eg. why we're using MutationObserver).

cc @Pauan, you may wish to verify that this does not regress performance in any way.

Also, thoughts on upgrading to futures 0.2.0-beta? Upgrading would allow us to remove the unsafe code here: the new version provides a safe implementation of Notify (now called Wake) for reference counted types, so we don't have to.