Skip to content

Commit e42bc2c

Browse files
authored
Added Success/NoSuccess results for GenServer init (#39)
* Added Success/NoSuccess results for GenServer init * Added NoSuccess test
1 parent 8f94d4e commit e42bc2c

File tree

6 files changed

+122
-41
lines changed

6 files changed

+122
-41
lines changed

concurrency/src/messages.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
#[derive(Clone)]
1+
#[derive(Clone, Debug)]
22
pub struct Unused;

concurrency/src/tasks/gen_server.rs

Lines changed: 93 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use futures::future::FutureExt as _;
44
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
55
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};
66

7-
use crate::error::GenServerError;
7+
use crate::{
8+
error::GenServerError,
9+
tasks::InitResult::{NoSuccess, Success},
10+
};
811

912
const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);
1013

@@ -120,6 +123,11 @@ pub enum CastResponse<G: GenServer> {
120123
Stop,
121124
}
122125

126+
pub enum InitResult<G: GenServer> {
127+
Success(G),
128+
NoSuccess(G),
129+
}
130+
123131
pub trait GenServer: Send + Sized + Clone {
124132
type CallMsg: Clone + Send + Sized + Sync;
125133
type CastMsg: Clone + Send + Sized + Sync;
@@ -145,14 +153,18 @@ pub trait GenServer: Send + Sized + Clone {
145153
rx: &mut mpsc::Receiver<GenServerInMsg<Self>>,
146154
) -> impl Future<Output = Result<(), GenServerError>> + Send {
147155
async {
148-
let init_result = self
149-
.init(handle)
150-
.await
151-
.inspect_err(|err| tracing::error!("Initialization failed: {err:?}"));
152-
153-
let res = match init_result {
154-
Ok(new_state) => new_state.main_loop(handle, rx).await,
155-
Err(_) => Err(GenServerError::Initialization),
156+
let res = match self.init(handle).await {
157+
Ok(Success(new_state)) => new_state.main_loop(handle, rx).await,
158+
Ok(NoSuccess(intermediate_state)) => {
159+
// new_state is NoSuccess, this means the initialization failed, but the error was handled
160+
// in callback. No need to report the error.
161+
// Just skip main_loop and return the state to teardown the GenServer
162+
Ok(intermediate_state)
163+
}
164+
Err(err) => {
165+
tracing::error!("Initialization failed with unhandled error: {err:?}");
166+
Err(GenServerError::Initialization)
167+
}
156168
};
157169

158170
handle.cancellation_token().cancel();
@@ -171,8 +183,8 @@ pub trait GenServer: Send + Sized + Clone {
171183
fn init(
172184
self,
173185
_handle: &GenServerHandle<Self>,
174-
) -> impl Future<Output = Result<Self, Self::Error>> + Send {
175-
async { Ok(self) }
186+
) -> impl Future<Output = Result<InitResult<Self>, Self::Error>> + Send {
187+
async { Ok(Success(self)) }
176188
}
177189

178190
fn main_loop(
@@ -297,8 +309,12 @@ pub trait GenServer: Send + Sized + Clone {
297309
mod tests {
298310

299311
use super::*;
300-
use crate::tasks::send_after;
301-
use std::{thread, time::Duration};
312+
use crate::{messages::Unused, tasks::send_after};
313+
use std::{
314+
sync::{Arc, Mutex},
315+
thread,
316+
time::Duration,
317+
};
302318

303319
#[derive(Clone)]
304320
struct BadlyBehavedTask;
@@ -315,16 +331,16 @@ mod tests {
315331

316332
impl GenServer for BadlyBehavedTask {
317333
type CallMsg = InMessage;
318-
type CastMsg = ();
319-
type OutMsg = ();
320-
type Error = ();
334+
type CastMsg = Unused;
335+
type OutMsg = Unused;
336+
type Error = Unused;
321337

322338
async fn handle_call(
323339
self,
324340
_: Self::CallMsg,
325341
_: &GenServerHandle<Self>,
326342
) -> CallResponse<Self> {
327-
CallResponse::Stop(())
343+
CallResponse::Stop(Unused)
328344
}
329345

330346
async fn handle_cast(
@@ -345,9 +361,9 @@ mod tests {
345361

346362
impl GenServer for WellBehavedTask {
347363
type CallMsg = InMessage;
348-
type CastMsg = ();
364+
type CastMsg = Unused;
349365
type OutMsg = OutMsg;
350-
type Error = ();
366+
type Error = Unused;
351367

352368
async fn handle_call(
353369
self,
@@ -370,7 +386,7 @@ mod tests {
370386
) -> CastResponse<Self> {
371387
self.count += 1;
372388
println!("{:?}: good still alive", thread::current().id());
373-
send_after(Duration::from_millis(100), handle.to_owned(), ());
389+
send_after(Duration::from_millis(100), handle.to_owned(), Unused);
374390
CastResponse::NoReply(self)
375391
}
376392
}
@@ -380,9 +396,9 @@ mod tests {
380396
let runtime = rt::Runtime::new().unwrap();
381397
runtime.block_on(async move {
382398
let mut badboy = BadlyBehavedTask.start();
383-
let _ = badboy.cast(()).await;
399+
let _ = badboy.cast(Unused).await;
384400
let mut goodboy = WellBehavedTask { count: 0 }.start();
385-
let _ = goodboy.cast(()).await;
401+
let _ = goodboy.cast(Unused).await;
386402
rt::sleep(Duration::from_secs(1)).await;
387403
let count = goodboy.call(InMessage::GetCount).await.unwrap();
388404

@@ -400,9 +416,9 @@ mod tests {
400416
let runtime = rt::Runtime::new().unwrap();
401417
runtime.block_on(async move {
402418
let mut badboy = BadlyBehavedTask.start_blocking();
403-
let _ = badboy.cast(()).await;
419+
let _ = badboy.cast(Unused).await;
404420
let mut goodboy = WellBehavedTask { count: 0 }.start();
405-
let _ = goodboy.cast(()).await;
421+
let _ = goodboy.cast(Unused).await;
406422
rt::sleep(Duration::from_secs(1)).await;
407423
let count = goodboy.call(InMessage::GetCount).await.unwrap();
408424

@@ -428,9 +444,9 @@ mod tests {
428444

429445
impl GenServer for SomeTask {
430446
type CallMsg = SomeTaskCallMsg;
431-
type CastMsg = ();
432-
type OutMsg = ();
433-
type Error = ();
447+
type CastMsg = Unused;
448+
type OutMsg = Unused;
449+
type Error = Unused;
434450

435451
async fn handle_call(
436452
self,
@@ -441,12 +457,12 @@ mod tests {
441457
SomeTaskCallMsg::SlowOperation => {
442458
// Simulate a slow operation that will not resolve in time
443459
rt::sleep(TIMEOUT_DURATION * 2).await;
444-
CallResponse::Reply(self, ())
460+
CallResponse::Reply(self, Unused)
445461
}
446462
SomeTaskCallMsg::FastOperation => {
447463
// Simulate a fast operation that resolves in time
448464
rt::sleep(TIMEOUT_DURATION / 2).await;
449-
CallResponse::Reply(self, ())
465+
CallResponse::Reply(self, Unused)
450466
}
451467
}
452468
}
@@ -461,12 +477,59 @@ mod tests {
461477
let result = unresolving_task
462478
.call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION)
463479
.await;
464-
assert!(matches!(result, Ok(())));
480+
assert!(matches!(result, Ok(Unused)));
465481

466482
let result = unresolving_task
467483
.call_with_timeout(SomeTaskCallMsg::SlowOperation, TIMEOUT_DURATION)
468484
.await;
469485
assert!(matches!(result, Err(GenServerError::CallTimeout)));
470486
});
471487
}
488+
489+
#[derive(Clone)]
490+
struct SomeTaskThatFailsOnInit {
491+
sender_channel: Arc<Mutex<mpsc::Receiver<u8>>>,
492+
}
493+
494+
impl SomeTaskThatFailsOnInit {
495+
pub fn new(sender_channel: Arc<Mutex<mpsc::Receiver<u8>>>) -> Self {
496+
Self { sender_channel }
497+
}
498+
}
499+
500+
impl GenServer for SomeTaskThatFailsOnInit {
501+
type CallMsg = Unused;
502+
type CastMsg = Unused;
503+
type OutMsg = Unused;
504+
type Error = Unused;
505+
506+
async fn init(
507+
self,
508+
_handle: &GenServerHandle<Self>,
509+
) -> Result<InitResult<Self>, Self::Error> {
510+
// Simulate an initialization failure by returning NoSuccess
511+
Ok(NoSuccess(self))
512+
}
513+
514+
async fn teardown(self, _handle: &GenServerHandle<Self>) -> Result<(), Self::Error> {
515+
self.sender_channel.lock().unwrap().close();
516+
Ok(())
517+
}
518+
}
519+
520+
#[test]
521+
pub fn task_fails_with_intermediate_state() {
522+
let runtime = rt::Runtime::new().unwrap();
523+
runtime.block_on(async move {
524+
let (rx, tx) = mpsc::channel::<u8>();
525+
let sender_channel = Arc::new(Mutex::new(tx));
526+
let _task = SomeTaskThatFailsOnInit::new(sender_channel).start();
527+
528+
// Wait a while to ensure the task has time to run and fail
529+
rt::sleep(Duration::from_secs(1)).await;
530+
531+
// We assure that the teardown function has ran by checking that the receiver channel is closed
532+
assert!(rx.is_closed())
533+
});
534+
}
472535
}

concurrency/src/tasks/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ mod stream_tests;
1111
#[cfg(test)]
1212
mod timer_tests;
1313

14-
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
14+
pub use gen_server::{
15+
CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, InitResult,
16+
InitResult::NoSuccess, InitResult::Success,
17+
};
1518
pub use process::{send, Process, ProcessInfo};
1619
pub use stream::spawn_listener;
1720
pub use time::{send_after, send_interval};

concurrency/src/tasks/timer_tests.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::tasks::{send_interval, CallResponse, CastResponse, GenServer, GenServerHandle};
1+
use crate::tasks::{
2+
gen_server::InitResult, send_interval, CallResponse, CastResponse, GenServer, GenServerHandle,
3+
InitResult::Success,
4+
};
25
use spawned_rt::tasks::{self as rt, CancellationToken};
36
use std::time::Duration;
47

@@ -59,14 +62,14 @@ impl GenServer for Repeater {
5962
type OutMsg = RepeaterOutMessage;
6063
type Error = ();
6164

62-
async fn init(mut self, handle: &RepeaterHandle) -> Result<Self, Self::Error> {
65+
async fn init(mut self, handle: &RepeaterHandle) -> Result<InitResult<Self>, Self::Error> {
6366
let timer = send_interval(
6467
Duration::from_millis(100),
6568
handle.clone(),
6669
RepeaterCastMessage::Inc,
6770
);
6871
self.cancellation_token = Some(timer.cancellation_token);
69-
Ok(self)
72+
Ok(Success(self))
7073
}
7174

7275
async fn handle_call(

examples/bank/src/server.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use std::collections::HashMap;
22

33
use spawned_concurrency::{
44
messages::Unused,
5-
tasks::{CallResponse, GenServer, GenServerHandle},
5+
tasks::{
6+
CallResponse, GenServer, GenServerHandle,
7+
InitResult::{self, Success},
8+
},
69
};
710

811
use crate::messages::{BankError, BankInMessage as InMessage, BankOutMessage as OutMessage};
@@ -60,9 +63,12 @@ impl GenServer for Bank {
6063
type Error = BankError;
6164

6265
// Initializing "main" account with 1000 in balance to test init() callback.
63-
async fn init(mut self, _handle: &GenServerHandle<Self>) -> Result<Self, Self::Error> {
66+
async fn init(
67+
mut self,
68+
_handle: &GenServerHandle<Self>,
69+
) -> Result<InitResult<Self>, Self::Error> {
6470
self.accounts.insert("main".to_string(), 1000);
65-
Ok(self)
71+
Ok(Success(self))
6672
}
6773

6874
async fn handle_call(

examples/updater/src/server.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use std::time::Duration;
22

33
use spawned_concurrency::{
44
messages::Unused,
5-
tasks::{send_interval, CastResponse, GenServer, GenServerHandle},
5+
tasks::{
6+
send_interval, CastResponse, GenServer, GenServerHandle,
7+
InitResult::{self, Success},
8+
},
69
};
710
use spawned_rt::tasks::CancellationToken;
811

@@ -34,10 +37,13 @@ impl GenServer for UpdaterServer {
3437
type Error = std::fmt::Error;
3538

3639
// Initializing GenServer to start periodic checks.
37-
async fn init(mut self, handle: &GenServerHandle<Self>) -> Result<Self, Self::Error> {
40+
async fn init(
41+
mut self,
42+
handle: &GenServerHandle<Self>,
43+
) -> Result<InitResult<Self>, Self::Error> {
3844
let timer = send_interval(self.periodicity, handle.clone(), InMessage::Check);
3945
self.timer_token = Some(timer.cancellation_token);
40-
Ok(self)
46+
Ok(Success(self))
4147
}
4248

4349
async fn handle_cast(

0 commit comments

Comments
 (0)