1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) { size_t cur_win_max_idx = _first_unassembled_index + _capacity - _output.buffer_size(); if (index >= cur_win_max_idx) return;
if (eof) { _eof = true; } size_t data_start_idx = max(index, _first_unassembled_index); size_t data_end_idx = min(cur_win_max_idx, index + data.size()); if (data_end_idx >= data_start_idx) { pair<size_t, size_t> cur_data_start_end_index = make_pair(data_start_idx, data_end_idx); string cur_data = data.substr(data_start_idx - index, data_end_idx - data_start_idx + 1); recv_bytes_t cur_recv_bytes = {cur_data_start_end_index, cur_data}; while (!this->empty()) { auto iter = _str_to_assemble.lower_bound(cur_data_start_end_index); bool cur_data_could_merge_right = (iter != _str_to_assemble.end()) && (cur_recv_bytes.first.second >= iter->first.first); if (cur_data_could_merge_right) { cur_recv_bytes = merge_two_unassembled_strs(cur_recv_bytes, *iter); _unassembled_bytes -= iter->second.size(); _str_to_assemble.erase(iter); iter = _str_to_assemble.lower_bound(cur_data_start_end_index); }
bool cur_data_could_merge_left = (iter != _str_to_assemble.begin()) && ((--iter)->first.second >= cur_recv_bytes.first.first); if (cur_data_could_merge_left) { cur_recv_bytes = merge_two_unassembled_strs(*iter, cur_recv_bytes); _unassembled_bytes -= iter->second.size(); _str_to_assemble.erase(iter); } if (!cur_data_could_merge_right && !cur_data_could_merge_left) { break; } } _str_to_assemble.insert(cur_recv_bytes); _unassembled_bytes += cur_recv_bytes.second.size(); }
auto iter = _str_to_assemble.begin(); if (!_str_to_assemble.empty() && (iter->first.first <= _first_unassembled_index)) { auto temp_map_head = *iter; _str_to_assemble.erase(iter); size_t written_len = _output.write(temp_map_head.second); _unassembled_bytes -= written_len; if (written_len == temp_map_head.second.size()) { _first_unassembled_index = temp_map_head.first.first + temp_map_head.second.size(); } else { size_t new_data_start_index = temp_map_head.first.first + written_len; _str_to_assemble.insert( {{new_data_start_index, temp_map_head.first.second}, temp_map_head.second.substr(written_len)}); _first_unassembled_index = new_data_start_index; } } if (empty() && _eof) { _output.end_input(); } }
// Combines two buffered segments into a single segment that begins at the
// start index of `a`.
//
// Each segment is ((start, end), payload). When `a` ends strictly after `b`,
// the result is simply `a`. Otherwise the result ends where `b` ends and its
// payload is the part of `a` that precedes `b`'s start, followed by all of
// `b`'s payload.
//
// The prefix length is computed with unsigned subtraction, so callers are
// expected to pass the segment that starts first as `a`.
StreamReassembler::recv_bytes_t StreamReassembler::merge_two_unassembled_strs(
    const StreamReassembler::recv_bytes_t &a,
    const StreamReassembler::recv_bytes_t &b) const {
    // Begin with `a` as the answer; it is only extended if `b` reaches at
    // least as far as `a` does.
    recv_bytes_t merged = a;

    const bool b_reaches_as_far = !(a.first.second > b.first.second);
    if (b_reaches_as_far) {
        const size_t kept_prefix_len = b.first.first - a.first.first;
        merged.first.second = b.first.second;
        merged.second = a.second.substr(0, kept_prefix_len) + b.second;
    }

    return merged;
}
|