Skip to content

Commit 7bfd3ba

Browse files
dysnrotty
authored andcommitted
add asyncsrv zguide example
1 parent b1a7624 commit 7bfd3ba

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ path="examples/zguide/rtreq/main.rs"
147147
name="lbbroker"
148148
path="examples/zguide/lbbroker/main.rs"
149149

150+
[[example]]
151+
name="asyncsrv"
152+
path="examples/zguide/asyncsrv/main.rs"
153+
150154
[dependencies]
151155
libc = "0.2.15"
152156
log = "0.3.6"

examples/zguide/asyncsrv/main.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Asynchronous client-to-server (DEALER to ROUTER)
2+
//
3+
// While this example runs in a single process, that is to make
4+
// it easier to start and stop the example. Each task has its own
5+
// context and conceptually acts as a separate process.
6+
#![crate_name = "asyncsrv"]
7+
8+
extern crate zmq;
9+
extern crate rand;
10+
use rand::{thread_rng, Rng};
11+
use std::{str, thread};
12+
use std::time::Duration;
13+
14+
fn client_task() {
15+
let context = zmq::Context::new();
16+
let client = context.socket(zmq::DEALER).unwrap();
17+
let mut rng = thread_rng();
18+
let identity = format!("{:04X}-{:04X}", rng.gen::<u16>(), rng.gen::<u16>());
19+
client
20+
.set_identity(identity.as_bytes())
21+
.expect("failed setting client id");
22+
client
23+
.connect("tcp://localhost:5570")
24+
.expect("failed connecting client");
25+
let mut request_nbr = 0;
26+
loop {
27+
for _ in 0..100 {
28+
if client.poll(zmq::POLLIN, 10).expect("client failed polling") > 0 {
29+
let msg = client
30+
.recv_multipart(0)
31+
.expect("client failed receivng response");
32+
println!("{}", str::from_utf8(&msg[msg.len() - 1]).unwrap());
33+
}
34+
}
35+
request_nbr = request_nbr + 1;
36+
let request = format!("request #{}", request_nbr);
37+
client
38+
.send(&request, 0)
39+
.expect("client failed sending request");
40+
}
41+
}
42+
43+
fn server_task() {
44+
let context = zmq::Context::new();
45+
let frontend = context.socket(zmq::ROUTER).unwrap();
46+
frontend
47+
.bind("tcp://*:5570")
48+
.expect("server failed binding frontend");
49+
let backend = context.socket(zmq::DEALER).unwrap();
50+
backend
51+
.bind("inproc://backend")
52+
.expect("server failed binding backend");
53+
for _ in 0..5 {
54+
let ctx = context.clone();
55+
thread::spawn(move || server_worker(&ctx));
56+
}
57+
zmq::proxy(&frontend, &backend).expect("server failed proxying");
58+
}
59+
60+
fn server_worker(context: &zmq::Context) {
61+
let worker = context.socket(zmq::DEALER).unwrap();
62+
worker
63+
.connect("inproc://backend")
64+
.expect("worker failed to connect to backend");
65+
let mut rng = thread_rng();
66+
67+
loop {
68+
let identity = worker
69+
.recv_string(0)
70+
.expect("worker failed receiving identity")
71+
.unwrap();
72+
let message = worker
73+
.recv_string(0)
74+
.expect("worker failed receiving message")
75+
.unwrap();
76+
let replies = rng.gen_range(0, 4);
77+
for _ in 0..replies {
78+
thread::sleep(Duration::from_millis(rng.gen_range(0, 1000) + 1));
79+
worker
80+
.send(&identity, zmq::SNDMORE)
81+
.expect("worker failed sending identity");
82+
worker
83+
.send(&message, 0)
84+
.expect("worker failed sending message");
85+
}
86+
}
87+
}
88+
89+
fn main() {
90+
thread::spawn(|| client_task());
91+
thread::spawn(|| client_task());
92+
thread::spawn(|| client_task());
93+
thread::spawn(|| server_task());
94+
thread::sleep(Duration::from_secs(5));
95+
}

0 commit comments

Comments
 (0)