1#include <gtest/gtest.h>
2
3#include <stdexcept>
4#include <Poco/File.h>
5#include <IO/CascadeWriteBuffer.h>
6#include <IO/MemoryReadWriteBuffer.h>
7#include <IO/WriteBufferFromTemporaryFile.h>
8#include <IO/ReadBufferFromString.h>
9#include <IO/WriteBufferFromString.h>
10#include <IO/ConcatReadBuffer.h>
11#include <IO/copyData.h>
12#include <Common/typeid_cast.h>
13
14using namespace DB;
15
16
17static std::string makeTestArray(size_t size)
18{
19 std::string res(size, '\0');
20 for (size_t i = 0; i < res.size(); ++i)
21 res[i] = i % 256;
22 return res;
23}
24
25static void testCascadeBufferRedability(
26 std::string data,
27 CascadeWriteBuffer::WriteBufferPtrs && arg1,
28 CascadeWriteBuffer::WriteBufferConstructors && arg2)
29{
30 CascadeWriteBuffer cascade{std::move(arg1), std::move(arg2)};
31
32 cascade.write(data.data(), data.size());
33 EXPECT_EQ(cascade.count(), data.size());
34
35 std::vector<WriteBufferPtr> write_buffers;
36 std::vector<ReadBufferPtr> read_buffers;
37 std::vector<ReadBuffer *> read_buffers_raw;
38 cascade.getResultBuffers(write_buffers);
39
40 for (WriteBufferPtr & wbuf : write_buffers)
41 {
42 if (!wbuf)
43 continue;
44
45 auto wbuf_readable = dynamic_cast<IReadableWriteBuffer *>(wbuf.get());
46 ASSERT_FALSE(!wbuf_readable);
47
48 auto rbuf = wbuf_readable->tryGetReadBuffer();
49 ASSERT_FALSE(!rbuf);
50
51 read_buffers.emplace_back(rbuf);
52 read_buffers_raw.emplace_back(rbuf.get());
53 }
54
55 ConcatReadBuffer concat(read_buffers_raw);
56 std::string decoded_data;
57 {
58 WriteBufferFromString decoded_data_writer(decoded_data);
59 copyData(concat, decoded_data_writer);
60 }
61
62 ASSERT_EQ(data, decoded_data);
63}
64
65
66TEST(CascadeWriteBuffer, RereadWithTwoMemoryBuffers)
67try
68{
69 size_t max_s = 32;
70 for (size_t s = 0; s < max_s; ++s)
71 {
72 testCascadeBufferRedability(makeTestArray(s),
73 {
74 std::make_shared<MemoryWriteBuffer>(s/2, 1, 2.0),
75 std::make_shared<MemoryWriteBuffer>(s - s/2, 1, 2.0)
76 },
77 {});
78
79 testCascadeBufferRedability(makeTestArray(s),
80 {
81 std::make_shared<MemoryWriteBuffer>(s, 2, 1.5),
82 },
83 {});
84
85 testCascadeBufferRedability(makeTestArray(s),
86 {
87 std::make_shared<MemoryWriteBuffer>(0, 1, 1.0),
88 },
89 {});
90
91 testCascadeBufferRedability(makeTestArray(s),
92 {
93 std::make_shared<MemoryWriteBuffer>(std::max(1ul, s/2), std::max(2ul, s/4), 0.5),
94 std::make_shared<MemoryWriteBuffer>(0, 4, 1.0),
95 },
96 {});
97
98 testCascadeBufferRedability(makeTestArray(max_s),
99 {
100 std::make_shared<MemoryWriteBuffer>(s, 1, 2.0)
101 },
102 {
103 [=] (auto) { return std::make_shared<MemoryWriteBuffer>(max_s - s, 1, 2.0); }
104 });
105
106 testCascadeBufferRedability(makeTestArray(max_s),
107 {},
108 {
109 [=] (auto) { return std::make_shared<MemoryWriteBuffer>(max_s - s, 1, 2.0); },
110 [=] (auto) { return std::make_shared<MemoryWriteBuffer>(s, 1, 2.0); }
111 });
112 }
113}
114catch (...)
115{
116 std::cerr << getCurrentExceptionMessage(true) << "\n";
117 throw;
118}
119
120
121static void checkHTTPHandlerCase(size_t input_size, size_t memory_buffer_size)
122{
123 std::string src = makeTestArray(input_size);
124 std::string res_str(DBMS_DEFAULT_BUFFER_SIZE, '\0');
125
126 {
127 auto res_buf = std::make_shared<WriteBufferFromString>(res_str);
128
129 CascadeWriteBuffer cascade(
130 {
131 std::make_shared<MemoryWriteBuffer>(memory_buffer_size)
132 },
133 {
134 [res_buf] (const WriteBufferPtr & prev_buf)
135 {
136 auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get());
137 if (prev_memory_buffer != nullptr)
138 {
139 auto rdbuf = prev_memory_buffer->tryGetReadBuffer();
140 if (rdbuf != nullptr)
141 {
142 copyData(*rdbuf, *res_buf);
143 }
144 }
145 return res_buf;
146 }
147 });
148
149 cascade.write(src.data(), src.size());
150 EXPECT_EQ(cascade.count(), src.size());
151 }
152
153 ASSERT_EQ(src.size(), res_str.size());
154 ASSERT_TRUE(src == res_str);
155}
156
157TEST(CascadeWriteBuffer, HTTPHandlerCase)
158{
159 std::vector<size_t> sizes{1, 500000, DBMS_DEFAULT_BUFFER_SIZE, 1000000, 1451424, 1500000, 2000000, 2500000};
160
161 for (size_t input_size : sizes)
162 {
163 for (size_t memory_buffer_size : sizes)
164 {
165 if (input_size > memory_buffer_size)
166 checkHTTPHandlerCase(input_size, memory_buffer_size);
167 }
168 }
169}
170
171
172static void checkMemoryWriteBuffer(std::string data, MemoryWriteBuffer && buf)
173{
174 buf.write(data.data(), data.size());
175 ASSERT_EQ(buf.count(), data.size());
176
177 auto rbuf = buf.tryGetReadBuffer();
178 ASSERT_TRUE(rbuf != nullptr);
179 ASSERT_TRUE(buf.tryGetReadBuffer() == nullptr);
180
181 String res;
182 {
183 WriteBufferFromString res_buf(res);
184 copyData(*rbuf, res_buf);
185 }
186
187 ASSERT_EQ(data, res);
188}
189
190
191TEST(MemoryWriteBuffer, WriteAndReread)
192{
193 for (size_t s = 0; s < 2500000; s += 500000)
194 {
195 std::string data = makeTestArray(s);
196 size_t min_s = std::max(s, 1ul);
197
198 checkMemoryWriteBuffer(data, MemoryWriteBuffer(min_s));
199 checkMemoryWriteBuffer(data, MemoryWriteBuffer(min_s * 2, min_s));
200 checkMemoryWriteBuffer(data, MemoryWriteBuffer(min_s * 4, min_s));
201
202 if (s > 1)
203 {
204 MemoryWriteBuffer buf(s - 1);
205 EXPECT_THROW(buf.write(data.data(), data.size()), DB::Exception);
206 }
207 }
208
209 checkMemoryWriteBuffer(makeTestArray(1451424), MemoryWriteBuffer(1451424));
210}
211
212
213TEST(TemporaryFileWriteBuffer, WriteAndReread)
214try
215{
216 for (size_t s = 0; s < 2500000; s += 500000)
217 {
218 std::string tmp_template = "tmp/TemporaryFileWriteBuffer/";
219 std::string data = makeTestArray(s);
220
221 auto buf = WriteBufferFromTemporaryFile::create(tmp_template);
222 buf->write(data.data(), data.size());
223
224 std::string tmp_filename = buf->getFileName();
225 ASSERT_EQ(tmp_template, tmp_filename.substr(0, tmp_template.size()));
226
227 auto reread_buf = buf->tryGetReadBuffer();
228 ASSERT_TRUE(reread_buf != nullptr);
229 std::string decoded_data;
230 {
231 WriteBufferFromString wbuf_decode(decoded_data);
232 copyData(*reread_buf, wbuf_decode);
233 }
234
235 ASSERT_EQ(data.size(), decoded_data.size());
236 ASSERT_TRUE(data == decoded_data);
237
238 buf.reset();
239 reread_buf.reset();
240 ASSERT_TRUE(!Poco::File(tmp_filename).exists());
241 }
242}
243catch (...)
244{
245 std::cerr << getCurrentExceptionMessage(true) << "\n";
246 throw;
247}
248
249
250TEST(CascadeWriteBuffer, RereadWithTemporaryFileWriteBuffer)
251try
252{
253 const std::string tmp_template = "tmp/RereadWithTemporaryFileWriteBuffer/";
254
255 for (size_t s = 0; s < 4000000; s += 1000000)
256 {
257 testCascadeBufferRedability(makeTestArray(s),
258 {},
259 {
260 [=] (auto) { return WriteBufferFromTemporaryFile::create(tmp_template); }
261 });
262
263 testCascadeBufferRedability(makeTestArray(s),
264 {
265 std::make_shared<MemoryWriteBuffer>(std::max(1ul, s/3ul), 2, 1.5),
266 },
267 {
268 [=] (auto) { return WriteBufferFromTemporaryFile::create(tmp_template); }
269 });
270 }
271}
272catch (...)
273{
274 std::cerr << getCurrentExceptionMessage(true) << "\n";
275 throw;
276}
277