| 1 | #include <gtest/gtest.h> |
| 2 | |
| 3 | #include <stdexcept> |
| 4 | #include <Poco/File.h> |
| 5 | #include <IO/CascadeWriteBuffer.h> |
| 6 | #include <IO/MemoryReadWriteBuffer.h> |
| 7 | #include <IO/WriteBufferFromTemporaryFile.h> |
| 8 | #include <IO/ReadBufferFromString.h> |
| 9 | #include <IO/WriteBufferFromString.h> |
| 10 | #include <IO/ConcatReadBuffer.h> |
| 11 | #include <IO/copyData.h> |
| 12 | #include <Common/typeid_cast.h> |
| 13 | |
| 14 | using namespace DB; |
| 15 | |
| 16 | |
/// Builds a string of `size` bytes in which the byte at position i has the value i % 256.
static std::string makeTestArray(size_t size)
{
    std::string pattern;
    pattern.resize(size);

    size_t position = 0;
    for (char & byte : pattern)
    {
        byte = static_cast<char>(position % 256);
        ++position;
    }

    return pattern;
}
| 24 | |
| 25 | static void testCascadeBufferRedability( |
| 26 | std::string data, |
| 27 | CascadeWriteBuffer::WriteBufferPtrs && arg1, |
| 28 | CascadeWriteBuffer::WriteBufferConstructors && arg2) |
| 29 | { |
| 30 | CascadeWriteBuffer cascade{std::move(arg1), std::move(arg2)}; |
| 31 | |
| 32 | cascade.write(data.data(), data.size()); |
| 33 | EXPECT_EQ(cascade.count(), data.size()); |
| 34 | |
| 35 | std::vector<WriteBufferPtr> write_buffers; |
| 36 | std::vector<ReadBufferPtr> read_buffers; |
| 37 | std::vector<ReadBuffer *> read_buffers_raw; |
| 38 | cascade.getResultBuffers(write_buffers); |
| 39 | |
| 40 | for (WriteBufferPtr & wbuf : write_buffers) |
| 41 | { |
| 42 | if (!wbuf) |
| 43 | continue; |
| 44 | |
| 45 | auto wbuf_readable = dynamic_cast<IReadableWriteBuffer *>(wbuf.get()); |
| 46 | ASSERT_FALSE(!wbuf_readable); |
| 47 | |
| 48 | auto rbuf = wbuf_readable->tryGetReadBuffer(); |
| 49 | ASSERT_FALSE(!rbuf); |
| 50 | |
| 51 | read_buffers.emplace_back(rbuf); |
| 52 | read_buffers_raw.emplace_back(rbuf.get()); |
| 53 | } |
| 54 | |
| 55 | ConcatReadBuffer concat(read_buffers_raw); |
| 56 | std::string decoded_data; |
| 57 | { |
| 58 | WriteBufferFromString decoded_data_writer(decoded_data); |
| 59 | copyData(concat, decoded_data_writer); |
| 60 | } |
| 61 | |
| 62 | ASSERT_EQ(data, decoded_data); |
| 63 | } |
| 64 | |
| 65 | |
| 66 | TEST(CascadeWriteBuffer, RereadWithTwoMemoryBuffers) |
| 67 | try |
| 68 | { |
| 69 | size_t max_s = 32; |
| 70 | for (size_t s = 0; s < max_s; ++s) |
| 71 | { |
| 72 | testCascadeBufferRedability(makeTestArray(s), |
| 73 | { |
| 74 | std::make_shared<MemoryWriteBuffer>(s/2, 1, 2.0), |
| 75 | std::make_shared<MemoryWriteBuffer>(s - s/2, 1, 2.0) |
| 76 | }, |
| 77 | {}); |
| 78 | |
| 79 | testCascadeBufferRedability(makeTestArray(s), |
| 80 | { |
| 81 | std::make_shared<MemoryWriteBuffer>(s, 2, 1.5), |
| 82 | }, |
| 83 | {}); |
| 84 | |
| 85 | testCascadeBufferRedability(makeTestArray(s), |
| 86 | { |
| 87 | std::make_shared<MemoryWriteBuffer>(0, 1, 1.0), |
| 88 | }, |
| 89 | {}); |
| 90 | |
| 91 | testCascadeBufferRedability(makeTestArray(s), |
| 92 | { |
| 93 | std::make_shared<MemoryWriteBuffer>(std::max(1ul, s/2), std::max(2ul, s/4), 0.5), |
| 94 | std::make_shared<MemoryWriteBuffer>(0, 4, 1.0), |
| 95 | }, |
| 96 | {}); |
| 97 | |
| 98 | testCascadeBufferRedability(makeTestArray(max_s), |
| 99 | { |
| 100 | std::make_shared<MemoryWriteBuffer>(s, 1, 2.0) |
| 101 | }, |
| 102 | { |
| 103 | [=] (auto) { return std::make_shared<MemoryWriteBuffer>(max_s - s, 1, 2.0); } |
| 104 | }); |
| 105 | |
| 106 | testCascadeBufferRedability(makeTestArray(max_s), |
| 107 | {}, |
| 108 | { |
| 109 | [=] (auto) { return std::make_shared<MemoryWriteBuffer>(max_s - s, 1, 2.0); }, |
| 110 | [=] (auto) { return std::make_shared<MemoryWriteBuffer>(s, 1, 2.0); } |
| 111 | }); |
| 112 | } |
| 113 | } |
| 114 | catch (...) |
| 115 | { |
| 116 | std::cerr << getCurrentExceptionMessage(true) << "\n" ; |
| 117 | throw; |
| 118 | } |
| 119 | |
| 120 | |
| 121 | static void checkHTTPHandlerCase(size_t input_size, size_t memory_buffer_size) |
| 122 | { |
| 123 | std::string src = makeTestArray(input_size); |
| 124 | std::string res_str(DBMS_DEFAULT_BUFFER_SIZE, '\0'); |
| 125 | |
| 126 | { |
| 127 | auto res_buf = std::make_shared<WriteBufferFromString>(res_str); |
| 128 | |
| 129 | CascadeWriteBuffer cascade( |
| 130 | { |
| 131 | std::make_shared<MemoryWriteBuffer>(memory_buffer_size) |
| 132 | }, |
| 133 | { |
| 134 | [res_buf] (const WriteBufferPtr & prev_buf) |
| 135 | { |
| 136 | auto prev_memory_buffer = typeid_cast<MemoryWriteBuffer *>(prev_buf.get()); |
| 137 | if (prev_memory_buffer != nullptr) |
| 138 | { |
| 139 | auto rdbuf = prev_memory_buffer->tryGetReadBuffer(); |
| 140 | if (rdbuf != nullptr) |
| 141 | { |
| 142 | copyData(*rdbuf, *res_buf); |
| 143 | } |
| 144 | } |
| 145 | return res_buf; |
| 146 | } |
| 147 | }); |
| 148 | |
| 149 | cascade.write(src.data(), src.size()); |
| 150 | EXPECT_EQ(cascade.count(), src.size()); |
| 151 | } |
| 152 | |
| 153 | ASSERT_EQ(src.size(), res_str.size()); |
| 154 | ASSERT_TRUE(src == res_str); |
| 155 | } |
| 156 | |
| 157 | TEST(CascadeWriteBuffer, HTTPHandlerCase) |
| 158 | { |
| 159 | std::vector<size_t> sizes{1, 500000, DBMS_DEFAULT_BUFFER_SIZE, 1000000, 1451424, 1500000, 2000000, 2500000}; |
| 160 | |
| 161 | for (size_t input_size : sizes) |
| 162 | { |
| 163 | for (size_t memory_buffer_size : sizes) |
| 164 | { |
| 165 | if (input_size > memory_buffer_size) |
| 166 | checkHTTPHandlerCase(input_size, memory_buffer_size); |
| 167 | } |
| 168 | } |
| 169 | } |
| 170 | |
| 171 | |
| 172 | static void checkMemoryWriteBuffer(std::string data, MemoryWriteBuffer && buf) |
| 173 | { |
| 174 | buf.write(data.data(), data.size()); |
| 175 | ASSERT_EQ(buf.count(), data.size()); |
| 176 | |
| 177 | auto rbuf = buf.tryGetReadBuffer(); |
| 178 | ASSERT_TRUE(rbuf != nullptr); |
| 179 | ASSERT_TRUE(buf.tryGetReadBuffer() == nullptr); |
| 180 | |
| 181 | String res; |
| 182 | { |
| 183 | WriteBufferFromString res_buf(res); |
| 184 | copyData(*rbuf, res_buf); |
| 185 | } |
| 186 | |
| 187 | ASSERT_EQ(data, res); |
| 188 | } |
| 189 | |
| 190 | |
| 191 | TEST(MemoryWriteBuffer, WriteAndReread) |
| 192 | { |
| 193 | for (size_t s = 0; s < 2500000; s += 500000) |
| 194 | { |
| 195 | std::string data = makeTestArray(s); |
| 196 | size_t min_s = std::max(s, 1ul); |
| 197 | |
| 198 | checkMemoryWriteBuffer(data, MemoryWriteBuffer(min_s)); |
| 199 | checkMemoryWriteBuffer(data, MemoryWriteBuffer(min_s * 2, min_s)); |
| 200 | checkMemoryWriteBuffer(data, MemoryWriteBuffer(min_s * 4, min_s)); |
| 201 | |
| 202 | if (s > 1) |
| 203 | { |
| 204 | MemoryWriteBuffer buf(s - 1); |
| 205 | EXPECT_THROW(buf.write(data.data(), data.size()), DB::Exception); |
| 206 | } |
| 207 | } |
| 208 | |
| 209 | checkMemoryWriteBuffer(makeTestArray(1451424), MemoryWriteBuffer(1451424)); |
| 210 | } |
| 211 | |
| 212 | |
| 213 | TEST(TemporaryFileWriteBuffer, WriteAndReread) |
| 214 | try |
| 215 | { |
| 216 | for (size_t s = 0; s < 2500000; s += 500000) |
| 217 | { |
| 218 | std::string tmp_template = "tmp/TemporaryFileWriteBuffer/" ; |
| 219 | std::string data = makeTestArray(s); |
| 220 | |
| 221 | auto buf = WriteBufferFromTemporaryFile::create(tmp_template); |
| 222 | buf->write(data.data(), data.size()); |
| 223 | |
| 224 | std::string tmp_filename = buf->getFileName(); |
| 225 | ASSERT_EQ(tmp_template, tmp_filename.substr(0, tmp_template.size())); |
| 226 | |
| 227 | auto reread_buf = buf->tryGetReadBuffer(); |
| 228 | ASSERT_TRUE(reread_buf != nullptr); |
| 229 | std::string decoded_data; |
| 230 | { |
| 231 | WriteBufferFromString wbuf_decode(decoded_data); |
| 232 | copyData(*reread_buf, wbuf_decode); |
| 233 | } |
| 234 | |
| 235 | ASSERT_EQ(data.size(), decoded_data.size()); |
| 236 | ASSERT_TRUE(data == decoded_data); |
| 237 | |
| 238 | buf.reset(); |
| 239 | reread_buf.reset(); |
| 240 | ASSERT_TRUE(!Poco::File(tmp_filename).exists()); |
| 241 | } |
| 242 | } |
| 243 | catch (...) |
| 244 | { |
| 245 | std::cerr << getCurrentExceptionMessage(true) << "\n" ; |
| 246 | throw; |
| 247 | } |
| 248 | |
| 249 | |
| 250 | TEST(CascadeWriteBuffer, RereadWithTemporaryFileWriteBuffer) |
| 251 | try |
| 252 | { |
| 253 | const std::string tmp_template = "tmp/RereadWithTemporaryFileWriteBuffer/" ; |
| 254 | |
| 255 | for (size_t s = 0; s < 4000000; s += 1000000) |
| 256 | { |
| 257 | testCascadeBufferRedability(makeTestArray(s), |
| 258 | {}, |
| 259 | { |
| 260 | [=] (auto) { return WriteBufferFromTemporaryFile::create(tmp_template); } |
| 261 | }); |
| 262 | |
| 263 | testCascadeBufferRedability(makeTestArray(s), |
| 264 | { |
| 265 | std::make_shared<MemoryWriteBuffer>(std::max(1ul, s/3ul), 2, 1.5), |
| 266 | }, |
| 267 | { |
| 268 | [=] (auto) { return WriteBufferFromTemporaryFile::create(tmp_template); } |
| 269 | }); |
| 270 | } |
| 271 | } |
| 272 | catch (...) |
| 273 | { |
| 274 | std::cerr << getCurrentExceptionMessage(true) << "\n" ; |
| 275 | throw; |
| 276 | } |
| 277 | |