1#include <boost/range/algorithm_ext/erase.hpp>
2#include <Interpreters/InterpreterSelectQuery.h>
3#include <Interpreters/InterpreterInsertQuery.h>
4#include <Interpreters/InterpreterAlterQuery.h>
5#include <Interpreters/castColumn.h>
6#include <Interpreters/evaluateConstantExpression.h>
7#include <DataStreams/AddingMissedBlockInputStream.h>
8#include <DataStreams/ConvertingBlockInputStream.h>
9#include <DataStreams/IBlockInputStream.h>
10#include <Databases/IDatabase.h>
11#include <Storages/StorageBuffer.h>
12#include <Storages/StorageFactory.h>
13#include <Storages/AlterCommands.h>
14#include <Parsers/ASTInsertQuery.h>
15#include <Parsers/ASTIdentifier.h>
16#include <Parsers/ASTLiteral.h>
17#include <Parsers/ASTExpressionList.h>
18#include <Common/setThreadName.h>
19#include <Common/CurrentMetrics.h>
20#include <Common/MemoryTracker.h>
21#include <Common/FieldVisitors.h>
22#include <Common/quoteString.h>
23#include <Common/typeid_cast.h>
24#include <Common/ProfileEvents.h>
25#include <common/logger_useful.h>
26#include <common/getThreadNumber.h>
27#include <ext/range.h>
28#include <DataStreams/FilterBlockInputStream.h>
29#include <DataStreams/ExpressionBlockInputStream.h>
30
31
32namespace ProfileEvents
33{
34 extern const Event StorageBufferFlush;
35 extern const Event StorageBufferErrorOnFlush;
36 extern const Event StorageBufferPassedAllMinThresholds;
37 extern const Event StorageBufferPassedTimeMaxThreshold;
38 extern const Event StorageBufferPassedRowsMaxThreshold;
39 extern const Event StorageBufferPassedBytesMaxThreshold;
40}
41
42namespace CurrentMetrics
43{
44 extern const Metric StorageBufferRows;
45 extern const Metric StorageBufferBytes;
46}
47
48
49namespace DB
50{
51
52namespace ErrorCodes
53{
54 extern const int INFINITE_LOOP;
55 extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
56}
57
58
59StorageBuffer::StorageBuffer(const std::string & database_name_, const std::string & table_name_,
60 const ColumnsDescription & columns_, const ConstraintsDescription & constraints_,
61 Context & context_,
62 size_t num_shards_, const Thresholds & min_thresholds_, const Thresholds & max_thresholds_,
63 const String & destination_database_, const String & destination_table_, bool allow_materialized_)
64 :
65 table_name(table_name_), database_name(database_name_), global_context(context_),
66 num_shards(num_shards_), buffers(num_shards_),
67 min_thresholds(min_thresholds_), max_thresholds(max_thresholds_),
68 destination_database(destination_database_), destination_table(destination_table_),
69 no_destination(destination_database.empty() && destination_table.empty()),
70 allow_materialized(allow_materialized_), log(&Logger::get("StorageBuffer (" + table_name + ")"))
71{
72 setColumns(columns_);
73 setConstraints(constraints_);
74}
75
76StorageBuffer::~StorageBuffer()
77{
78 // Should not happen if shutdown was called
79 if (flush_thread.joinable())
80 {
81 shutdown_event.set();
82 flush_thread.join();
83 }
84}
85
86
87/// Reads from one buffer (from one block) under its mutex.
88class BufferBlockInputStream : public IBlockInputStream
89{
90public:
91 BufferBlockInputStream(const Names & column_names_, StorageBuffer::Buffer & buffer_, const StorageBuffer & storage_)
92 : column_names(column_names_.begin(), column_names_.end()), buffer(buffer_), storage(storage_) {}
93
94 String getName() const override { return "Buffer"; }
95
96 Block getHeader() const override { return storage.getSampleBlockForColumns(column_names); }
97
98protected:
99 Block readImpl() override
100 {
101 Block res;
102
103 if (has_been_read)
104 return res;
105 has_been_read = true;
106
107 std::lock_guard lock(buffer.mutex);
108
109 if (!buffer.data.rows())
110 return res;
111
112 for (const auto & name : column_names)
113 res.insert(buffer.data.getByName(name));
114
115 return res;
116 }
117
118private:
119 Names column_names;
120 StorageBuffer::Buffer & buffer;
121 const StorageBuffer & storage;
122 bool has_been_read = false;
123};
124
125
126QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context) const
127{
128 if (!no_destination)
129 {
130 auto destination = context.getTable(destination_database, destination_table);
131
132 if (destination.get() == this)
133 throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
134
135 return destination->getQueryProcessingStage(context);
136 }
137
138 return QueryProcessingStage::FetchColumns;
139}
140
141BlockInputStreams StorageBuffer::read(
142 const Names & column_names,
143 const SelectQueryInfo & query_info,
144 const Context & context,
145 QueryProcessingStage::Enum processed_stage,
146 size_t max_block_size,
147 unsigned num_streams)
148{
149 BlockInputStreams streams_from_dst;
150
151 if (!no_destination)
152 {
153 auto destination = context.getTable(destination_database, destination_table);
154
155 if (destination.get() == this)
156 throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
157
158 auto destination_lock = destination->lockStructureForShare(false, context.getCurrentQueryId());
159
160 const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name)
161 {
162 return destination->hasColumn(column_name) &&
163 destination->getColumn(column_name).type->equals(*getColumn(column_name).type);
164 });
165
166 if (dst_has_same_structure)
167 {
168 if (query_info.order_by_optimizer)
169 query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(destination);
170
171 /// The destination table has the same structure of the requested columns and we can simply read blocks from there.
172 streams_from_dst = destination->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
173 }
174 else
175 {
176 /// There is a struct mismatch and we need to convert read blocks from the destination table.
177 const Block header = getSampleBlock();
178 Names columns_intersection = column_names;
179 Block header_after_adding_defaults = header;
180 for (const String & column_name : column_names)
181 {
182 if (!destination->hasColumn(column_name))
183 {
184 LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
185 << " doesn't have column " << backQuoteIfNeed(column_name) << ". The default values are used.");
186 boost::range::remove_erase(columns_intersection, column_name);
187 continue;
188 }
189 const auto & dst_col = destination->getColumn(column_name);
190 const auto & col = getColumn(column_name);
191 if (!dst_col.type->equals(*col.type))
192 {
193 LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
194 << " has different type of column " << backQuoteIfNeed(column_name) << " ("
195 << dst_col.type->getName() << " != " << col.type->getName() << "). Data from destination table are converted.");
196 header_after_adding_defaults.getByName(column_name) = ColumnWithTypeAndName(dst_col.type, column_name);
197 }
198 }
199
200 if (columns_intersection.empty())
201 {
202 LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
203 << " has no common columns with block in buffer. Block of data is skipped.");
204 }
205 else
206 {
207 streams_from_dst = destination->read(columns_intersection, query_info, context, processed_stage, max_block_size, num_streams);
208 for (auto & stream : streams_from_dst)
209 {
210 stream = std::make_shared<AddingMissedBlockInputStream>(
211 stream, header_after_adding_defaults, getColumns().getDefaults(), context);
212 stream = std::make_shared<ConvertingBlockInputStream>(
213 context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
214 }
215 }
216 }
217
218 for (auto & stream : streams_from_dst)
219 stream->addTableLock(destination_lock);
220 }
221
222 BlockInputStreams streams_from_buffers;
223 streams_from_buffers.reserve(num_shards);
224 for (auto & buf : buffers)
225 streams_from_buffers.push_back(std::make_shared<BufferBlockInputStream>(column_names, buf, *this));
226
227 /** If the sources from the table were processed before some non-initial stage of query execution,
228 * then sources from the buffers must also be wrapped in the processing pipeline before the same stage.
229 */
230 if (processed_stage > QueryProcessingStage::FetchColumns)
231 for (auto & stream : streams_from_buffers)
232 stream = InterpreterSelectQuery(query_info.query, context, stream, SelectQueryOptions(processed_stage)).execute().in;
233
234 if (query_info.prewhere_info)
235 {
236 for (auto & stream : streams_from_buffers)
237 stream = std::make_shared<FilterBlockInputStream>(stream, query_info.prewhere_info->prewhere_actions,
238 query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column);
239
240 if (query_info.prewhere_info->alias_actions)
241 {
242 for (auto & stream : streams_from_buffers)
243 stream = std::make_shared<ExpressionBlockInputStream>(stream, query_info.prewhere_info->alias_actions);
244
245 }
246 }
247
248 streams_from_dst.insert(streams_from_dst.end(), streams_from_buffers.begin(), streams_from_buffers.end());
249 return streams_from_dst;
250}
251
252
253static void appendBlock(const Block & from, Block & to)
254{
255 if (!to)
256 throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR);
257
258 assertBlocksHaveEqualStructure(from, to, "Buffer");
259
260 from.checkNumberOfRows();
261 to.checkNumberOfRows();
262
263 size_t rows = from.rows();
264 size_t bytes = from.bytes();
265
266 CurrentMetrics::add(CurrentMetrics::StorageBufferRows, rows);
267 CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, bytes);
268
269 size_t old_rows = to.rows();
270
271 auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
272
273 try
274 {
275 for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
276 {
277 const IColumn & col_from = *from.getByPosition(column_no).column.get();
278 MutableColumnPtr col_to = (*std::move(to.getByPosition(column_no).column)).mutate();
279
280 col_to->insertRangeFrom(col_from, 0, rows);
281
282 to.getByPosition(column_no).column = std::move(col_to);
283 }
284 }
285 catch (...)
286 {
287 /// Rollback changes.
288 try
289 {
290 for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
291 {
292 ColumnPtr & col_to = to.getByPosition(column_no).column;
293 if (col_to->size() != old_rows)
294 col_to = (*std::move(col_to)).mutate()->cut(0, old_rows);
295 }
296 }
297 catch (...)
298 {
299 /// In case when we cannot rollback, do not leave incorrect state in memory.
300 std::terminate();
301 }
302
303 throw;
304 }
305}
306
307
308class BufferBlockOutputStream : public IBlockOutputStream
309{
310public:
311 explicit BufferBlockOutputStream(StorageBuffer & storage_) : storage(storage_) {}
312
313 Block getHeader() const override { return storage.getSampleBlock(); }
314
315 void write(const Block & block) override
316 {
317 if (!block)
318 return;
319
320 // Check table structure.
321 storage.check(block, true);
322
323 size_t rows = block.rows();
324 if (!rows)
325 return;
326
327 StoragePtr destination;
328 if (!storage.no_destination)
329 {
330 destination = storage.global_context.tryGetTable(storage.destination_database, storage.destination_table);
331 if (destination.get() == &storage)
332 throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
333 }
334
335 size_t bytes = block.bytes();
336
337 /// If the block already exceeds the maximum limit, then we skip the buffer.
338 if (rows > storage.max_thresholds.rows || bytes > storage.max_thresholds.bytes)
339 {
340 if (!storage.no_destination)
341 {
342 LOG_TRACE(storage.log, "Writing block with " << rows << " rows, " << bytes << " bytes directly.");
343 storage.writeBlockToDestination(block, destination);
344 }
345 return;
346 }
347
348 /// We distribute the load on the shards by the stream number.
349 const auto start_shard_num = getThreadNumber() % storage.num_shards;
350
351 /// We loop through the buffers, trying to lock mutex. No more than one lap.
352 auto shard_num = start_shard_num;
353
354 StorageBuffer::Buffer * least_busy_buffer = nullptr;
355 std::unique_lock<std::mutex> least_busy_lock;
356 size_t least_busy_shard_rows = 0;
357
358 for (size_t try_no = 0; try_no < storage.num_shards; ++try_no)
359 {
360 std::unique_lock lock(storage.buffers[shard_num].mutex, std::try_to_lock);
361
362 if (lock.owns_lock())
363 {
364 size_t num_rows = storage.buffers[shard_num].data.rows();
365 if (!least_busy_buffer || num_rows < least_busy_shard_rows)
366 {
367 least_busy_buffer = &storage.buffers[shard_num];
368 least_busy_lock = std::move(lock);
369 least_busy_shard_rows = num_rows;
370 }
371 }
372
373 shard_num = (shard_num + 1) % storage.num_shards;
374 }
375
376 /// If you still can not lock anything at once, then we'll wait on mutex.
377 if (!least_busy_buffer)
378 {
379 least_busy_buffer = &storage.buffers[start_shard_num];
380 least_busy_lock = std::unique_lock(least_busy_buffer->mutex);
381 }
382 insertIntoBuffer(block, *least_busy_buffer);
383 }
384private:
385 StorageBuffer & storage;
386
387 void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer)
388 {
389 time_t current_time = time(nullptr);
390
391 /// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later.
392 Block sorted_block = block.sortColumns();
393
394 if (!buffer.data)
395 {
396 buffer.data = sorted_block.cloneEmpty();
397 }
398 else if (storage.checkThresholds(buffer, current_time, sorted_block.rows(), sorted_block.bytes()))
399 {
400 /** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
401 * This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
402 * an exception will be thrown, and new data will not be added to the buffer.
403 */
404
405 storage.flushBuffer(buffer, false /* check_thresholds */, true /* locked */);
406 }
407
408 if (!buffer.first_write_time)
409 buffer.first_write_time = current_time;
410
411 appendBlock(sorted_block, buffer.data);
412 }
413};
414
415
416BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const Context & /*context*/)
417{
418 return std::make_shared<BufferBlockOutputStream>(*this);
419}
420
421
422bool StorageBuffer::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const
423{
424 if (no_destination)
425 return false;
426
427 auto destination = global_context.getTable(destination_database, destination_table);
428
429 if (destination.get() == this)
430 throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
431
432 return destination->mayBenefitFromIndexForIn(left_in_operand, query_context);
433}
434
435
436void StorageBuffer::startup()
437{
438 if (global_context.getSettingsRef().readonly)
439 {
440 LOG_WARNING(log, "Storage " << getName() << " is run with readonly settings, it will not be able to insert data."
441 << " Set apropriate system_profile to fix this.");
442 }
443
444 flush_thread = ThreadFromGlobalPool(&StorageBuffer::flushThread, this);
445}
446
447
448void StorageBuffer::shutdown()
449{
450 shutdown_event.set();
451
452 if (flush_thread.joinable())
453 flush_thread.join();
454
455 try
456 {
457 optimize(nullptr /*query*/, {} /*partition*/, false /*final*/, false /*deduplicate*/, global_context);
458 }
459 catch (...)
460 {
461 tryLogCurrentException(__PRETTY_FUNCTION__);
462 }
463}
464
465
466/** NOTE If you do OPTIMIZE after insertion,
467 * it does not guarantee, that all data will be in destination table at the time of next SELECT just after OPTIMIZE.
468 *
469 * Because in case if there was already running flushBuffer method,
470 * then call to flushBuffer inside OPTIMIZE will see empty buffer and return quickly,
471 * but at the same time, the already running flushBuffer method possibly is not finished,
472 * so next SELECT will observe missing data.
473 *
474 * This kind of race condition make very hard to implement proper tests.
475 */
476bool StorageBuffer::optimize(const ASTPtr & /*query*/, const ASTPtr & partition, bool final, bool deduplicate, const Context & /*context*/)
477{
478 if (partition)
479 throw Exception("Partition cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
480
481 if (final)
482 throw Exception("FINAL cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
483
484 if (deduplicate)
485 throw Exception("DEDUPLICATE cannot be specified when optimizing table of type Buffer", ErrorCodes::NOT_IMPLEMENTED);
486
487 flushAllBuffers(false);
488 return true;
489}
490
491
492bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
493{
494 time_t time_passed = 0;
495 if (buffer.first_write_time)
496 time_passed = current_time - buffer.first_write_time;
497
498 size_t rows = buffer.data.rows() + additional_rows;
499 size_t bytes = buffer.data.bytes() + additional_bytes;
500
501 return checkThresholdsImpl(rows, bytes, time_passed);
502}
503
504
505bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const
506{
507 if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
508 {
509 ProfileEvents::increment(ProfileEvents::StorageBufferPassedAllMinThresholds);
510 return true;
511 }
512
513 if (time_passed > max_thresholds.time)
514 {
515 ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeMaxThreshold);
516 return true;
517 }
518
519 if (rows > max_thresholds.rows)
520 {
521 ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsMaxThreshold);
522 return true;
523 }
524
525 if (bytes > max_thresholds.bytes)
526 {
527 ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesMaxThreshold);
528 return true;
529 }
530
531 return false;
532}
533
534
535void StorageBuffer::flushAllBuffers(const bool check_thresholds)
536{
537 for (auto & buf : buffers)
538 flushBuffer(buf, check_thresholds);
539}
540
541
542void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool locked)
543{
544 Block block_to_write;
545 time_t current_time = time(nullptr);
546
547 size_t rows = 0;
548 size_t bytes = 0;
549 time_t time_passed = 0;
550
551 std::unique_lock lock(buffer.mutex, std::defer_lock);
552 if (!locked)
553 lock.lock();
554
555 block_to_write = buffer.data.cloneEmpty();
556
557 rows = buffer.data.rows();
558 bytes = buffer.data.bytes();
559 if (buffer.first_write_time)
560 time_passed = current_time - buffer.first_write_time;
561
562 if (check_thresholds)
563 {
564 if (!checkThresholdsImpl(rows, bytes, time_passed))
565 return;
566 }
567 else
568 {
569 if (rows == 0)
570 return;
571 }
572
573 buffer.data.swap(block_to_write);
574 buffer.first_write_time = 0;
575
576 CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, block_to_write.rows());
577 CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());
578
579 ProfileEvents::increment(ProfileEvents::StorageBufferFlush);
580
581 LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
582
583 if (no_destination)
584 return;
585
586 /** For simplicity, buffer is locked during write.
587 * We could unlock buffer temporary, but it would lead to too many difficulties:
588 * - data, that is written, will not be visible for SELECTs;
589 * - new data could be appended to buffer, and in case of exception, we must merge it with old data, that has not been written;
590 * - this could lead to infinite memory growth.
591 */
592 try
593 {
594 writeBlockToDestination(block_to_write, global_context.tryGetTable(destination_database, destination_table));
595 }
596 catch (...)
597 {
598 ProfileEvents::increment(ProfileEvents::StorageBufferErrorOnFlush);
599
600 /// Return the block to its place in the buffer.
601
602 CurrentMetrics::add(CurrentMetrics::StorageBufferRows, block_to_write.rows());
603 CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());
604
605 buffer.data.swap(block_to_write);
606
607 if (!buffer.first_write_time)
608 buffer.first_write_time = current_time;
609
610 /// After a while, the next write attempt will happen.
611 throw;
612 }
613}
614
615
616void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
617{
618 if (no_destination || !block)
619 return;
620
621 if (!table)
622 {
623 LOG_ERROR(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) << " doesn't exist. Block of data is discarded.");
624 return;
625 }
626
627 auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
628
629 auto insert = std::make_shared<ASTInsertQuery>();
630
631 insert->database = destination_database;
632 insert->table = destination_table;
633
634 /** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
635 * This will support some of the cases (but not all) when the table structure does not match.
636 */
637 Block structure_of_destination_table = allow_materialized ? table->getSampleBlock() : table->getSampleBlockNonMaterialized();
638 Block block_to_write;
639 for (size_t i : ext::range(0, structure_of_destination_table.columns()))
640 {
641 auto dst_col = structure_of_destination_table.getByPosition(i);
642 if (block.has(dst_col.name))
643 {
644 auto column = block.getByName(dst_col.name);
645 if (!column.type->equals(*dst_col.type))
646 {
647 LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
648 << " have different type of column " << backQuoteIfNeed(column.name) << " ("
649 << dst_col.type->getName() << " != " << column.type->getName()
650 << "). Block of data is converted.");
651 column.column = castColumn(column, dst_col.type, global_context);
652 column.type = dst_col.type;
653 }
654
655 block_to_write.insert(column);
656 }
657 }
658
659 if (block_to_write.columns() == 0)
660 {
661 LOG_ERROR(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
662 << " have no common columns with block in buffer. Block of data is discarded.");
663 return;
664 }
665
666 if (block_to_write.columns() != block.columns())
667 LOG_WARNING(log, "Not all columns from block in buffer exist in destination table "
668 << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) << ". Some columns are discarded.");
669
670 auto list_of_columns = std::make_shared<ASTExpressionList>();
671 insert->columns = list_of_columns;
672 list_of_columns->children.reserve(block_to_write.columns());
673 for (const auto & column : block_to_write)
674 list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(column.name));
675
676 InterpreterInsertQuery interpreter{insert, global_context, allow_materialized};
677
678 auto block_io = interpreter.execute();
679 block_io.out->writePrefix();
680 block_io.out->write(block_to_write);
681 block_io.out->writeSuffix();
682}
683
684
685void StorageBuffer::flushThread()
686{
687 setThreadName("BufferFlush");
688
689 do
690 {
691 try
692 {
693 flushAllBuffers(true);
694 }
695 catch (...)
696 {
697 tryLogCurrentException(__PRETTY_FUNCTION__);
698 }
699 } while (!shutdown_event.tryWait(1000));
700}
701
702void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
703{
704 for (const auto & command : commands)
705 {
706 if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
707 && command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN)
708 throw Exception(
709 "Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
710 ErrorCodes::NOT_IMPLEMENTED);
711 }
712}
713
714
715void StorageBuffer::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
716{
717 lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
718
719 const String database_name_ = getDatabaseName();
720 const String table_name_ = getTableName();
721
722 /// So that no blocks of the old structure remain.
723 optimize({} /*query*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);
724
725 StorageInMemoryMetadata metadata = getInMemoryMetadata();
726 params.apply(metadata);
727 context.getDatabase(database_name_)->alterTable(context, table_name_, metadata);
728 setColumns(std::move(metadata.columns));
729}
730
731
732void registerStorageBuffer(StorageFactory & factory)
733{
734 /** Buffer(db, table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
735 *
736 * db, table - in which table to put data from buffer.
737 * num_buckets - level of parallelism.
738 * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer.
739 */
740
741 factory.registerStorage("Buffer", [](const StorageFactory::Arguments & args)
742 {
743 ASTs & engine_args = args.engine_args;
744
745 if (engine_args.size() != 9)
746 throw Exception("Storage Buffer requires 9 parameters: "
747 " destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
748 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
749
750 engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
751 engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
752
753 String destination_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
754 String destination_table = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
755
756 UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[2]->as<ASTLiteral &>().value);
757
758 Int64 min_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[3]->as<ASTLiteral &>().value);
759 Int64 max_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[4]->as<ASTLiteral &>().value);
760 UInt64 min_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[5]->as<ASTLiteral &>().value);
761 UInt64 max_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[6]->as<ASTLiteral &>().value);
762 UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[7]->as<ASTLiteral &>().value);
763 UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[8]->as<ASTLiteral &>().value);
764
765 return StorageBuffer::create(
766 args.database_name,
767 args.table_name, args.columns, args.constraints,
768 args.context,
769 num_buckets,
770 StorageBuffer::Thresholds{min_time, min_rows, min_bytes},
771 StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
772 destination_database, destination_table,
773 static_cast<bool>(args.local_context.getSettingsRef().insert_allow_materialized_columns));
774 });
775}
776
777}
778