1 | #include <gtest/gtest.h> |
2 | |
3 | #include <stdexcept> |
4 | #include <Poco/File.h> |
5 | #include <IO/CascadeWriteBuffer.h> |
6 | #include <IO/MemoryReadWriteBuffer.h> |
7 | #include <IO/WriteBufferFromTemporaryFile.h> |
8 | #include <IO/ReadBufferFromString.h> |
9 | #include <IO/WriteBufferFromString.h> |
10 | #include <IO/ConcatReadBuffer.h> |
11 | #include <IO/copyData.h> |
12 | #include <Common/typeid_cast.h> |
13 | |
14 | using namespace DB; |
15 | |
16 | |
17 | static std::string makeTestArray(size_t size) |
18 | { |
19 | std::string res(size, '\0'); |
20 | for (size_t i = 0; i < res.size(); ++i) |
21 | res[i] = i % 256; |
22 | return res; |
23 | } |
24 | |
25 | static void testCascadeBufferRedability( |
26 | std::string data, |
27 | CascadeWriteBuffer::WriteBufferPtrs && arg1, |
28 | CascadeWriteBuffer::WriteBufferConstructors && arg2) |
29 | { |
30 | CascadeWriteBuffer cascade{std::move(arg1), std::move(arg2)}; |
31 | |
32 | cascade.write(data.data(), data.size()); |
33 | EXPECT_EQ(cascade.count(), data.size()); |
34 | |
35 | std::vector<WriteBufferPtr> write_buffers; |
36 | std::vector<ReadBufferPtr> read_buffers; |
37 | std::vector<ReadBuffer *> read_buffers_raw; |
38 | cascade.getResultBuffers(write_buffers); |
39 | |
40 | for (WriteBufferPtr & wbuf : write_buffers) |
41 | { |
42 | if (!wbuf) |
43 | continue; |
44 | |
45 | auto wbuf_readable = dynamic_cast<IReadableWriteBuffer *>(wbuf.get()); |
46 | ASSERT_FALSE(!wbuf_readable); |
47 | |
48 | auto rbuf = wbuf_readable->tryGetReadBuffer(); |
49 | ASSERT_FALSE(!rbuf); |
50 | |
51 | read_buffers.emplace_back(rbuf); |
52 | read_buffers_raw.emplace_back(rbuf.get()); |
53 | } |
54 | |
55 | ConcatReadBuffer concat(read_buffers_raw); |
56 | std::string decoded_data; |
57 | { |
58 | WriteBufferFromString decoded_data_writer(decoded_data); |
59 | copyData(concat, decoded_data_writer); |
60 | } |
61 | |
62 | ASSERT_EQ(data, decoded_data); |
63 | } |
64 | |
65 | |
66 | TEST(CascadeWriteBuffer, RereadWithTwoMemoryBuffers) |
67 | try |
68 | { |
69 | size_t max_s = 32; |
70 | for (size_t s = 0; s < max_s; ++s) |
71 | { |
72 | testCascadeBufferRedability(makeTestArray(s), |
73 | { |
74 | std::make_shared<MemoryWriteBuffer>(s/2, 1, 2.0), |
75 | std::make_shared<MemoryWriteBuffer>(s - s/2, 1, 2.0) |
76 | }, |
77 | {}); |
78 | |
79 | testCascadeBufferRedability(makeTestArray(s), |
80 | { |
81 | std::make_shared<MemoryWriteBuffer>(s, 2, 1.5), |
82 | }, |
83 | {}); |
84 | |
85 | testCascadeBufferRedability(makeTestArray(s), |
86 | { |
87 | std::make_shared<MemoryWriteBuffer>(0, 1, 1.0), |
88 | }, |
89 | {}); |
90 | |
91 | testCascadeBufferRedability(makeTestArray(s), |
92 | { |
93 | std::make_shared<MemoryWriteBuffer>(std::max(1ul, s/2), std::max(2ul, s/4), 0.5), |
94 | std::make_shared<MemoryWriteBuffer>(0, 4, 1.0), |
95 | }, |
96 | {}); |
97 | |
98 | testCascadeBufferRedability(makeTestArray(max_s), |
99 | { |
100 | std::make_shared<MemoryWriteBuffer>(s, 1, 2.0) |
101 | }, |
102 | { |
103 | [=] (auto) { return std::make_shared<MemoryWriteBuffer>(max_s - s, 1, 2.0); } |
104 | }); |
105 | |
106 | testCascadeBufferRedability(makeTestArray(max_s), |
107 | {}, |
108 | { |
109 | [=] (auto) { return std::make_shared<MemoryWriteBuffer>(max_s - s, 1, 2.0); }, |
110 | [=] (auto) { return std::make_shared<MemoryWriteBuffer>(s, 1, 2.0); } |
111 | }); |
112 | } |
113 | } |
114 | catch (...) |
115 | { |
116 | std::cerr << getCurrentExceptionMessage(true) << "\n" ; |
117 | throw; |
118 | } |
119 | |
120 | |
121 | static void checkHTTPHandlerCase(size_t input_size, size_t memory_buffer_size) |
122 | { |
123 | std::string src = makeTestArray(input_size); |
124 | std::string res_str(DBMS_DEFAULT_BUFFER_SIZE, '\0'); |
125 | |
126 | { |
127 | auto res_buf = std::make_shared<WriteBufferFromString>(res_str); |
128 | |
129 | CascadeWriteBuffer cascade( |
130 | { |
131 | std::make_shared<MemoryWriteBuffer>(memory_buffer_size) |
132 | }, |
133 | { |
134 | [res_buf] (const WriteBufferPtr & prev_buf) |
135 | { |
136 | auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get()); |
137 | if (prev_memory_buffer != nullptr) |
138 | { |
139 | auto rdbuf = prev_memory_buffer->tryGetReadBuffer(); |
140 | if (rdbuf != nullptr) |
141 | { |
142 | copyData(*rdbuf, *res_buf); |
143 | } |
144 | } |
145 | return res_buf; |
146 | } |
147 | }); |
148 | |
149 | cascade.write(src.data(), src.size()); |
150 | EXPECT_EQ(cascade.count(), src.size()); |
151 | } |
152 | |
153 | ASSERT_EQ(src.size(), res_str.size()); |
154 | ASSERT_TRUE(src == res_str); |
155 | } |
156 | |
157 | TEST(CascadeWriteBuffer, HTTPHandlerCase) |
158 | { |
159 | std::vector<size_t> sizes{1, 500000, DBMS_DEFAULT_BUFFER_SIZE, 1000000, 1451424, 1500000, 2000000, 2500000}; |
160 | |
161 | for (size_t input_size : sizes) |
162 | { |
163 | for (size_t memory_buffer_size : sizes) |
164 | { |
165 | if (input_size > memory_buffer_size) |
166 | checkHTTPHandlerCase(input_size, memory_buffer_size); |
167 | } |
168 | } |
169 | } |
170 | |
171 | |
172 | static void checkMemoryWriteBuffer(std::string data, MemoryWriteBuffer && buf) |
173 | { |
174 | buf.write(data.data(), data.size()); |
175 | ASSERT_EQ(buf.count(), data.size()); |
176 | |
177 | auto rbuf = buf.tryGetReadBuffer(); |
178 | ASSERT_TRUE(rbuf != nullptr); |
179 | ASSERT_TRUE(buf.tryGetReadBuffer() == nullptr); |
180 | |
181 | String res; |
182 | { |
183 | WriteBufferFromString res_buf(res); |
184 | copyData(*rbuf, res_buf); |
185 | } |
186 | |
187 | ASSERT_EQ(data, res); |
188 | } |
189 | |
190 | |
191 | TEST(MemoryWriteBuffer, WriteAndReread) |
192 | { |
193 | for (size_t s = 0; s < 2500000; s += 500000) |
194 | { |
195 | std::string data = makeTestArray(s); |
196 | size_t min_s = std::max(s, 1ul); |
197 | |
198 | checkMemoryWriteBuffer(data, MemoryWriteBuffer(min_s)); |
199 | checkMemoryWriteBuffer(data, MemoryWriteBuffer(min_s * 2, min_s)); |
200 | checkMemoryWriteBuffer(data, MemoryWriteBuffer(min_s * 4, min_s)); |
201 | |
202 | if (s > 1) |
203 | { |
204 | MemoryWriteBuffer buf(s - 1); |
205 | EXPECT_THROW(buf.write(data.data(), data.size()), DB::Exception); |
206 | } |
207 | } |
208 | |
209 | checkMemoryWriteBuffer(makeTestArray(1451424), MemoryWriteBuffer(1451424)); |
210 | } |
211 | |
212 | |
213 | TEST(TemporaryFileWriteBuffer, WriteAndReread) |
214 | try |
215 | { |
216 | for (size_t s = 0; s < 2500000; s += 500000) |
217 | { |
218 | std::string tmp_template = "tmp/TemporaryFileWriteBuffer/" ; |
219 | std::string data = makeTestArray(s); |
220 | |
221 | auto buf = WriteBufferFromTemporaryFile::create(tmp_template); |
222 | buf->write(data.data(), data.size()); |
223 | |
224 | std::string tmp_filename = buf->getFileName(); |
225 | ASSERT_EQ(tmp_template, tmp_filename.substr(0, tmp_template.size())); |
226 | |
227 | auto reread_buf = buf->tryGetReadBuffer(); |
228 | ASSERT_TRUE(reread_buf != nullptr); |
229 | std::string decoded_data; |
230 | { |
231 | WriteBufferFromString wbuf_decode(decoded_data); |
232 | copyData(*reread_buf, wbuf_decode); |
233 | } |
234 | |
235 | ASSERT_EQ(data.size(), decoded_data.size()); |
236 | ASSERT_TRUE(data == decoded_data); |
237 | |
238 | buf.reset(); |
239 | reread_buf.reset(); |
240 | ASSERT_TRUE(!Poco::File(tmp_filename).exists()); |
241 | } |
242 | } |
243 | catch (...) |
244 | { |
245 | std::cerr << getCurrentExceptionMessage(true) << "\n" ; |
246 | throw; |
247 | } |
248 | |
249 | |
250 | TEST(CascadeWriteBuffer, RereadWithTemporaryFileWriteBuffer) |
251 | try |
252 | { |
253 | const std::string tmp_template = "tmp/RereadWithTemporaryFileWriteBuffer/" ; |
254 | |
255 | for (size_t s = 0; s < 4000000; s += 1000000) |
256 | { |
257 | testCascadeBufferRedability(makeTestArray(s), |
258 | {}, |
259 | { |
260 | [=] (auto) { return WriteBufferFromTemporaryFile::create(tmp_template); } |
261 | }); |
262 | |
263 | testCascadeBufferRedability(makeTestArray(s), |
264 | { |
265 | std::make_shared<MemoryWriteBuffer>(std::max(1ul, s/3ul), 2, 1.5), |
266 | }, |
267 | { |
268 | [=] (auto) { return WriteBufferFromTemporaryFile::create(tmp_template); } |
269 | }); |
270 | } |
271 | } |
272 | catch (...) |
273 | { |
274 | std::cerr << getCurrentExceptionMessage(true) << "\n" ; |
275 | throw; |
276 | } |
277 | |