#include <Storages/StorageLog.h>
#include <Storages/StorageFactory.h>

#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>

#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>

#include <DataTypes/NestedUtils.h>

#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>

#include <Columns/ColumnArray.h>

#include <Common/typeid_cast.h>

#include <Interpreters/Context.h>

#include <Poco/Path.h>
#include <Poco/DirectoryIterator.h>


#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
#define DBMS_STORAGE_LOG_MARKS_FILE_NAME "__marks.mrk"
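
/** For illustration: the on-disk layout of a Log table.
  * Each serialized substream of each column gets its own ".bin" file, and a single
  * "__marks.mrk" file holds the marks of all data files, interleaved. Assuming a
  * hypothetical table with columns `x UInt64` and `s String`, the table directory contains:
  *
  *     x.bin          compressed data of column x
  *     s.bin          compressed data of column s
  *     __marks.mrk    marks for all data files, interleaved
  *     sizes.json     file sizes tracked by FileChecker
  */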


namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int DUPLICATE_COLUMN;
    extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int INCORRECT_FILE_NAME;
}


class LogBlockInputStream final : public IBlockInputStream
{
public:
    LogBlockInputStream(
        size_t block_size_, const NamesAndTypesList & columns_, StorageLog & storage_,
        size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_)
        : block_size(block_size_),
        columns(columns_),
        storage(storage_),
        mark_number(mark_number_),
        rows_limit(rows_limit_),
        max_read_buffer_size(max_read_buffer_size_)
    {
    }

    String getName() const override { return "Log"; }

    Block getHeader() const override
    {
        Block res;

        for (const auto & name_type : columns)
            res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });

        return Nested::flatten(res);
    }

protected:
    Block readImpl() override;

private:

    size_t block_size;
    NamesAndTypesList columns;
    StorageLog & storage;
    size_t mark_number;     /// from which mark to start reading data
    size_t rows_limit;      /// the maximum number of rows that can be read
    size_t rows_read = 0;
    size_t max_read_buffer_size;

    struct Stream
    {
        Stream(const std::string & data_path, size_t offset, size_t max_read_buffer_size_)
            : plain(data_path, std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size_), Poco::File(data_path).getSize())),
            compressed(plain)
        {
            if (offset)
                plain.seek(offset);
        }

        ReadBufferFromFile plain;
        CompressedReadBuffer compressed;
    };

    using FileStreams = std::map<std::string, Stream>;
    FileStreams streams;

    using DeserializeState = IDataType::DeserializeBinaryBulkStatePtr;
    using DeserializeStates = std::map<String, DeserializeState>;
    DeserializeStates deserialize_states;

    void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read);

};


class LogBlockOutputStream final : public IBlockOutputStream
{
public:
    explicit LogBlockOutputStream(StorageLog & storage_)
        : storage(storage_),
        lock(storage.rwlock),
        marks_stream(storage.marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY)
    {
    }

    ~LogBlockOutputStream() override
    {
        try
        {
            writeSuffix();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    Block getHeader() const override { return storage.getSampleBlock(); }
    void write(const Block & block) override;
    void writeSuffix() override;

private:
    StorageLog & storage;
    std::unique_lock<std::shared_mutex> lock;
    bool done = false;

    struct Stream
    {
        Stream(const std::string & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
            plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
            compressed(plain, std::move(codec), max_compress_block_size)
        {
            plain_offset = Poco::File(data_path).getSize();
        }

        WriteBufferFromFile plain;
        CompressedWriteBuffer compressed;

        size_t plain_offset;    /// How many bytes were in the file at the time the LogBlockOutputStream was created.

        void finalize()
        {
            compressed.next();
            plain.next();
        }
    };

    using Mark = StorageLog::Mark;
    using MarksForColumns = std::vector<std::pair<size_t, Mark>>;

    using FileStreams = std::map<std::string, Stream>;
    FileStreams streams;

    using WrittenStreams = std::set<std::string>;

    WriteBufferFromFile marks_stream;   /// Declared after `lock`, so that the file is opened only once the rwlock has been acquired.

    using SerializeState = IDataType::SerializeBinaryBulkStatePtr;
    using SerializeStates = std::map<String, SerializeState>;
    SerializeStates serialize_states;

    IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams);

    void writeData(const String & name, const IDataType & type, const IColumn & column,
        MarksForColumns & out_marks,
        WrittenStreams & written_streams);

    void writeMarks(MarksForColumns && marks);
};


Block LogBlockInputStream::readImpl()
{
    Block res;

    if (rows_read == rows_limit)
        return res;

    /// If there are no files in the folder, the table is empty.
    if (Poco::DirectoryIterator(storage.getFullPath()) == Poco::DirectoryIterator())
        return res;

    /// How many rows to read for the next block.
    size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);

    for (const auto & name_type : columns)
    {
        MutableColumnPtr column = name_type.type->createColumn();

        try
        {
            readData(name_type.name, *name_type.type, *column, max_rows_to_read);
        }
        catch (Exception & e)
        {
            e.addMessage("while reading column " + name_type.name + " at " + storage.path);
            throw;
        }

        if (column->size())
            res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name));
    }

    if (res)
        rows_read += res.rows();

    if (!res || rows_read == rows_limit)
    {
        /** Close the files (before destroying the object).
          * When many sources are created but only a few of them are read at the same time,
          * the buffers of the idle sources don't waste memory.
          */
        streams.clear();
    }

    return Nested::flatten(res);
}


void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read)
{
    IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.

    auto createStreamGetter = [&](bool stream_for_prefix)
    {
        return [&, stream_for_prefix] (const IDataType::SubstreamPath & path) -> ReadBuffer *
        {
            String stream_name = IDataType::getFileNameForStream(name, path);

            const auto & file_it = storage.files.find(stream_name);
            if (storage.files.end() == file_it)
                throw Exception("Logical error: no information about file " + stream_name + " in StorageLog", ErrorCodes::LOGICAL_ERROR);

            UInt64 offset = 0;
            if (!stream_for_prefix && mark_number)
                offset = file_it->second.marks[mark_number].offset;

            auto & data_file_path = file_it->second.data_file.path();
            auto it = streams.try_emplace(stream_name, data_file_path, offset, max_read_buffer_size).first;
            return &it->second.compressed;
        };
    };

    if (deserialize_states.count(name) == 0)
    {
        settings.getter = createStreamGetter(true);
        type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
    }

    settings.getter = createStreamGetter(false);
    type.deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, settings, deserialize_states[name]);
}
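
/** For illustration: how a mark translates into a seek position.
  * Suppose this source was created with mark_number = 2 and reads the stream `x`.
  * The first call of the getter opens `x.bin` and seeks the underlying (compressed) file
  * to storage.files["x"].marks[2].offset, i.e. to the byte where the compressed block
  * that starts the third mark's rows begins. Prefix streams (stream_for_prefix == true)
  * are always read from offset 0, since the serialization prefix is written once at the
  * beginning of the stream.
  */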


void LogBlockOutputStream::write(const Block & block)
{
    storage.check(block, true);

    /// The set of streams written so far, so that the shared offsets of nested structures are not written more than once.
    WrittenStreams written_streams;

    MarksForColumns marks;
    marks.reserve(storage.file_count);

    for (size_t i = 0; i < block.columns(); ++i)
    {
        const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
        writeData(column.name, *column.type, *column.column, marks, written_streams);
    }

    writeMarks(std::move(marks));
}


void LogBlockOutputStream::writeSuffix()
{
    if (done)
        return;
    done = true;

    WrittenStreams written_streams;
    IDataType::SerializeBinaryBulkSettings settings;
    for (const auto & column : getHeader())
    {
        auto it = serialize_states.find(column.name);
        if (it != serialize_states.end())
        {
            settings.getter = createStreamGetter(column.name, written_streams);
            column.type->serializeBinaryBulkStateSuffix(settings, it->second);
        }
    }

    /// Finish write.
    marks_stream.next();

    for (auto & name_stream : streams)
        name_stream.second.finalize();

    std::vector<Poco::File> column_files;
    for (const auto & name_stream : streams)
        column_files.push_back(storage.files[name_stream.first].data_file);
    column_files.push_back(storage.marks_file);

    storage.file_checker.update(column_files.begin(), column_files.end());

    streams.clear();
}


IDataType::OutputStreamGetter LogBlockOutputStream::createStreamGetter(const String & name,
    WrittenStreams & written_streams)
{
    return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
    {
        String stream_name = IDataType::getFileNameForStream(name, path);
        if (written_streams.count(stream_name))
            return nullptr;

        auto it = streams.find(stream_name);
        if (streams.end() == it)
            throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream",
                ErrorCodes::LOGICAL_ERROR);
        return &it->second.compressed;
    };
}


void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column,
    MarksForColumns & out_marks,
    WrittenStreams & written_streams)
{
    IDataType::SerializeBinaryBulkSettings settings;

    type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
    {
        String stream_name = IDataType::getFileNameForStream(name, path);
        if (written_streams.count(stream_name))
            return;

        const auto & columns = storage.getColumns();
        streams.try_emplace(
            stream_name,
            storage.files[stream_name].data_file.path(),
            columns.getCodecOrDefault(name),
            storage.max_compress_block_size);
    }, settings.path);

    settings.getter = createStreamGetter(name, written_streams);

    if (serialize_states.count(name) == 0)
        type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]);

    type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
    {
        String stream_name = IDataType::getFileNameForStream(name, path);
        if (written_streams.count(stream_name))
            return;

        const auto & file = storage.files[stream_name];
        const auto stream_it = streams.find(stream_name);

        Mark mark;
        mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size();
        mark.offset = stream_it->second.plain_offset + stream_it->second.plain.count();

        out_marks.emplace_back(file.column_index, mark);
    }, settings.path);

    type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);

    type.enumerateStreams([&] (const IDataType::SubstreamPath & path)
    {
        String stream_name = IDataType::getFileNameForStream(name, path);
        if (!written_streams.emplace(stream_name).second)
            return;

        auto it = streams.find(stream_name);
        if (streams.end() == it)
            throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream", ErrorCodes::LOGICAL_ERROR);
        it->second.compressed.next();
    }, settings.path);
}


void LogBlockOutputStream::writeMarks(MarksForColumns && marks)
{
    if (marks.size() != storage.file_count)
        throw Exception("Wrong number of marks generated from block. Makes no sense.", ErrorCodes::LOGICAL_ERROR);

    std::sort(marks.begin(), marks.end(), [](const auto & a, const auto & b) { return a.first < b.first; });

    for (const auto & mark : marks)
    {
        writeIntBinary(mark.second.rows, marks_stream);
        writeIntBinary(mark.second.offset, marks_stream);

        size_t column_index = mark.first;
        storage.files[storage.column_names_by_idx[column_index]].marks.push_back(mark.second);
    }
}
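
/** For illustration: the layout of __marks.mrk.
  * For every written block, one Mark per data file is appended, ordered by column_index.
  * Each Mark is a pair (rows, offset): the cumulative row count of the file after this block,
  * and the byte offset in the compressed file at which this block's data starts. Assuming a
  * hypothetical table with two data files A and B and two inserted blocks of 100 and 50 rows,
  * the marks file contains, in order:
  *
  *     rows_A=100, offset_A=0,   rows_B=100, offset_B=0      <- after the first block
  *     rows_A=150, offset_A=..., rows_B=150, offset_B=...    <- after the second block
  */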

StorageLog::StorageLog(
    const std::string & relative_path_,
    const std::string & database_name_,
    const std::string & table_name_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    size_t max_compress_block_size_,
    const Context & context_)
    : base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_),
    max_compress_block_size(max_compress_block_size_),
    file_checker(path + "sizes.json")
{
    setColumns(columns_);
    setConstraints(constraints_);

    if (relative_path_.empty())
        throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);

    /// Create files if they do not exist.
    Poco::File(path).createDirectories();

    for (const auto & column : getColumns().getAllPhysical())
        addFiles(column.name, *column.type);

    marks_file = Poco::File(path + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
}


void StorageLog::addFiles(const String & column_name, const IDataType & type)
{
    if (files.end() != files.find(column_name))
        throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog.",
            ErrorCodes::DUPLICATE_COLUMN);

    IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
    {
        String stream_name = IDataType::getFileNameForStream(column_name, substream_path);

        if (!files.count(stream_name))
        {
            ColumnData & column_data = files[stream_name];
            column_data.column_index = file_count;
            column_data.data_file = Poco::File{
                path + stream_name + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION};

            column_names_by_idx.push_back(stream_name);
            ++file_count;
        }
    };

    IDataType::SubstreamPath substream_path;
    type.enumerateStreams(stream_callback, substream_path);
}
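
/** For illustration: how substreams map to files.
  * A simple column `x UInt64` produces a single stream, hence one file `x.bin`.
  * A column `arr Array(String)` produces two streams, the array sizes and the elements,
  * which typically map to `arr.size0.bin` and `arr.bin` (the exact names come from
  * IDataType::getFileNameForStream). Each file gets its own column_index, so file_count
  * may exceed the number of columns in the table.
  */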


void StorageLog::loadMarks()
{
    std::unique_lock<std::shared_mutex> lock(rwlock);

    if (loaded_marks)
        return;

    using FilesByIndex = std::vector<Files_t::iterator>;

    FilesByIndex files_by_index(file_count);
    for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
        files_by_index[it->second.column_index] = it;

    if (marks_file.exists())
    {
        size_t file_size = marks_file.getSize();
        if (file_size % (file_count * sizeof(Mark)) != 0)
            throw Exception("Size of marks file is inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);

        size_t marks_count = file_size / (file_count * sizeof(Mark));

        for (auto & file : files_by_index)
            file->second.marks.reserve(marks_count);

        ReadBufferFromFile marks_rb(marks_file.path(), 32768);
        while (!marks_rb.eof())
        {
            for (size_t i = 0; i < files_by_index.size(); ++i)
            {
                Mark mark;
                readIntBinary(mark.rows, marks_rb);
                readIntBinary(mark.offset, marks_rb);
                files_by_index[i]->second.marks.push_back(mark);
            }
        }
    }

    loaded_marks = true;
}
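
/** For illustration: the size check above, worked through.
  * Assuming size_t is 64-bit and Mark has no padding, sizeof(Mark) == 16 bytes.
  * With file_count == 3, each "row" of the marks file takes 3 * 16 == 48 bytes, so a
  * 96-byte file holds exactly marks_count == 96 / 48 == 2 marks per data file, while a
  * 100-byte file would fail the divisibility check and is treated as an inconsistency.
  */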


void StorageLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
    std::unique_lock<std::shared_mutex> lock(rwlock);

    /// Rename directory with data.
    String new_path = base_path + new_path_to_table_data;
    Poco::File(path).renameTo(new_path);

    path = new_path;
    table_name = new_table_name;
    database_name = new_database_name;
    file_checker.setPath(path + "sizes.json");

    for (auto & file : files)
        file.second.data_file = Poco::File(path + Poco::Path(file.second.data_file.path()).getFileName());

    marks_file = Poco::File(path + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
}

void StorageLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
    std::unique_lock<std::shared_mutex> lock(rwlock);   /// Exclusive lock: truncate mutates the table state and removes files.

    String table_dir = path;

    files.clear();
    file_count = 0;
    loaded_marks = false;

    std::vector<Poco::File> data_files;
    Poco::File(table_dir).list(data_files);

    for (auto & file : data_files)
        file.remove(false);

    for (const auto & column : getColumns().getAllPhysical())
        addFiles(column.name, *column.type);

    file_checker = FileChecker{table_dir + "/" + "sizes.json"};
    marks_file = Poco::File(table_dir + "/" + DBMS_STORAGE_LOG_MARKS_FILE_NAME);
}


const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const
{
    const String & column_name = getColumns().begin()->name;
    const IDataType & column_type = *getColumns().begin()->type;
    String filename;

    /** We take the marks from the first column.
      * If this is a data type with multiple streams, take the first stream, which we assume holds the real row count.
      * (Example: for the Array data type, the first stream stores array sizes, and the number of array sizes equals the number of arrays.)
      */
    IDataType::SubstreamPath substream_root_path;
    column_type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
    {
        if (filename.empty())
            filename = IDataType::getFileNameForStream(column_name, substream_path);
    }, substream_root_path);

    Files_t::const_iterator it = files.find(filename);
    if (files.end() == it)
        throw Exception("Cannot find file " + filename, ErrorCodes::LOGICAL_ERROR);

    return it->second.marks;
}

BlockInputStreams StorageLog::read(
    const Names & column_names,
    const SelectQueryInfo & /*query_info*/,
    const Context & context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    unsigned num_streams)
{
    check(column_names);
    loadMarks();

    NamesAndTypesList all_columns = Nested::collect(getColumns().getAllPhysical().addTypes(column_names));

    std::shared_lock<std::shared_mutex> lock(rwlock);

    BlockInputStreams res;

    const Marks & marks = getMarksWithRealRowCount();
    size_t marks_size = marks.size();

    if (num_streams > marks_size)
        num_streams = marks_size;

    size_t max_read_buffer_size = context.getSettingsRef().max_read_buffer_size;

    for (size_t stream = 0; stream < num_streams; ++stream)
    {
        size_t mark_begin = stream * marks_size / num_streams;
        size_t mark_end = (stream + 1) * marks_size / num_streams;

        size_t rows_begin = mark_begin ? marks[mark_begin - 1].rows : 0;
        size_t rows_end = mark_end ? marks[mark_end - 1].rows : 0;

        res.emplace_back(std::make_shared<LogBlockInputStream>(
            max_block_size,
            all_columns,
            *this,
            mark_begin,
            rows_end - rows_begin,
            max_read_buffer_size));
    }

    return res;
}
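
/** For illustration: how marks are split between parallel streams.
  * With marks_size == 10 and num_streams == 3, the mark ranges are [0, 3), [3, 6) and [6, 10).
  * Since Mark::rows is cumulative, the row count for a stream is
  * marks[mark_end - 1].rows - marks[mark_begin - 1].rows (with 0 when mark_begin == 0).
  * E.g. if the ten marks have cumulative rows 100, 200, ..., 1000, the three streams
  * read 300, 300 and 400 rows respectively.
  */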

BlockOutputStreamPtr StorageLog::write(
    const ASTPtr & /*query*/, const Context & /*context*/)
{
    loadMarks();
    return std::make_shared<LogBlockOutputStream>(*this);
}

CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
{
    std::shared_lock<std::shared_mutex> lock(rwlock);
    return file_checker.check();
}


void registerStorageLog(StorageFactory & factory)
{
    factory.registerStorage("Log", [](const StorageFactory::Arguments & args)
    {
        if (!args.engine_args.empty())
            throw Exception(
                "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        return StorageLog::create(
            args.relative_data_path, args.database_name, args.table_name, args.columns, args.constraints,
            args.context.getSettings().max_compress_block_size, args.context);
    });
}
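
/** Usage example: the Log engine takes no parameters.
  *
  *     CREATE TABLE test.events (ts DateTime, message String) ENGINE = Log
  *
  * Any engine arguments, e.g. ENGINE = Log(8192), are rejected with
  * NUMBER_OF_ARGUMENTS_DOESNT_MATCH, as enforced above.
  */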

}