CS144-Lab3实验笔记

实验简介

Lab3这个实验主要完成的是TCP发送端的功能。应用程序写入字节流到socket,TCP Sender负责将本端的字节流发送到对端。

具体地说,TCP Sender负责:

  • 根据对端的“接受窗口大小 + 期望收到的下一个字节”,来构造要发送的TCP报文头部和数据。

  • 如果超出规定的时间后仍未收到一个已发送报文的应答,则需要重传这个报文。

要实现的功能有:1. 如何构造要发送的TCP头部和数据;2. 处理对端对于发送报文的响应;3. 重传没有收到响应的报文

实验思路

1. 构造要发送的TCP头部和数据

1.1 想发送数据需要先建立连接(也就是三次握手后才能发送数据)

RFC 793定义的TCP不允许第一个SYN包携带数据,CS144按此实现。

但TCP Fast Open是允许第一个SYN包携带数据的。
参考TCP Fast Open: expediting web services

1.2 建立连接后按照对端窗口大小减去已发送未被应答的部分,尽可能地用ByteStream内的数据填充TCP DataPayload。报文的序列号应该设置为“报文第一个字节的相对序列号”

1.3 如果本端ByteStream读到了EOF,并且本TCP报文可以再多携带一个字节,则发送FIN报文。

1.4 发送新建的报文时,记录这个报文以便重传,直到收到应答;更新下一次会发送的ByteStream用到的绝对序列号。

1.5 第一个SYN报文,假定对方窗口为1。

2. 对端对于发送报文的响应

对端响应的报文有两个字段需要发送方关心,

  • ackno 表示对端期望收到下一个数据包的序列号
  • win_size 表示对端期望接受数据包的最大长度

收到ackno 后先转换为基于isn的绝对序列号,如果新的绝对序列号小于本端的绝对序列号,说明收到了重复的确认,直接返回即可。

否则认为所有绝对序列号在新序列号之前的那些报文都已经成功地被对端接受了。

3. 重传没有收到响应的报文

这部分主要参考RFC6298 Section 5的算法实现,按照这个算法,在各个函数里面加一些逻辑即可~

这个RFC主要规定了TCP如何度量RTO(retransmission timeout),以及在经过一个RTO时间后如何进行重传的算法。

本实验主要关心如何实现重传算法,这部分内容在Section 5中描述。

下面给出大致翻译

5.管理RTO计时器

对于一个重传计时器,必须满足不能过早地重传这个要求,也就是不能在小于一个RTO时间内进行重传。
以下是管理重传计时器的推荐算法:

5.1 每次发送包含数据的数据包时(这里的发送包括重传),如果计时器没有启动,则启动计时器。这样计时器才有可能在一个RTO时间后过期(指的是当前的RTO)

5.2 如果收到了所有“已发送但未被确认”数据的ACK报文后,关闭重传计时器

5.3 每次收到对新数据的ACK报文时,重启重传计时器,使其可以在一个RTO后过期(指的是当前的RTO)

当重传计时器过期后,执行如下操作
5.4 重传最早的”已发送但未被确认”的TCP段

5.5 主机(注,这里指TCP发送端)必须将新的RTO设置为当前RTO值的两倍。(RTO的上界在本RFC 2.5节中讨论了)

5.6 启动重传计时器,使其可以在一个RTO后过期(指的是当前的RTO)

5.7 如果在等待SYN-ACK这个TCP报文时候,重传计时器过期了,并且这个计时器的RTO小于3秒,则当数据传输开始的时候(即,三次握手完成后),RTO必须重新初始化为3秒

实现

计时器的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class RetransTimer {
private:
uint32_t _remaining_time{0};
bool _is_stopped{false};
bool _is_expired{false};

public:
RetransTimer() : _remaining_time(0), _is_stopped(true), _is_expired(false) {}
RetransTimer(uint32_t init_time, bool stopped, bool expired)
: _remaining_time(init_time), _is_stopped(stopped), _is_expired(expired) {}
void tick_to_retrans_timer(uint32_t ms_since_last_tick) {
if (_is_stopped) {
return;
}
if (ms_since_last_tick >= _remaining_time) {
_remaining_time = 0;
_is_expired = true;
} else {
_remaining_time -= ms_since_last_tick;
}
}
bool is_expired() const { return _is_expired; }
bool is_stopped() const { return _is_stopped; }
void start_new_timer(uint32_t new_rto) {
_remaining_time = new_rto;
_is_expired = false;
_is_stopped = false;
}

void stop_retrans_timer() {
_is_stopped = true;
_is_expired = false;
_remaining_time = 0;
}
};

类内新增成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class TCPSender {
private:
//! Some TCP state flags send to the other side,
bool _syn_sent{false};
//! When FIN is sent, it means that the data stream is closed on its own side, `fill_window` will return directly
bool _fin_sent{false};
//! the receive windows size, from the other side
uint16_t _win_size{1};

//! Once the segment is filled the window(using the data payload), it will be sent to the other side
//! In this lab `send_segments` means move the segment to `_segments_out` FIFO and `_outstanding_segments` map
void send_segment(TCPSegment &seg);
//! keep track of segments which have been sent but not yet acked by the receiver
//!@{
// first-> the absolute sequence number, it will be mono increased
// second-> the outstanding tcp segment
std::vector<std::pair<size_t, TCPSegment>> _outstanding_segments{};
size_t _outstanding_bytes{0};
// !@}

//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;

//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};

//! retransmission timer for the connection
unsigned int _initial_retransmission_timeout;
RetransTimer _retrans_timer{};
//! current retransmission timeout value, aka RTO
uint64_t _current_retransmission_timeout{0};
unsigned int _consecutive_retransmission_cnt{0};
...
}

