1 | #include <IO/MemoryReadWriteBuffer.h> |
2 | #include <common/likely.h> |
3 | #include <boost/noncopyable.hpp> |
4 | |
5 | |
6 | namespace DB |
7 | { |
8 | |
9 | namespace ErrorCodes |
10 | { |
11 | extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED; |
12 | } |
13 | |
14 | |
15 | class ReadBufferFromMemoryWriteBuffer : public ReadBuffer, boost::noncopyable, private Allocator<false> |
16 | { |
17 | public: |
18 | explicit ReadBufferFromMemoryWriteBuffer(MemoryWriteBuffer && origin) |
19 | : ReadBuffer(nullptr, 0), |
20 | chunk_list(std::move(origin.chunk_list)), |
21 | end_pos(origin.position()) |
22 | { |
23 | chunk_head = chunk_list.begin(); |
24 | setChunk(); |
25 | } |
26 | |
27 | bool nextImpl() override |
28 | { |
29 | if (chunk_head == chunk_list.end()) |
30 | return false; |
31 | |
32 | ++chunk_head; |
33 | return setChunk(); |
34 | } |
35 | |
36 | ~ReadBufferFromMemoryWriteBuffer() override |
37 | { |
38 | for (const auto & range : chunk_list) |
39 | free(range.begin(), range.size()); |
40 | } |
41 | |
42 | private: |
43 | |
44 | /// update buffers and position according to chunk_head pointer |
45 | bool setChunk() |
46 | { |
47 | if (chunk_head != chunk_list.end()) |
48 | { |
49 | internalBuffer() = *chunk_head; |
50 | |
51 | /// It is last chunk, it should be truncated |
52 | if (std::next(chunk_head) != chunk_list.end()) |
53 | buffer() = internalBuffer(); |
54 | else |
55 | buffer() = Buffer(internalBuffer().begin(), end_pos); |
56 | |
57 | position() = buffer().begin(); |
58 | } |
59 | else |
60 | { |
61 | buffer() = internalBuffer() = Buffer(nullptr, nullptr); |
62 | position() = nullptr; |
63 | } |
64 | |
65 | return buffer().size() != 0; |
66 | } |
67 | |
68 | using Container = std::forward_list<BufferBase::Buffer>; |
69 | |
70 | Container chunk_list; |
71 | Container::iterator chunk_head; |
72 | Position end_pos; |
73 | }; |
74 | |
75 | |
76 | MemoryWriteBuffer::MemoryWriteBuffer(size_t max_total_size_, size_t initial_chunk_size_, double growth_rate_, size_t max_chunk_size_) |
77 | : WriteBuffer(nullptr, 0), |
78 | max_total_size(max_total_size_), |
79 | initial_chunk_size(initial_chunk_size_), |
80 | max_chunk_size(max_chunk_size_), |
81 | growth_rate(growth_rate_) |
82 | { |
83 | addChunk(); |
84 | } |
85 | |
86 | |
87 | void MemoryWriteBuffer::nextImpl() |
88 | { |
89 | if (unlikely(hasPendingData())) |
90 | { |
91 | /// ignore flush |
92 | buffer() = Buffer(pos, buffer().end()); |
93 | return; |
94 | } |
95 | |
96 | addChunk(); |
97 | } |
98 | |
99 | |
100 | void MemoryWriteBuffer::addChunk() |
101 | { |
102 | size_t next_chunk_size; |
103 | if (chunk_list.empty()) |
104 | { |
105 | chunk_tail = chunk_list.before_begin(); |
106 | next_chunk_size = initial_chunk_size; |
107 | } |
108 | else |
109 | { |
110 | next_chunk_size = std::max(static_cast<size_t>(1), static_cast<size_t>(chunk_tail->size() * growth_rate)); |
111 | next_chunk_size = std::min(next_chunk_size, max_chunk_size); |
112 | } |
113 | |
114 | if (max_total_size) |
115 | { |
116 | if (total_chunks_size + next_chunk_size > max_total_size) |
117 | next_chunk_size = max_total_size - total_chunks_size; |
118 | |
119 | if (0 == next_chunk_size) |
120 | { |
121 | set(position(), 0); |
122 | throw Exception("MemoryWriteBuffer limit is exhausted" , ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED); |
123 | } |
124 | } |
125 | |
126 | Position begin = reinterpret_cast<Position>(alloc(next_chunk_size)); |
127 | chunk_tail = chunk_list.emplace_after(chunk_tail, begin, begin + next_chunk_size); |
128 | total_chunks_size += next_chunk_size; |
129 | |
130 | set(chunk_tail->begin(), chunk_tail->size()); |
131 | } |
132 | |
133 | |
134 | std::shared_ptr<ReadBuffer> MemoryWriteBuffer::getReadBufferImpl() |
135 | { |
136 | auto res = std::make_shared<ReadBufferFromMemoryWriteBuffer>(std::move(*this)); |
137 | |
138 | /// invalidate members |
139 | chunk_list.clear(); |
140 | chunk_tail = chunk_list.begin(); |
141 | |
142 | return res; |
143 | } |
144 | |
145 | |
146 | MemoryWriteBuffer::~MemoryWriteBuffer() |
147 | { |
148 | for (const auto & range : chunk_list) |
149 | free(range.begin(), range.size()); |
150 | } |
151 | |
152 | } |
153 | |