1#include <IO/MemoryReadWriteBuffer.h>
2#include <common/likely.h>
3#include <boost/noncopyable.hpp>
4
5
6namespace DB
7{
8
9namespace ErrorCodes
10{
11 extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED;
12}
13
14
15class ReadBufferFromMemoryWriteBuffer : public ReadBuffer, boost::noncopyable, private Allocator<false>
16{
17public:
18 explicit ReadBufferFromMemoryWriteBuffer(MemoryWriteBuffer && origin)
19 : ReadBuffer(nullptr, 0),
20 chunk_list(std::move(origin.chunk_list)),
21 end_pos(origin.position())
22 {
23 chunk_head = chunk_list.begin();
24 setChunk();
25 }
26
27 bool nextImpl() override
28 {
29 if (chunk_head == chunk_list.end())
30 return false;
31
32 ++chunk_head;
33 return setChunk();
34 }
35
36 ~ReadBufferFromMemoryWriteBuffer() override
37 {
38 for (const auto & range : chunk_list)
39 free(range.begin(), range.size());
40 }
41
42private:
43
44 /// update buffers and position according to chunk_head pointer
45 bool setChunk()
46 {
47 if (chunk_head != chunk_list.end())
48 {
49 internalBuffer() = *chunk_head;
50
51 /// It is last chunk, it should be truncated
52 if (std::next(chunk_head) != chunk_list.end())
53 buffer() = internalBuffer();
54 else
55 buffer() = Buffer(internalBuffer().begin(), end_pos);
56
57 position() = buffer().begin();
58 }
59 else
60 {
61 buffer() = internalBuffer() = Buffer(nullptr, nullptr);
62 position() = nullptr;
63 }
64
65 return buffer().size() != 0;
66 }
67
68 using Container = std::forward_list<BufferBase::Buffer>;
69
70 Container chunk_list;
71 Container::iterator chunk_head;
72 Position end_pos;
73};
74
75
76MemoryWriteBuffer::MemoryWriteBuffer(size_t max_total_size_, size_t initial_chunk_size_, double growth_rate_, size_t max_chunk_size_)
77 : WriteBuffer(nullptr, 0),
78 max_total_size(max_total_size_),
79 initial_chunk_size(initial_chunk_size_),
80 max_chunk_size(max_chunk_size_),
81 growth_rate(growth_rate_)
82{
83 addChunk();
84}
85
86
87void MemoryWriteBuffer::nextImpl()
88{
89 if (unlikely(hasPendingData()))
90 {
91 /// ignore flush
92 buffer() = Buffer(pos, buffer().end());
93 return;
94 }
95
96 addChunk();
97}
98
99
100void MemoryWriteBuffer::addChunk()
101{
102 size_t next_chunk_size;
103 if (chunk_list.empty())
104 {
105 chunk_tail = chunk_list.before_begin();
106 next_chunk_size = initial_chunk_size;
107 }
108 else
109 {
110 next_chunk_size = std::max(static_cast<size_t>(1), static_cast<size_t>(chunk_tail->size() * growth_rate));
111 next_chunk_size = std::min(next_chunk_size, max_chunk_size);
112 }
113
114 if (max_total_size)
115 {
116 if (total_chunks_size + next_chunk_size > max_total_size)
117 next_chunk_size = max_total_size - total_chunks_size;
118
119 if (0 == next_chunk_size)
120 {
121 set(position(), 0);
122 throw Exception("MemoryWriteBuffer limit is exhausted", ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED);
123 }
124 }
125
126 Position begin = reinterpret_cast<Position>(alloc(next_chunk_size));
127 chunk_tail = chunk_list.emplace_after(chunk_tail, begin, begin + next_chunk_size);
128 total_chunks_size += next_chunk_size;
129
130 set(chunk_tail->begin(), chunk_tail->size());
131}
132
133
134std::shared_ptr<ReadBuffer> MemoryWriteBuffer::getReadBufferImpl()
135{
136 auto res = std::make_shared<ReadBufferFromMemoryWriteBuffer>(std::move(*this));
137
138 /// invalidate members
139 chunk_list.clear();
140 chunk_tail = chunk_list.begin();
141
142 return res;
143}
144
145
146MemoryWriteBuffer::~MemoryWriteBuffer()
147{
148 for (const auto & range : chunk_list)
149 free(range.begin(), range.size());
150}
151
152}
153