1 | #include <Storages/StorageLog.h> |
2 | #include <Storages/StorageFactory.h> |
3 | |
4 | #include <Common/Exception.h> |
5 | #include <Common/StringUtils/StringUtils.h> |
6 | |
7 | #include <IO/ReadBufferFromFile.h> |
8 | #include <IO/WriteBufferFromFile.h> |
9 | #include <Compression/CompressedReadBuffer.h> |
10 | #include <Compression/CompressedWriteBuffer.h> |
11 | #include <IO/ReadHelpers.h> |
12 | #include <IO/WriteHelpers.h> |
13 | |
14 | #include <DataTypes/NestedUtils.h> |
15 | |
16 | #include <DataStreams/IBlockInputStream.h> |
17 | #include <DataStreams/IBlockOutputStream.h> |
18 | |
19 | #include <Columns/ColumnArray.h> |
20 | |
21 | #include <Common/typeid_cast.h> |
22 | |
23 | #include <Interpreters/Context.h> |
24 | |
25 | #include <Poco/Path.h> |
26 | #include <Poco/DirectoryIterator.h> |
27 | |
28 | |
29 | #define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin" |
30 | #define DBMS_STORAGE_LOG_MARKS_FILE_NAME "__marks.mrk" |
31 | |
32 | |
33 | namespace DB |
34 | { |
35 | |
36 | namespace ErrorCodes |
37 | { |
38 | extern const int LOGICAL_ERROR; |
39 | extern const int EMPTY_LIST_OF_COLUMNS_PASSED; |
40 | extern const int NO_SUCH_COLUMN_IN_TABLE; |
41 | extern const int DUPLICATE_COLUMN; |
42 | extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT; |
43 | extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; |
44 | extern const int INCORRECT_FILE_NAME; |
45 | } |
46 | |
47 | |
48 | class LogBlockInputStream final : public IBlockInputStream |
49 | { |
50 | public: |
51 | LogBlockInputStream( |
52 | size_t block_size_, const NamesAndTypesList & columns_, StorageLog & storage_, |
53 | size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_) |
54 | : block_size(block_size_), |
55 | columns(columns_), |
56 | storage(storage_), |
57 | mark_number(mark_number_), |
58 | rows_limit(rows_limit_), |
59 | max_read_buffer_size(max_read_buffer_size_) |
60 | { |
61 | } |
62 | |
63 | String getName() const override { return "Log" ; } |
64 | |
65 | Block () const override |
66 | { |
67 | Block res; |
68 | |
69 | for (const auto & name_type : columns) |
70 | res.insert({ name_type.type->createColumn(), name_type.type, name_type.name }); |
71 | |
72 | return Nested::flatten(res); |
73 | } |
74 | |
75 | protected: |
76 | Block readImpl() override; |
77 | |
78 | private: |
79 | |
80 | size_t block_size; |
81 | NamesAndTypesList columns; |
82 | StorageLog & storage; |
83 | size_t mark_number; /// from what mark to read data |
84 | size_t rows_limit; /// The maximum number of rows that can be read |
85 | size_t rows_read = 0; |
86 | size_t max_read_buffer_size; |
87 | |
88 | struct Stream |
89 | { |
90 | Stream(const std::string & data_path, size_t offset, size_t max_read_buffer_size_) |
91 | : plain(data_path, std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size_), Poco::File(data_path).getSize())), |
92 | compressed(plain) |
93 | { |
94 | if (offset) |
95 | plain.seek(offset); |
96 | } |
97 | |
98 | ReadBufferFromFile plain; |
99 | CompressedReadBuffer compressed; |
100 | }; |
101 | |
102 | using FileStreams = std::map<std::string, Stream>; |
103 | FileStreams streams; |
104 | |
105 | using DeserializeState = IDataType::DeserializeBinaryBulkStatePtr; |
106 | using DeserializeStates = std::map<String, DeserializeState>; |
107 | DeserializeStates deserialize_states; |
108 | |
109 | void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read); |
110 | |
111 | }; |
112 | |
113 | |
114 | class LogBlockOutputStream final : public IBlockOutputStream |
115 | { |
116 | public: |
117 | explicit LogBlockOutputStream(StorageLog & storage_) |
118 | : storage(storage_), |
119 | lock(storage.rwlock), |
120 | marks_stream(storage.marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY) |
121 | { |
122 | } |
123 | |
124 | ~LogBlockOutputStream() override |
125 | { |
126 | try |
127 | { |
128 | writeSuffix(); |
129 | } |
130 | catch (...) |
131 | { |
132 | tryLogCurrentException(__PRETTY_FUNCTION__); |
133 | } |
134 | } |
135 | |
136 | Block () const override { return storage.getSampleBlock(); } |
137 | void write(const Block & block) override; |
138 | void writeSuffix() override; |
139 | |
140 | private: |
141 | StorageLog & storage; |
142 | std::unique_lock<std::shared_mutex> lock; |
143 | bool done = false; |
144 | |
145 | struct Stream |
146 | { |
147 | Stream(const std::string & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) : |
148 | plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY), |
149 | compressed(plain, std::move(codec), max_compress_block_size) |
150 | { |
151 | plain_offset = Poco::File(data_path).getSize(); |
152 | } |
153 | |
154 | WriteBufferFromFile plain; |
155 | CompressedWriteBuffer compressed; |
156 | |
157 | size_t plain_offset; /// How many bytes were in the file at the time the LogBlockOutputStream was created. |
158 | |
159 | void finalize() |
160 | { |
161 | compressed.next(); |
162 | plain.next(); |
163 | } |
164 | }; |
165 | |
166 | using Mark = StorageLog::Mark; |
167 | using MarksForColumns = std::vector<std::pair<size_t, Mark>>; |
168 | |
169 | using FileStreams = std::map<std::string, Stream>; |
170 | FileStreams streams; |
171 | |
172 | using WrittenStreams = std::set<std::string>; |
173 | |
174 | WriteBufferFromFile marks_stream; /// Declared below `lock` to make the file open when rwlock is captured. |
175 | |
176 | using SerializeState = IDataType::SerializeBinaryBulkStatePtr; |
177 | using SerializeStates = std::map<String, SerializeState>; |
178 | SerializeStates serialize_states; |
179 | |
180 | IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams); |
181 | |
182 | void writeData(const String & name, const IDataType & type, const IColumn & column, |
183 | MarksForColumns & out_marks, |
184 | WrittenStreams & written_streams); |
185 | |
186 | void writeMarks(MarksForColumns && marks); |
187 | }; |
188 | |
189 | |
190 | Block LogBlockInputStream::readImpl() |
191 | { |
192 | Block res; |
193 | |
194 | if (rows_read == rows_limit) |
195 | return res; |
196 | |
197 | /// If there are no files in the folder, the table is empty. |
198 | if (Poco::DirectoryIterator(storage.getFullPath()) == Poco::DirectoryIterator()) |
199 | return res; |
200 | |
201 | /// How many rows to read for the next block. |
202 | size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read); |
203 | |
204 | for (const auto & name_type : columns) |
205 | { |
206 | MutableColumnPtr column = name_type.type->createColumn(); |
207 | |
208 | try |
209 | { |
210 | readData(name_type.name, *name_type.type, *column, max_rows_to_read); |
211 | } |
212 | catch (Exception & e) |
213 | { |
214 | e.addMessage("while reading column " + name_type.name + " at " + storage.path); |
215 | throw; |
216 | } |
217 | |
218 | if (column->size()) |
219 | res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name)); |
220 | } |
221 | |
222 | if (res) |
223 | rows_read += res.rows(); |
224 | |
225 | if (!res || rows_read == rows_limit) |
226 | { |
227 | /** Close the files (before destroying the object). |
228 | * When many sources are created, but simultaneously reading only a few of them, |
229 | * buffers don't waste memory. |
230 | */ |
231 | streams.clear(); |
232 | } |
233 | |
234 | return Nested::flatten(res); |
235 | } |
236 | |
237 | |
238 | void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read) |
239 | { |
240 | IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint. |
241 | |
242 | auto createStringGetter = [&](bool stream_for_prefix) |
243 | { |
244 | return [&, stream_for_prefix] (const IDataType::SubstreamPath & path) -> ReadBuffer * |
245 | { |
246 | String stream_name = IDataType::getFileNameForStream(name, path); |
247 | |
248 | const auto & file_it = storage.files.find(stream_name); |
249 | if (storage.files.end() == file_it) |
250 | throw Exception("Logical error: no information about file " + stream_name + " in StorageLog" , ErrorCodes::LOGICAL_ERROR); |
251 | |
252 | UInt64 offset = 0; |
253 | if (!stream_for_prefix && mark_number) |
254 | offset = file_it->second.marks[mark_number].offset; |
255 | |
256 | auto & data_file_path = file_it->second.data_file.path(); |
257 | auto it = streams.try_emplace(stream_name, data_file_path, offset, max_read_buffer_size).first; |
258 | return &it->second.compressed; |
259 | }; |
260 | }; |
261 | |
262 | if (deserialize_states.count(name) == 0) |
263 | { |
264 | settings.getter = createStringGetter(true); |
265 | type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]); |
266 | } |
267 | |
268 | settings.getter = createStringGetter(false); |
269 | type.deserializeBinaryBulkWithMultipleStreams(column, max_rows_to_read, settings, deserialize_states[name]); |
270 | } |
271 | |
272 | |
273 | void LogBlockOutputStream::write(const Block & block) |
274 | { |
275 | storage.check(block, true); |
276 | |
277 | /// The set of written offset columns so that you do not write shared offsets of columns for nested structures multiple times |
278 | WrittenStreams written_streams; |
279 | |
280 | MarksForColumns marks; |
281 | marks.reserve(storage.file_count); |
282 | |
283 | for (size_t i = 0; i < block.columns(); ++i) |
284 | { |
285 | const ColumnWithTypeAndName & column = block.safeGetByPosition(i); |
286 | writeData(column.name, *column.type, *column.column, marks, written_streams); |
287 | } |
288 | |
289 | writeMarks(std::move(marks)); |
290 | } |
291 | |
292 | |
293 | void LogBlockOutputStream::writeSuffix() |
294 | { |
295 | if (done) |
296 | return; |
297 | done = true; |
298 | |
299 | WrittenStreams written_streams; |
300 | IDataType::SerializeBinaryBulkSettings settings; |
301 | for (const auto & column : getHeader()) |
302 | { |
303 | auto it = serialize_states.find(column.name); |
304 | if (it != serialize_states.end()) |
305 | { |
306 | settings.getter = createStreamGetter(column.name, written_streams); |
307 | column.type->serializeBinaryBulkStateSuffix(settings, it->second); |
308 | } |
309 | } |
310 | |
311 | /// Finish write. |
312 | marks_stream.next(); |
313 | |
314 | for (auto & name_stream : streams) |
315 | name_stream.second.finalize(); |
316 | |
317 | std::vector<Poco::File> column_files; |
318 | for (const auto & name_stream : streams) |
319 | column_files.push_back(storage.files[name_stream.first].data_file); |
320 | column_files.push_back(storage.marks_file); |
321 | |
322 | storage.file_checker.update(column_files.begin(), column_files.end()); |
323 | |
324 | streams.clear(); |
325 | } |
326 | |
327 | |
328 | IDataType::OutputStreamGetter LogBlockOutputStream::createStreamGetter(const String & name, |
329 | WrittenStreams & written_streams) |
330 | { |
331 | return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer * |
332 | { |
333 | String stream_name = IDataType::getFileNameForStream(name, path); |
334 | if (written_streams.count(stream_name)) |
335 | return nullptr; |
336 | |
337 | auto it = streams.find(stream_name); |
338 | if (streams.end() == it) |
339 | throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream" , |
340 | ErrorCodes::LOGICAL_ERROR); |
341 | return &it->second.compressed; |
342 | }; |
343 | } |
344 | |
345 | |
346 | void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, |
347 | MarksForColumns & out_marks, |
348 | WrittenStreams & written_streams) |
349 | { |
350 | IDataType::SerializeBinaryBulkSettings settings; |
351 | |
352 | type.enumerateStreams([&] (const IDataType::SubstreamPath & path) |
353 | { |
354 | String stream_name = IDataType::getFileNameForStream(name, path); |
355 | if (written_streams.count(stream_name)) |
356 | return; |
357 | |
358 | const auto & columns = storage.getColumns(); |
359 | streams.try_emplace( |
360 | stream_name, |
361 | storage.files[stream_name].data_file.path(), |
362 | columns.getCodecOrDefault(name), |
363 | storage.max_compress_block_size); |
364 | }, settings.path); |
365 | |
366 | settings.getter = createStreamGetter(name, written_streams); |
367 | |
368 | if (serialize_states.count(name) == 0) |
369 | type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]); |
370 | |
371 | type.enumerateStreams([&] (const IDataType::SubstreamPath & path) |
372 | { |
373 | String stream_name = IDataType::getFileNameForStream(name, path); |
374 | if (written_streams.count(stream_name)) |
375 | return; |
376 | |
377 | const auto & file = storage.files[stream_name]; |
378 | const auto stream_it = streams.find(stream_name); |
379 | |
380 | Mark mark; |
381 | mark.rows = (file.marks.empty() ? 0 : file.marks.back().rows) + column.size(); |
382 | mark.offset = stream_it->second.plain_offset + stream_it->second.plain.count(); |
383 | |
384 | out_marks.emplace_back(file.column_index, mark); |
385 | }, settings.path); |
386 | |
387 | type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]); |
388 | |
389 | type.enumerateStreams([&] (const IDataType::SubstreamPath & path) |
390 | { |
391 | String stream_name = IDataType::getFileNameForStream(name, path); |
392 | if (!written_streams.emplace(stream_name).second) |
393 | return; |
394 | |
395 | auto it = streams.find(stream_name); |
396 | if (streams.end() == it) |
397 | throw Exception("Logical error: stream was not created when writing data in LogBlockOutputStream" , ErrorCodes::LOGICAL_ERROR); |
398 | it->second.compressed.next(); |
399 | }, settings.path); |
400 | } |
401 | |
402 | |
403 | void LogBlockOutputStream::writeMarks(MarksForColumns && marks) |
404 | { |
405 | if (marks.size() != storage.file_count) |
406 | throw Exception("Wrong number of marks generated from block. Makes no sense." , ErrorCodes::LOGICAL_ERROR); |
407 | |
408 | std::sort(marks.begin(), marks.end(), [](const auto & a, const auto & b) { return a.first < b.first; }); |
409 | |
410 | for (const auto & mark : marks) |
411 | { |
412 | writeIntBinary(mark.second.rows, marks_stream); |
413 | writeIntBinary(mark.second.offset, marks_stream); |
414 | |
415 | size_t column_index = mark.first; |
416 | storage.files[storage.column_names_by_idx[column_index]].marks.push_back(mark.second); |
417 | } |
418 | } |
419 | |
420 | StorageLog::StorageLog( |
421 | const std::string & relative_path_, |
422 | const std::string & database_name_, |
423 | const std::string & table_name_, |
424 | const ColumnsDescription & columns_, |
425 | const ConstraintsDescription & constraints_, |
426 | size_t max_compress_block_size_, |
427 | const Context & context_) |
428 | : base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_), |
429 | max_compress_block_size(max_compress_block_size_), |
430 | file_checker(path + "sizes.json" ) |
431 | { |
432 | setColumns(columns_); |
433 | setConstraints(constraints_); |
434 | |
435 | if (relative_path_.empty()) |
436 | throw Exception("Storage " + getName() + " requires data path" , ErrorCodes::INCORRECT_FILE_NAME); |
437 | |
438 | /// create files if they do not exist |
439 | Poco::File(path).createDirectories(); |
440 | |
441 | for (const auto & column : getColumns().getAllPhysical()) |
442 | addFiles(column.name, *column.type); |
443 | |
444 | marks_file = Poco::File(path + DBMS_STORAGE_LOG_MARKS_FILE_NAME); |
445 | } |
446 | |
447 | |
448 | void StorageLog::addFiles(const String & column_name, const IDataType & type) |
449 | { |
450 | if (files.end() != files.find(column_name)) |
451 | throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog." , |
452 | ErrorCodes::DUPLICATE_COLUMN); |
453 | |
454 | IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path) |
455 | { |
456 | String stream_name = IDataType::getFileNameForStream(column_name, substream_path); |
457 | |
458 | if (!files.count(stream_name)) |
459 | { |
460 | ColumnData & column_data = files[stream_name]; |
461 | column_data.column_index = file_count; |
462 | column_data.data_file = Poco::File{ |
463 | path + stream_name + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION}; |
464 | |
465 | column_names_by_idx.push_back(stream_name); |
466 | ++file_count; |
467 | } |
468 | }; |
469 | |
470 | IDataType::SubstreamPath substream_path; |
471 | type.enumerateStreams(stream_callback, substream_path); |
472 | } |
473 | |
474 | |
475 | void StorageLog::loadMarks() |
476 | { |
477 | std::unique_lock<std::shared_mutex> lock(rwlock); |
478 | |
479 | if (loaded_marks) |
480 | return; |
481 | |
482 | using FilesByIndex = std::vector<Files_t::iterator>; |
483 | |
484 | FilesByIndex files_by_index(file_count); |
485 | for (Files_t::iterator it = files.begin(); it != files.end(); ++it) |
486 | files_by_index[it->second.column_index] = it; |
487 | |
488 | if (marks_file.exists()) |
489 | { |
490 | size_t file_size = marks_file.getSize(); |
491 | if (file_size % (file_count * sizeof(Mark)) != 0) |
492 | throw Exception("Size of marks file is inconsistent" , ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT); |
493 | |
494 | size_t marks_count = file_size / (file_count * sizeof(Mark)); |
495 | |
496 | for (auto & file : files_by_index) |
497 | file->second.marks.reserve(marks_count); |
498 | |
499 | ReadBufferFromFile marks_rb(marks_file.path(), 32768); |
500 | while (!marks_rb.eof()) |
501 | { |
502 | for (size_t i = 0; i < files_by_index.size(); ++i) |
503 | { |
504 | Mark mark; |
505 | readIntBinary(mark.rows, marks_rb); |
506 | readIntBinary(mark.offset, marks_rb); |
507 | files_by_index[i]->second.marks.push_back(mark); |
508 | } |
509 | } |
510 | } |
511 | |
512 | loaded_marks = true; |
513 | } |
514 | |
515 | |
516 | void StorageLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) |
517 | { |
518 | std::unique_lock<std::shared_mutex> lock(rwlock); |
519 | |
520 | /// Rename directory with data. |
521 | String new_path = base_path + new_path_to_table_data; |
522 | Poco::File(path).renameTo(new_path); |
523 | |
524 | path = new_path; |
525 | table_name = new_table_name; |
526 | database_name = new_database_name; |
527 | file_checker.setPath(path + "sizes.json" ); |
528 | |
529 | for (auto & file : files) |
530 | file.second.data_file = Poco::File(path + Poco::Path(file.second.data_file.path()).getFileName()); |
531 | |
532 | marks_file = Poco::File(path + DBMS_STORAGE_LOG_MARKS_FILE_NAME); |
533 | } |
534 | |
535 | void StorageLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) |
536 | { |
537 | std::shared_lock<std::shared_mutex> lock(rwlock); |
538 | |
539 | String table_dir = path; |
540 | |
541 | files.clear(); |
542 | file_count = 0; |
543 | loaded_marks = false; |
544 | |
545 | std::vector<Poco::File> data_files; |
546 | Poco::File(table_dir).list(data_files); |
547 | |
548 | for (auto & file : data_files) |
549 | file.remove(false); |
550 | |
551 | for (const auto & column : getColumns().getAllPhysical()) |
552 | addFiles(column.name, *column.type); |
553 | |
554 | file_checker = FileChecker{table_dir + "/" + "sizes.json" }; |
555 | marks_file = Poco::File(table_dir + "/" + DBMS_STORAGE_LOG_MARKS_FILE_NAME); |
556 | } |
557 | |
558 | |
559 | const StorageLog::Marks & StorageLog::getMarksWithRealRowCount() const |
560 | { |
561 | const String & column_name = getColumns().begin()->name; |
562 | const IDataType & column_type = *getColumns().begin()->type; |
563 | String filename; |
564 | |
565 | /** We take marks from first column. |
566 | * If this is a data type with multiple stream, get the first stream, that we assume have real row count. |
567 | * (Example: for Array data type, first stream is array sizes; and number of array sizes is the number of arrays). |
568 | */ |
569 | IDataType::SubstreamPath substream_root_path; |
570 | column_type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path) |
571 | { |
572 | if (filename.empty()) |
573 | filename = IDataType::getFileNameForStream(column_name, substream_path); |
574 | }, substream_root_path); |
575 | |
576 | Files_t::const_iterator it = files.find(filename); |
577 | if (files.end() == it) |
578 | throw Exception("Cannot find file " + filename, ErrorCodes::LOGICAL_ERROR); |
579 | |
580 | return it->second.marks; |
581 | } |
582 | |
583 | BlockInputStreams StorageLog::read( |
584 | const Names & column_names, |
585 | const SelectQueryInfo & /*query_info*/, |
586 | const Context & context, |
587 | QueryProcessingStage::Enum /*processed_stage*/, |
588 | size_t max_block_size, |
589 | unsigned num_streams) |
590 | { |
591 | check(column_names); |
592 | loadMarks(); |
593 | |
594 | NamesAndTypesList all_columns = Nested::collect(getColumns().getAllPhysical().addTypes(column_names)); |
595 | |
596 | std::shared_lock<std::shared_mutex> lock(rwlock); |
597 | |
598 | BlockInputStreams res; |
599 | |
600 | const Marks & marks = getMarksWithRealRowCount(); |
601 | size_t marks_size = marks.size(); |
602 | |
603 | if (num_streams > marks_size) |
604 | num_streams = marks_size; |
605 | |
606 | size_t max_read_buffer_size = context.getSettingsRef().max_read_buffer_size; |
607 | |
608 | for (size_t stream = 0; stream < num_streams; ++stream) |
609 | { |
610 | size_t mark_begin = stream * marks_size / num_streams; |
611 | size_t mark_end = (stream + 1) * marks_size / num_streams; |
612 | |
613 | size_t rows_begin = mark_begin ? marks[mark_begin - 1].rows : 0; |
614 | size_t rows_end = mark_end ? marks[mark_end - 1].rows : 0; |
615 | |
616 | res.emplace_back(std::make_shared<LogBlockInputStream>( |
617 | max_block_size, |
618 | all_columns, |
619 | *this, |
620 | mark_begin, |
621 | rows_end - rows_begin, |
622 | max_read_buffer_size)); |
623 | } |
624 | |
625 | return res; |
626 | } |
627 | |
628 | BlockOutputStreamPtr StorageLog::write( |
629 | const ASTPtr & /*query*/, const Context & /*context*/) |
630 | { |
631 | loadMarks(); |
632 | return std::make_shared<LogBlockOutputStream>(*this); |
633 | } |
634 | |
635 | CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & /* context */) |
636 | { |
637 | std::shared_lock<std::shared_mutex> lock(rwlock); |
638 | return file_checker.check(); |
639 | } |
640 | |
641 | |
642 | void registerStorageLog(StorageFactory & factory) |
643 | { |
644 | factory.registerStorage("Log" , [](const StorageFactory::Arguments & args) |
645 | { |
646 | if (!args.engine_args.empty()) |
647 | throw Exception( |
648 | "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)" , |
649 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
650 | |
651 | return StorageLog::create( |
652 | args.relative_data_path, args.database_name, args.table_name, args.columns, args.constraints, |
653 | args.context.getSettings().max_compress_block_size, args.context); |
654 | }); |
655 | } |
656 | |
657 | } |
658 | |