Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 42 additions & 19 deletions crates/ra_lsp_server/src/main_loop.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod handlers;
mod subscriptions;

use std::{fmt, path::PathBuf, sync::Arc};
use std::{fmt, path::PathBuf, sync::Arc, time::Instant};

use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender};
use failure::{bail, format_err};
Expand All @@ -12,7 +12,7 @@ use gen_lsp_server::{
use lsp_types::NumberOrString;
use ra_ide_api::{Canceled, FileId, LibraryData};
use ra_vfs::VfsTask;
use rustc_hash::FxHashSet;
use rustc_hash::FxHashMap;
use serde::{de::DeserializeOwned, Serialize};
use threadpool::ThreadPool;

Expand Down Expand Up @@ -82,7 +82,7 @@ pub fn main_loop(

log::info!("server initialized, serving requests");

let mut pending_requests = FxHashSet::default();
let mut pending_requests = FxHashMap::default();
let mut subs = Subscriptions::new();
let main_res = main_loop_inner(
options,
Expand Down Expand Up @@ -159,7 +159,7 @@ fn main_loop_inner(
task_sender: Sender<Task>,
task_receiver: Receiver<Task>,
state: &mut ServerWorldState,
pending_requests: &mut FxHashSet<u64>,
pending_requests: &mut FxHashMap<u64, Instant>,
subs: &mut Subscriptions,
) -> Result<()> {
// We try not to index more than THREADPOOL_SIZE - 3 libraries at the same
Expand All @@ -170,7 +170,6 @@ fn main_loop_inner(

let (libdata_sender, libdata_receiver) = unbounded();
loop {
state.maybe_collect_garbage();
log::trace!("selecting");
let event = select! {
recv(msg_receiver) -> msg => match msg {
Expand All @@ -184,17 +183,28 @@ fn main_loop_inner(
},
recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
};
log::info!("loop_turn = {:?}", event);
let _p = profile("loop_turn");
// NOTE: don't count blocking select! call as a loop-turn time
let _p = profile("main_loop_inner/loop-turn");
let loop_start = Instant::now();

log::info!("loop turn = {:?}", event);
let queue_count = pool.queued_count();
if queue_count > 0 {
log::info!("queued count = {}", queue_count);
}
let mut state_changed = false;
match event {
Event::Task(task) => on_task(task, msg_sender, pending_requests),
Event::Task(task) => {
on_task(task, msg_sender, pending_requests);
state.maybe_collect_garbage();
}
Event::Vfs(task) => {
state.vfs.write().handle_task(task);
state_changed = true;
}
Event::Lib(lib) => {
state.add_lib(lib);
state.maybe_collect_garbage();
in_flight_libraries -= 1;
}
Event::Msg(msg) => match msg {
Expand All @@ -210,7 +220,14 @@ fn main_loop_inner(
msg_sender.send(resp.into()).unwrap()
}
Err(req) => {
match on_request(state, pending_requests, pool, &task_sender, req)? {
match on_request(
state,
pending_requests,
pool,
&task_sender,
loop_start,
req,
)? {
None => (),
Some(req) => {
log::error!("unknown request: {:?}", req);
Expand Down Expand Up @@ -272,10 +289,15 @@ fn main_loop_inner(
}
}

fn on_task(task: Task, msg_sender: &Sender<RawMessage>, pending_requests: &mut FxHashSet<u64>) {
fn on_task(
task: Task,
msg_sender: &Sender<RawMessage>,
pending_requests: &mut FxHashMap<u64, Instant>,
) {
match task {
Task::Respond(response) => {
if pending_requests.remove(&response.id) {
if let Some(request_received) = pending_requests.remove(&response.id) {
log::info!("handled req#{} in {:?}", response.id, request_received.elapsed());
msg_sender.send(response.into()).unwrap();
}
}
Expand All @@ -287,9 +309,10 @@ fn on_task(task: Task, msg_sender: &Sender<RawMessage>, pending_requests: &mut F

fn on_request(
world: &mut ServerWorldState,
pending_requests: &mut FxHashSet<u64>,
pending_requests: &mut FxHashMap<u64, Instant>,
pool: &ThreadPool,
sender: &Sender<Task>,
request_received: Instant,
req: RawRequest,
) -> Result<Option<RawRequest>> {
let mut pool_dispatcher = PoolDispatcher { req: Some(req), res: None, pool, world, sender };
Expand Down Expand Up @@ -325,8 +348,8 @@ fn on_request(
.finish();
match req {
Ok(id) => {
let inserted = pending_requests.insert(id);
assert!(inserted, "duplicate request: {}", id);
let prev = pending_requests.insert(id, request_received);
assert!(prev.is_none(), "duplicate request: {}", id);
Ok(None)
}
Err(req) => Ok(Some(req)),
Expand All @@ -336,7 +359,7 @@ fn on_request(
fn on_notification(
msg_sender: &Sender<RawMessage>,
state: &mut ServerWorldState,
pending_requests: &mut FxHashSet<u64>,
pending_requests: &mut FxHashMap<u64, Instant>,
subs: &mut Subscriptions,
not: RawNotification,
) -> Result<()> {
Expand All @@ -348,7 +371,7 @@ fn on_notification(
panic!("string id's not supported: {:?}", id);
}
};
if pending_requests.remove(&id) {
if pending_requests.remove(&id).is_some() {
let response = RawResponse::err(
id,
ErrorCode::RequestCanceled as i32,
Expand Down Expand Up @@ -426,7 +449,7 @@ impl<'a> PoolDispatcher<'a> {
let world = self.world.snapshot();
let sender = self.sender.clone();
self.pool.execute(move || {
let resp = match f(world, params) {
let response = match f(world, params) {
Ok(resp) => RawResponse::ok::<R>(id, &resp),
Err(e) => match e.downcast::<LspError>() {
Ok(lsp_error) => {
Expand Down Expand Up @@ -458,7 +481,7 @@ impl<'a> PoolDispatcher<'a> {
}
},
};
let task = Task::Respond(resp);
let task = Task::Respond(response);
sender.send(task).unwrap();
});
self.res = Some(id);
Expand All @@ -468,7 +491,7 @@ impl<'a> PoolDispatcher<'a> {
Ok(self)
}

fn finish(&mut self) -> ::std::result::Result<u64, RawRequest> {
fn finish(&mut self) -> std::result::Result<u64, RawRequest> {
match (self.res.take(), self.req.take()) {
(Some(res), None) => Ok(res),
(None, Some(req)) => Err(req),
Expand Down