Skip to content

Commit ed8dce0

Browse files
jhwgh1968Homu
authored andcommitted
Minimal Implementation of TCP Selective Acknowledgement
Closes: #266 Approved by: whitequark
1 parent 556672f commit ed8dce0

File tree

2 files changed

+340
-7
lines changed

2 files changed

+340
-7
lines changed

src/socket/tcp.rs

Lines changed: 194 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,14 @@ pub struct TcpSocket<'a> {
234234
remote_win_len: usize,
235235
/// The receive window scaling factor for remotes which support RFC 1323, None if unsupported.
236236
remote_win_scale: Option<u8>,
237+
/// Whether or not the remote supports selective ACK as described in RFC 2018.
238+
remote_has_sack: bool,
237239
/// The maximum number of data octets that the remote side may receive.
238240
remote_mss: usize,
239241
/// The timestamp of the last packet received.
240242
remote_last_ts: Option<Instant>,
243+
/// The sequence number of the last packet recived, used for sACK
244+
local_rx_last_seq: Option<TcpSeqNumber>,
241245
/// The ACK number of the last packet recived.
242246
local_rx_last_ack: Option<TcpSeqNumber>,
243247
/// The number of packets recived directly after
@@ -286,9 +290,11 @@ impl<'a> TcpSocket<'a> {
286290
remote_win_len: 0,
287291
remote_win_shift: rx_cap_log2.saturating_sub(16) as u8,
288292
remote_win_scale: None,
293+
remote_has_sack: false,
289294
remote_mss: DEFAULT_MSS,
290295
remote_last_ts: None,
291296
local_rx_last_ack: None,
297+
local_rx_last_seq: None,
292298
local_rx_dup_acks: 0,
293299
}
294300
}
@@ -800,6 +806,8 @@ impl<'a> TcpSocket<'a> {
800806
window_len: 0,
801807
window_scale: None,
802808
max_seg_size: None,
809+
sack_permitted: false,
810+
sack_ranges: [None, None, None],
803811
payload: &[]
804812
};
805813
let ip_reply_repr = IpRepr::Unspecified {
@@ -829,7 +837,7 @@ impl<'a> TcpSocket<'a> {
829837
}
830838

831839
fn ack_reply(&mut self, ip_repr: &IpRepr, repr: &TcpRepr) -> (IpRepr, TcpRepr<'static>) {
832-
let (ip_reply_repr, mut reply_repr) = Self::reply(ip_repr, repr);
840+
let (mut ip_reply_repr, mut reply_repr) = Self::reply(ip_repr, repr);
833841

834842
// From RFC 793:
835843
// [...] an empty acknowledgment segment containing the current send-sequence number
@@ -844,6 +852,42 @@ impl<'a> TcpSocket<'a> {
844852
reply_repr.window_len = self.scaled_window();
845853
self.remote_last_win = reply_repr.window_len;
846854

855+
// If the remote supports selective acknowledgement, add the option to the outgoing
856+
// segment.
857+
if self.remote_has_sack {
858+
net_debug!("sending sACK option with current assembler ranges");
859+
860+
// RFC 2018: The first SACK block (i.e., the one immediately following the kind and
861+
// length fields in the option) MUST specify the contiguous block of data containing
862+
// the segment which triggered this ACK, unless that segment advanced the
863+
// Acknowledgment Number field in the header.
864+
reply_repr.sack_ranges[0] = None;
865+
866+
if let Some(last_seg_seq) = self.local_rx_last_seq.map(|s| s.0 as u32) {
867+
reply_repr.sack_ranges[0] = self.assembler.iter_data(
868+
reply_repr.ack_number.map(|s| s.0 as usize).unwrap_or(0))
869+
.map(|(left, right)| (left as u32, right as u32))
870+
.skip_while(|(left, right)| *left > last_seg_seq || *right < last_seg_seq)
871+
.next();
872+
}
873+
874+
if reply_repr.sack_ranges[0].is_none() {
875+
// The matching segment was removed from the assembler, meaning the acknowledgement
876+
// number has advanced, or there was no previous sACK.
877+
//
878+
// While the RFC says we SHOULD keep a list of reported sACK ranges, and iterate
879+
// through those, that is currently infeasable. Instead, we offer the range with
880+
// the lowest sequence number (if one exists) to hint at what segments would
881+
// most quickly advance the acknowledgement number.
882+
reply_repr.sack_ranges[0] = self.assembler.iter_data(
883+
reply_repr.ack_number.map(|s| s.0 as usize).unwrap_or(0))
884+
.map(|(left, right)| (left as u32, right as u32))
885+
.next();
886+
}
887+
}
888+
889+
// Since the sACK option may have changed the length of the payload, update that.
890+
ip_reply_repr.set_payload_len(reply_repr.buffer_len());
847891
(ip_reply_repr, reply_repr)
848892
}
849893

@@ -975,6 +1019,7 @@ impl<'a> TcpSocket<'a> {
9751019
if segment_in_window {
9761020
// We've checked that segment_start >= window_start above.
9771021
payload_offset = (segment_start - window_start) as usize;
1022+
self.local_rx_last_seq = Some(repr.seq_number);
9781023
} else {
9791024
// If we're in the TIME-WAIT state, restart the TIME-WAIT timeout, since
9801025
// the remote end may not have realized we've closed the connection.
@@ -1056,6 +1101,7 @@ impl<'a> TcpSocket<'a> {
10561101
self.local_seq_no = TcpSeqNumber(-repr.seq_number.0);
10571102
self.remote_seq_no = repr.seq_number + 1;
10581103
self.remote_last_seq = self.local_seq_no;
1104+
self.remote_has_sack = repr.sack_permitted;
10591105
if let Some(max_seg_size) = repr.max_seg_size {
10601106
self.remote_mss = max_seg_size as usize
10611107
}
@@ -1422,6 +1468,8 @@ impl<'a> TcpSocket<'a> {
14221468
window_len: self.scaled_window(),
14231469
window_scale: None,
14241470
max_seg_size: None,
1471+
sack_permitted: false,
1472+
sack_ranges: [None, None, None],
14251473
payload: &[]
14261474
};
14271475

@@ -1442,7 +1490,9 @@ impl<'a> TcpSocket<'a> {
14421490
if self.state == State::SynSent {
14431491
repr.ack_number = None;
14441492
repr.window_scale = Some(self.remote_win_shift);
1493+
repr.sack_permitted = true;
14451494
} else {
1495+
repr.sack_permitted = self.remote_has_sack;
14461496
repr.window_scale = self.remote_win_scale.map(
14471497
|_| self.remote_win_shift);
14481498
}
@@ -1641,6 +1691,8 @@ mod test {
16411691
seq_number: TcpSeqNumber(0), ack_number: Some(TcpSeqNumber(0)),
16421692
window_len: 256, window_scale: None,
16431693
max_seg_size: None,
1694+
sack_permitted: false,
1695+
sack_ranges: [None, None, None],
16441696
payload: &[]
16451697
};
16461698
const _RECV_IP_TEMPL: IpRepr = IpRepr::Unspecified {
@@ -1654,6 +1706,8 @@ mod test {
16541706
seq_number: TcpSeqNumber(0), ack_number: Some(TcpSeqNumber(0)),
16551707
window_len: 64, window_scale: None,
16561708
max_seg_size: None,
1709+
sack_permitted: false,
1710+
sack_ranges: [None, None, None],
16571711
payload: &[]
16581712
};
16591713

@@ -1795,8 +1849,11 @@ mod test {
17951849
TcpSocket::new(rx_buffer, tx_buffer)
17961850
}
17971851

1798-
fn socket_syn_received() -> TcpSocket<'static> {
1799-
let mut s = socket();
1852+
fn socket_syn_received_with_buffer_sizes(
1853+
tx_len: usize,
1854+
rx_len: usize
1855+
) -> TcpSocket<'static> {
1856+
let mut s = socket_with_buffer_sizes(tx_len, rx_len);
18001857
s.state = State::SynReceived;
18011858
s.local_endpoint = LOCAL_END;
18021859
s.remote_endpoint = REMOTE_END;
@@ -1807,6 +1864,10 @@ mod test {
18071864
s
18081865
}
18091866

1867+
fn socket_syn_received() -> TcpSocket<'static> {
1868+
socket_syn_received_with_buffer_sizes(64, 64)
1869+
}
1870+
18101871
fn socket_syn_sent() -> TcpSocket<'static> {
18111872
let mut s = socket();
18121873
s.state = State::SynSent;
@@ -1817,8 +1878,8 @@ mod test {
18171878
s
18181879
}
18191880

1820-
fn socket_established() -> TcpSocket<'static> {
1821-
let mut s = socket_syn_received();
1881+
fn socket_established_with_buffer_sizes(tx_len: usize, rx_len: usize) -> TcpSocket<'static> {
1882+
let mut s = socket_syn_received_with_buffer_sizes(tx_len, rx_len);
18221883
s.state = State::Established;
18231884
s.local_seq_no = LOCAL_SEQ + 1;
18241885
s.remote_last_seq = LOCAL_SEQ + 1;
@@ -1827,6 +1888,10 @@ mod test {
18271888
s
18281889
}
18291890

1891+
fn socket_established() -> TcpSocket<'static> {
1892+
socket_established_with_buffer_sizes(64, 64)
1893+
}
1894+
18301895
fn socket_fin_wait_1() -> TcpSocket<'static> {
18311896
let mut s = socket_established();
18321897
s.state = State::FinWait1;
@@ -1936,6 +2001,44 @@ mod test {
19362001
s
19372002
}
19382003

2004+
#[test]
2005+
fn test_listen_sack_option() {
2006+
let mut s = socket_listen();
2007+
send!(s, TcpRepr {
2008+
control: TcpControl::Syn,
2009+
seq_number: REMOTE_SEQ,
2010+
ack_number: None,
2011+
sack_permitted: false,
2012+
..SEND_TEMPL
2013+
});
2014+
assert!(!s.remote_has_sack);
2015+
recv!(s, [TcpRepr {
2016+
control: TcpControl::Syn,
2017+
seq_number: LOCAL_SEQ,
2018+
ack_number: Some(REMOTE_SEQ + 1),
2019+
max_seg_size: Some(BASE_MSS),
2020+
..RECV_TEMPL
2021+
}]);
2022+
2023+
let mut s = socket_listen();
2024+
send!(s, TcpRepr {
2025+
control: TcpControl::Syn,
2026+
seq_number: REMOTE_SEQ,
2027+
ack_number: None,
2028+
sack_permitted: true,
2029+
..SEND_TEMPL
2030+
});
2031+
assert!(s.remote_has_sack);
2032+
recv!(s, [TcpRepr {
2033+
control: TcpControl::Syn,
2034+
seq_number: LOCAL_SEQ,
2035+
ack_number: Some(REMOTE_SEQ + 1),
2036+
max_seg_size: Some(BASE_MSS),
2037+
sack_permitted: true,
2038+
..RECV_TEMPL
2039+
}]);
2040+
}
2041+
19392042
#[test]
19402043
fn test_listen_syn_win_scale_buffers() {
19412044
for (buffer_size, shift_amt) in &[
@@ -2213,6 +2316,7 @@ mod test {
22132316
ack_number: None,
22142317
max_seg_size: Some(BASE_MSS),
22152318
window_scale: Some(0),
2319+
sack_permitted: true,
22162320
..RECV_TEMPL
22172321
}]);
22182322
send!(s, TcpRepr {
@@ -2270,6 +2374,7 @@ mod test {
22702374
ack_number: None,
22712375
max_seg_size: Some(BASE_MSS),
22722376
window_scale: Some(0),
2377+
sack_permitted: true,
22732378
..RECV_TEMPL
22742379
}]);
22752380
send!(s, TcpRepr {
@@ -2358,6 +2463,7 @@ mod test {
23582463
max_seg_size: Some(BASE_MSS),
23592464
window_scale: Some(*shift_amt),
23602465
window_len: cmp::min(*buffer_size >> *shift_amt, 65535) as u16,
2466+
sack_permitted: true,
23612467
..RECV_TEMPL
23622468
}]);
23632469
}
@@ -2385,6 +2491,88 @@ mod test {
23852491
assert_eq!(s.rx_buffer.dequeue_many(6), &b"abcdef"[..]);
23862492
}
23872493

2494+
fn setup_rfc2018_cases() -> (TcpSocket<'static>, Vec<u8>) {
2495+
// This is a utility function used by the tests for RFC 2018 cases. It configures a socket
2496+
// in a particular way suitable for those cases.
2497+
//
2498+
// RFC 2018: Assume the left window edge is 5000 and that the data transmitter sends [...]
2499+
// segments, each containing 500 data bytes.
2500+
let mut s = socket_established_with_buffer_sizes(4000, 4000);
2501+
s.remote_has_sack = true;
2502+
2503+
// create a segment that is 500 bytes long
2504+
let mut segment: Vec<u8> = Vec::with_capacity(500);
2505+
2506+
// move the last ack to 5000 by sending ten of them
2507+
for _ in 0..50 { segment.extend_from_slice(b"abcdefghij") }
2508+
for offset in (0..5000).step_by(500) {
2509+
send!(s, TcpRepr {
2510+
seq_number: REMOTE_SEQ + 1 + offset,
2511+
ack_number: Some(LOCAL_SEQ + 1),
2512+
payload: &segment,
2513+
..SEND_TEMPL
2514+
});
2515+
recv!(s, [TcpRepr {
2516+
seq_number: LOCAL_SEQ + 1,
2517+
ack_number: Some(REMOTE_SEQ + 1 + offset + 500),
2518+
window_len: 3500,
2519+
..RECV_TEMPL
2520+
}]);
2521+
s.recv(|data| {
2522+
assert_eq!(data.len(), 500);
2523+
assert_eq!(data, segment.as_slice());
2524+
(500, ())
2525+
}).unwrap();
2526+
}
2527+
assert_eq!(s.remote_last_win, 3500);
2528+
(s, segment)
2529+
}
2530+
2531+
#[test]
2532+
fn test_established_rfc2018_cases() {
2533+
// This test case verifies the exact scenarios described on pages 8-9 of RFC 2018. Please
2534+
// ensure its behavior does not deviate from those scenarios.
2535+
2536+
let (mut s, segment) = setup_rfc2018_cases();
2537+
// RFC 2018:
2538+
//
2539+
// Case 2: The first segment is dropped but the remaining 7 are received.
2540+
//
2541+
// Upon receiving each of the last seven packets, the data receiver will return a TCP ACK
2542+
// segment that acknowledges sequence number 5000 and contains a SACK option specifying one
2543+
// block of queued data:
2544+
//
2545+
// Triggering ACK Left Edge Right Edge
2546+
// Segment
2547+
//
2548+
// 5000 (lost)
2549+
// 5500 5000 5500 6000
2550+
// 6000 5000 5500 6500
2551+
// 6500 5000 5500 7000
2552+
// 7000 5000 5500 7500
2553+
// 7500 5000 5500 8000
2554+
// 8000 5000 5500 8500
2555+
// 8500 5000 5500 9000
2556+
//
2557+
for offset in (500..3500).step_by(500) {
2558+
send!(s, TcpRepr {
2559+
seq_number: REMOTE_SEQ + 1 + offset + 5000,
2560+
ack_number: Some(LOCAL_SEQ + 1),
2561+
payload: &segment,
2562+
..SEND_TEMPL
2563+
}, Ok(Some(TcpRepr {
2564+
seq_number: LOCAL_SEQ + 1,
2565+
ack_number: Some(REMOTE_SEQ + 1 + 5000),
2566+
window_len: 4000,
2567+
sack_ranges: [
2568+
Some((REMOTE_SEQ.0 as u32 + 1 + 5500,
2569+
REMOTE_SEQ.0 as u32 + 1 + 5500 + offset as u32)),
2570+
None, None],
2571+
..RECV_TEMPL
2572+
})));
2573+
}
2574+
}
2575+
23882576
#[test]
23892577
fn test_established_sliding_window_recv() {
23902578
let mut s = socket_established();
@@ -3987,6 +4175,7 @@ mod test {
39874175
ack_number: None,
39884176
max_seg_size: Some(BASE_MSS),
39894177
window_scale: Some(0),
4178+
sack_permitted: true,
39904179
..RECV_TEMPL
39914180
}));
39924181
assert_eq!(s.state, State::SynSent);

0 commit comments

Comments
 (0)