方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
, _initial_retransmission_timeout{retx_timeout}
, _stream(capacity) {
_current_retransmission_timeout = _initial_retransmission_timeout;
}

uint64_t TCPSender::bytes_in_flight() const { return _outstanding_bytes; }

//! \details set the segment header and payload, fill the other side receive window size as much as possible
void TCPSender::fill_window() {
size_t receiver_win_size = _win_size ? _win_size : 1;
while ((receiver_win_size > _outstanding_bytes) && !_fin_sent) {
TCPSegment seg;
if (!_syn_sent) {
seg.header().syn = true;
_syn_sent = true;
}
seg.header().seqno = next_seqno();

// the max bytes could this segment carried
size_t max_payload_size =
min(TCPConfig::MAX_PAYLOAD_SIZE, receiver_win_size - _outstanding_bytes - seg.header().syn);
string payload = _stream.read(max_payload_size);
seg.payload() = Buffer(std::move(payload));
// send FIN flag if reached EOF of stream
if (!_fin_sent && _stream.eof() && seg.length_in_sequence_space() + _outstanding_bytes < receiver_win_size) {
seg.header().fin = true;
_fin_sent = true;
}
if (seg.length_in_sequence_space()) {
send_segment(seg);
} else {
break;
}
}
}
//! The segment here is NOT EMPTY (non zero length in sequence space)
void TCPSender::send_segment(TCPSegment &seg) {
_segments_out.push(seg);
_outstanding_segments.emplace_back(_next_seqno, seg);
const auto seg_length = seg.length_in_sequence_space();
_next_seqno += seg_length;
_outstanding_bytes += seg_length;

if (_retrans_timer.is_stopped()) {
_retrans_timer.start_new_timer(_current_retransmission_timeout);
}
}

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
//! `absolute_ackno` is the number of bytes that the receiver received.
//! `_next_seqno` is the number of bytes that the sender wants to send, i.e. the last `absolute-seqno`
size_t absolute_ackno = unwrap(ackno, _isn, _next_seqno);
if (absolute_ackno > _next_seqno) {
return;
}
_win_size = window_size;
//! Remove segments that have now been fully acknoledged segment in `_outstanding_segment`
auto iter = _outstanding_segments.begin();
bool acked_new_data = false;
while (!_outstanding_segments.empty()) {
const auto &seg = iter->second;
const auto seg_length = seg.length_in_sequence_space();
if (iter->first + seg_length <= absolute_ackno) {
// erase returns the iterator following the last removed element.
iter = _outstanding_segments.erase(iter);
_outstanding_bytes -= seg_length;
acked_new_data = true;
} else {
break;
}
}
//! When received a valid ackno, which means the receiver receipt of the new data
//! the retransmission timer will restart if there are outstanding segments (for the current value of RTO).,
//! otherwise the timer will stop
if (acked_new_data) {
_current_retransmission_timeout = _initial_retransmission_timeout;
if (!_outstanding_segments.empty()) {
_retrans_timer.start_new_timer(_current_retransmission_timeout);
} else {
_retrans_timer.stop_retrans_timer();
}
_consecutive_retransmission_cnt = 0;
}
fill_window();
return;
}

//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
_retrans_timer.tick_to_retrans_timer(ms_since_last_tick);
// If the retrans_timer is expired, it will retransmit the earliest segment when the window size is not zero
// then double the RTO, restart a new timer.
if (_retrans_timer.is_expired() && !_outstanding_segments.empty()) {
auto iter = _outstanding_segments.begin();
if (_win_size > 0) {
_current_retransmission_timeout <<= 1;
_consecutive_retransmission_cnt++;
}
if (_consecutive_retransmission_cnt <= TCPConfig::MAX_RETX_ATTEMPTS) {
_segments_out.push(iter->second);
}
_retrans_timer.start_new_timer(_current_retransmission_timeout);
}
}

unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmission_cnt; }

//! \details The segment with zero data and correct `seqno` is useful for `ACK` the other side.
// it will never be retransmitted, and doesn't need to keep track.
void TCPSender::send_empty_segment() {
TCPSegment segment;
segment.header().seqno = next_seqno();
_segments_out.push(segment);
return;
}

总结

  1. 独立的重传计时器的确逻辑清晰很多,第一次实现的时候,发送报文的逻辑和计时器的逻辑混在一次,把我绕晕了。。。
  2. 重传计时器默认应该是关闭的,我第一次实现时候

CS144-Lab3实验笔记
https://gwzlchn.github.io/202205/cs144-lab3/
作者
Zelin Wang
发布于
2022年5月18日
更新于
2022年10月23日
许可协议