Skip to content

Commit 797fae6

Browse files
b5dignifiedquire
andauthored
docs(iroh): add screening-connection example (#3360)
## Description Adds an example showing how to interrupt conenctions with the `on_connecting` method in the protocol handler trait. ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist <!-- Remove any that are not relevant. --> - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. - [ ] List all breaking changes in the above "Breaking Changes" section. - [ ] Open an issue or PR on any number0 repos that are affected by this breaking change. Give guidance on how the updates should be handled or do the actual updates themselves. The major ones are: - [ ] [`quic-rpc`](https://github.com/n0-computer/quic-rpc) - [ ] [`iroh-gossip`](https://github.com/n0-computer/iroh-gossip) - [ ] [`iroh-blobs`](https://github.com/n0-computer/iroh-blobs) - [ ] [`dumbpipe`](https://github.com/n0-computer/dumbpipe) - [ ] [`sendme`](https://github.com/n0-computer/sendme) --------- Co-authored-by: dignifiedquire <[email protected]>
1 parent d815cae commit 797fae6

File tree

1 file changed

+150
-0
lines changed

1 file changed

+150
-0
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
//! Very basic example to showcase how to write a protocol that rejects new
2+
//! connections based on internal state. Useful when you want an endpoint to
3+
//! stop accepting new connections for some reason only known to the node. Maybe
4+
//! it's doing a migration, starting up, in a "maintenance mode", or serving
5+
//! too many connections.
6+
//!
7+
//! ## Usage
8+
//!
9+
//! cargo run --example screening-connection --features=examples
10+
use std::sync::{
11+
Arc,
12+
atomic::{AtomicU64, Ordering},
13+
};
14+
15+
use iroh::{
16+
Endpoint, NodeAddr,
17+
endpoint::{Connecting, Connection},
18+
protocol::{AcceptError, ProtocolHandler, Router},
19+
};
20+
use n0_snafu::{Result, ResultExt};
21+
22+
/// Each protocol is identified by its ALPN string.
23+
///
24+
/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
25+
/// and the connection is aborted unless both nodes pass the same bytestring.
26+
const ALPN: &[u8] = b"iroh-example/screening-connection/0";
27+
28+
#[tokio::main]
29+
async fn main() -> Result<()> {
30+
let router = start_accept_side().await?;
31+
// Wait for the endpoint to be reachable
32+
router.endpoint().online().await;
33+
let node_addr = router.endpoint().node_addr();
34+
35+
// call connect three times. connection index 1 will be an odd number, and rejected.
36+
connect_side(&node_addr).await?;
37+
if let Err(err) = connect_side(&node_addr).await {
38+
println!("Error connecting: {}", err);
39+
}
40+
connect_side(&node_addr).await?;
41+
42+
// This makes sure the endpoint in the router is closed properly and connections close gracefully
43+
router.shutdown().await.e()?;
44+
45+
Ok(())
46+
}
47+
48+
async fn connect_side(addr: &NodeAddr) -> Result<()> {
49+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
50+
51+
// Open a connection to the accepting node
52+
let conn = endpoint.connect(addr.clone(), ALPN).await?;
53+
54+
// Open a bidirectional QUIC stream
55+
let (mut send, mut recv) = conn.open_bi().await.e()?;
56+
57+
// Send some data to be echoed
58+
send.write_all(b"Hello, world!").await.e()?;
59+
60+
// Signal the end of data for this particular stream
61+
send.finish().e()?;
62+
63+
// Receive the echo, but limit reading up to maximum 1000 bytes
64+
let response = recv.read_to_end(1000).await.e()?;
65+
assert_eq!(&response, b"Hello, world!");
66+
67+
// Explicitly close the whole connection.
68+
conn.close(0u32.into(), b"bye!");
69+
70+
// The above call only queues a close message to be sent (see how it's not async!).
71+
// We need to actually call this to make sure this message is sent out.
72+
endpoint.close().await;
73+
// If we don't call this, but continue using the endpoint, we then the queued
74+
// close call will eventually be picked up and sent.
75+
// But always try to wait for endpoint.close().await to go through before dropping
76+
// the endpoint to ensure any queued messages are sent through and connections are
77+
// closed gracefully.
78+
Ok(())
79+
}
80+
81+
async fn start_accept_side() -> Result<Router> {
82+
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
83+
84+
let echo = ScreenedEcho {
85+
conn_attempt_count: Arc::new(AtomicU64::new(0)),
86+
};
87+
88+
// Build our protocol handler and add our protocol, identified by its ALPN, and spawn the node.
89+
let router = Router::builder(endpoint).accept(ALPN, echo).spawn();
90+
91+
Ok(router)
92+
}
93+
94+
/// This is the same as the echo example, but keeps an internal count of the
95+
/// number of connections that have been attempted. This is to demonstrate how
96+
/// to plumb state into the protocol handler
97+
#[derive(Debug, Clone)]
98+
struct ScreenedEcho {
99+
conn_attempt_count: Arc<AtomicU64>,
100+
}
101+
102+
impl ProtocolHandler for ScreenedEcho {
103+
/// `on_connecting` allows us to intercept a connection as it's being formed,
104+
/// which is the right place to cut off a connection as early as possible.
105+
/// This is an optional method on the ProtocolHandler trait.
106+
async fn on_connecting(&self, connecting: Connecting) -> Result<Connection, AcceptError> {
107+
self.conn_attempt_count.fetch_add(1, Ordering::Relaxed);
108+
let count = self.conn_attempt_count.load(Ordering::Relaxed);
109+
110+
// reject every other connection
111+
if count % 2 == 0 {
112+
println!("rejecting connection");
113+
return Err(AcceptError::NotAllowed {});
114+
}
115+
116+
// To allow normal connection construction, await the connecting future & return
117+
let conn = connecting.await?;
118+
Ok(conn)
119+
}
120+
121+
/// The `accept` method is called for each incoming connection for our ALPN.
122+
/// This is the primary place to kick off work in response to a new connection.
123+
///
124+
/// The returned future runs on a newly spawned tokio task, so it can run as long as
125+
/// the connection lasts.
126+
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
127+
// We can get the remote's node id from the connection.
128+
let node_id = connection.remote_node_id()?;
129+
println!("accepted connection from {node_id}");
130+
131+
// Our protocol is a simple request-response protocol, so we expect the
132+
// connecting peer to open a single bi-directional stream.
133+
let (mut send, mut recv) = connection.accept_bi().await?;
134+
135+
// Echo any bytes received back directly.
136+
// This will keep copying until the sender signals the end of data on the stream.
137+
let bytes_sent = tokio::io::copy(&mut recv, &mut send).await?;
138+
println!("Copied over {bytes_sent} byte(s)");
139+
140+
// By calling `finish` on the send stream we signal that we will not send anything
141+
// further, which makes the receive stream on the other end terminate.
142+
send.finish()?;
143+
144+
// Wait until the remote closes the connection, which it does once it
145+
// received the response.
146+
connection.closed().await;
147+
148+
Ok(())
149+
}
150+
}

0 commit comments

Comments
 (0)