Skip to content

Commit eb28c3e

Browse files
committed
hook up async dispatcher in repl setup
1 parent 05380ee commit eb28c3e

File tree

4 files changed

+108
-11
lines changed

4 files changed

+108
-11
lines changed

Cargo.lock

Lines changed: 29 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ anyhow = "1.0.57"
262262
any_vec = "0.13"
263263
ashpd = "0.8.0"
264264
async-compression = { version = "0.4", features = ["gzip", "futures-io"] }
265+
async-dispatcher = { version = "0.1"}
265266
async-fs = "1.6"
266267
async-recursion = "1.0.0"
267268
async-tar = "0.4.2"
@@ -325,7 +326,7 @@ rand = "0.8.5"
325326
refineable = { path = "./crates/refineable" }
326327
regex = "1.5"
327328
repair_json = "0.1.0"
328-
runtimelib = "0.11.1"
329+
runtimelib = { path = "../../runtimed/runtimed/runtimelib", default-features = false, features = ["async-dispatcher-runtime"] }
329330
rusqlite = { version = "0.29.0", features = ["blob", "array", "modern_sqlite"] }
330331
rust-embed = { version = "8.4", features = ["include-exclude"] }
331332
schemars = "0.8"

crates/repl/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ doctest = false
1515
[dependencies]
1616
anyhow.workspace = true
1717
alacritty_terminal.workspace = true
18+
async-dispatcher.workspace = true
1819
base64.workspace = true
1920
collections.workspace = true
2021
editor.workspace = true

crates/repl/src/repl.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::{anyhow, Context as _, Result};
2+
use async_dispatcher::{set_dispatcher, Dispatcher, Runnable};
23
use collections::{HashMap, HashSet};
34
use editor::{
45
display_map::{
@@ -18,10 +19,11 @@ use outputs::{ExecutionStatus, ExecutionView, LineHeight as _};
1819
use project::Fs;
1920
use runtimelib::JupyterMessageContent;
2021
use settings::Settings as _;
21-
use std::ops::Range;
22-
use std::sync::Arc;
22+
use std::{ops::Range, sync::mpsc::RecvTimeoutError, time::Instant};
23+
use std::{sync::Arc, time::Duration};
2324
use theme::{ActiveTheme, ThemeSettings};
2425
use ui::prelude::*;
26+
use util::ResultExt;
2527
use workspace::Workspace;
2628

2729
mod outputs;
@@ -37,7 +39,79 @@ pub struct RuntimeManagerGlobal(Model<RuntimeManager>);
3739

3840
impl Global for RuntimeManagerGlobal {}
3941

42+
pub fn zed_dispatcher(cx: &mut AppContext) -> impl Dispatcher {
43+
struct ZedDispatcher {
44+
tx: std::sync::mpsc::Sender<(Runnable, Option<Instant>)>,
45+
task: Task<()>,
46+
}
47+
48+
let (tx, rx) = std::sync::mpsc::channel();
49+
50+
impl Dispatcher for ZedDispatcher {
51+
fn dispatch(&self, runnable: async_dispatcher::Runnable) {
52+
self.tx.send((runnable, None)).log_err();
53+
}
54+
55+
fn dispatch_after(
56+
&self,
57+
duration: std::time::Duration,
58+
runnable: async_dispatcher::Runnable,
59+
) {
60+
self.tx
61+
.send((runnable, Some(Instant::now() + duration)))
62+
.log_err();
63+
}
64+
}
65+
66+
let executor = cx.background_executor();
67+
68+
let task = executor.spawn(async {
69+
let mut timers = Vec::<(Runnable, Instant)>::new();
70+
let rx = rx;
71+
72+
loop {
73+
let timeout = timers
74+
.first()
75+
.map(|&(_, time)| time.saturating_duration_since(Instant::now()))
76+
.unwrap_or_else(|| Duration::from_secs(u64::MAX));
77+
78+
match rx.recv_timeout(timeout) {
79+
Ok((runnable, deadline)) => {
80+
if let Some(deadline) = deadline {
81+
let now = Instant::now();
82+
if deadline > now {
83+
let idx =
84+
match timers.binary_search_by_key(&deadline, |&(_, time)| time) {
85+
Ok(i) => i,
86+
Err(i) => i,
87+
};
88+
timers.insert(idx, (runnable, deadline));
89+
continue;
90+
}
91+
}
92+
runnable.run();
93+
}
94+
95+
Err(RecvTimeoutError::Timeout) => {
96+
let now = Instant::now();
97+
while let Some((_, deadline)) = timers.first() {
98+
if *deadline > now {
99+
break;
100+
}
101+
timers.remove(0).0.run();
102+
}
103+
}
104+
Err(RecvTimeoutError::Disconnected) => break,
105+
}
106+
}
107+
});
108+
109+
ZedDispatcher { tx, task }
110+
}
111+
40112
pub fn init(fs: Arc<dyn Fs>, cx: &mut AppContext) {
113+
set_dispatcher(zed_dispatcher(cx));
114+
41115
let runtime_manager = cx.new_model(|cx| RuntimeManager::new(fs.clone(), cx));
42116
RuntimeManager::set_global(runtime_manager.clone(), cx);
43117

0 commit comments

Comments
 (0)