1/* Copyright (c) 2018 BlackBerry Limited
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6http://www.apache.org/licenses/LICENSE-2.0
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License. */
12
13#include <Parsers/ASTSelectQuery.h>
14#include <Parsers/ASTCreateQuery.h>
15#include <Parsers/ASTWatchQuery.h>
16#include <Parsers/ASTDropQuery.h>
17#include <Parsers/ASTIdentifier.h>
18#include <Parsers/ASTLiteral.h>
19#include <Interpreters/Context.h>
20#include <Interpreters/InterpreterDropQuery.h>
21#include <Interpreters/InterpreterSelectQuery.h>
22#include <DataStreams/IBlockOutputStream.h>
23#include <DataStreams/OneBlockInputStream.h>
24#include <DataStreams/BlocksBlockInputStream.h>
25#include <DataStreams/MaterializingBlockInputStream.h>
26#include <DataStreams/SquashingBlockInputStream.h>
27#include <DataStreams/copyData.h>
28#include <Common/typeid_cast.h>
29#include <Common/SipHash.h>
30
31#include <Storages/LiveView/StorageLiveView.h>
32#include <Storages/LiveView/LiveViewBlockInputStream.h>
33#include <Storages/LiveView/LiveViewBlockOutputStream.h>
34#include <Storages/LiveView/LiveViewEventsBlockInputStream.h>
35#include <Storages/LiveView/ProxyStorage.h>
36
37#include <Storages/StorageFactory.h>
38#include <Parsers/ASTTablesInSelectQuery.h>
39#include <Parsers/ASTSubquery.h>
40#include <Interpreters/getTableExpressions.h>
41#include <Interpreters/AddDefaultDatabaseVisitor.h>
42
43namespace DB
44{
45
46namespace ErrorCodes
47{
48 extern const int LOGICAL_ERROR;
49 extern const int INCORRECT_QUERY;
50 extern const int TABLE_WAS_NOT_DROPPED;
51 extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW;
52 extern const int SUPPORT_IS_DISABLED;
53}
54
55static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name)
56{
57 auto db_and_table = getDatabaseAndTable(query, 0);
58 ASTPtr subquery = extractTableExpression(query, 0);
59
60 if (!db_and_table && !subquery)
61 return;
62
63 if (db_and_table)
64 {
65 select_table_name = db_and_table->table;
66
67 if (db_and_table->database.empty())
68 {
69 db_and_table->database = select_database_name;
70 AddDefaultDatabaseVisitor visitor(select_database_name);
71 visitor.visit(query);
72 }
73 else
74 select_database_name = db_and_table->database;
75 }
76 else if (auto * ast_select = subquery->as<ASTSelectWithUnionQuery>())
77 {
78 if (ast_select->list_of_selects->children.size() != 1)
79 throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
80
81 auto & inner_query = ast_select->list_of_selects->children.at(0);
82
83 extractDependentTable(inner_query->as<ASTSelectQuery &>(), select_database_name, select_table_name);
84 }
85 else
86 throw Exception("Logical error while creating StorageLiveView."
87 " Could not retrieve table name from select query.",
88 DB::ErrorCodes::LOGICAL_ERROR);
89}
90
91
92void StorageLiveView::writeIntoLiveView(
93 StorageLiveView & live_view,
94 const Block & block,
95 const Context & context)
96{
97 BlockOutputStreamPtr output = std::make_shared<LiveViewBlockOutputStream>(live_view);
98
99 /// Check if live view has any readers if not
100 /// just reset blocks to empty and do nothing else
101 /// When first reader comes the blocks will be read.
102 {
103 std::lock_guard lock(live_view.mutex);
104 if (!live_view.hasActiveUsers())
105 {
106 live_view.reset();
107 return;
108 }
109 }
110
111 bool is_block_processed = false;
112 BlockInputStreams from;
113 BlocksPtrs mergeable_blocks;
114 BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
115
116 {
117 std::lock_guard lock(live_view.mutex);
118
119 mergeable_blocks = live_view.getMergeableBlocks();
120 if (!mergeable_blocks || mergeable_blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh)
121 {
122 mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
123 BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>();
124 InterpreterSelectQuery interpreter(live_view.getInnerQuery(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
125 auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
126 interpreter.execute().in);
127 while (Block this_block = view_mergeable_stream->read())
128 base_mergeable_blocks->push_back(this_block);
129 mergeable_blocks->push_back(base_mergeable_blocks);
130 live_view.setMergeableBlocks(mergeable_blocks);
131
132 /// Create from streams
133 for (auto & blocks_ : *mergeable_blocks)
134 {
135 if (blocks_->empty())
136 continue;
137 auto sample_block = blocks_->front().cloneEmpty();
138 BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
139 from.push_back(std::move(stream));
140 }
141
142 is_block_processed = true;
143 }
144 }
145
146 if (!is_block_processed)
147 {
148 auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
149 BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
150 auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(streams), QueryProcessingStage::FetchColumns);
151 InterpreterSelectQuery select_block(live_view.getInnerQuery(),
152 context, proxy_storage,
153 QueryProcessingStage::WithMergeableState);
154 auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
155 select_block.execute().in);
156 while (Block this_block = data_mergeable_stream->read())
157 new_mergeable_blocks->push_back(this_block);
158
159 if (new_mergeable_blocks->empty())
160 return;
161
162 {
163 std::lock_guard lock(live_view.mutex);
164
165 mergeable_blocks = live_view.getMergeableBlocks();
166 mergeable_blocks->push_back(new_mergeable_blocks);
167
168 /// Create from streams
169 for (auto & blocks_ : *mergeable_blocks)
170 {
171 if (blocks_->empty())
172 continue;
173 auto sample_block = blocks_->front().cloneEmpty();
174 BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
175 from.push_back(std::move(stream));
176 }
177 }
178 }
179
180 auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
181 auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(from), QueryProcessingStage::WithMergeableState);
182 InterpreterSelectQuery select(live_view.getInnerQuery(), context, proxy_storage, QueryProcessingStage::Complete);
183 BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
184
185 /// Squashing is needed here because the view query can generate a lot of blocks
186 /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
187 /// and two-level aggregation is triggered).
188 data = std::make_shared<SquashingBlockInputStream>(
189 data, context.getGlobalContext().getSettingsRef().min_insert_block_size_rows, context.getGlobalContext().getSettingsRef().min_insert_block_size_bytes);
190
191 copyData(*data, *output);
192}
193
194
195StorageLiveView::StorageLiveView(
196 const String & table_name_,
197 const String & database_name_,
198 Context & local_context,
199 const ASTCreateQuery & query,
200 const ColumnsDescription & columns_)
201 : table_name(table_name_),
202 database_name(database_name_), global_context(local_context.getGlobalContext())
203{
204 setColumns(columns_);
205
206 if (!query.select)
207 throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
208
209 /// Default value, if only table name exist in the query
210 select_database_name = local_context.getCurrentDatabase();
211 if (query.select->list_of_selects->children.size() != 1)
212 throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
213
214 inner_query = query.select->list_of_selects->children.at(0);
215
216 ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*inner_query);
217 extractDependentTable(select_query, select_database_name, select_table_name);
218
219 /// If the table is not specified - use the table `system.one`
220 if (select_table_name.empty())
221 {
222 select_database_name = "system";
223 select_table_name = "one";
224 }
225
226 global_context.addDependency(
227 DatabaseAndTableName(select_database_name, select_table_name),
228 DatabaseAndTableName(database_name, table_name));
229
230 is_temporary = query.temporary;
231 temporary_live_view_timeout = local_context.getSettingsRef().temporary_live_view_timeout.totalSeconds();
232
233 blocks_ptr = std::make_shared<BlocksPtr>();
234 blocks_metadata_ptr = std::make_shared<BlocksMetadataPtr>();
235 active_ptr = std::make_shared<bool>(true);
236}
237
238NameAndTypePair StorageLiveView::getColumn(const String & column_name) const
239{
240 if (column_name == "_version")
241 return NameAndTypePair("_version", std::make_shared<DataTypeUInt64>());
242
243 return IStorage::getColumn(column_name);
244}
245
246bool StorageLiveView::hasColumn(const String & column_name) const
247{
248 if (column_name == "_version")
249 return true;
250
251 return IStorage::hasColumn(column_name);
252}
253
254Block StorageLiveView::getHeader() const
255{
256 std::lock_guard lock(sample_block_lock);
257
258 if (!sample_block)
259 {
260 auto storage = global_context.getTable(select_database_name, select_table_name);
261 sample_block = InterpreterSelectQuery(inner_query, global_context, storage,
262 SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock();
263 sample_block.insert({DataTypeUInt64().createColumnConst(
264 sample_block.rows(), 0)->convertToFullColumnIfConst(),
265 std::make_shared<DataTypeUInt64>(),
266 "_version"});
267 /// convert all columns to full columns
268 /// in case some of them are constant
269 for (size_t i = 0; i < sample_block.columns(); ++i)
270 {
271 sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst();
272 }
273 }
274
275 return sample_block;
276}
277
278bool StorageLiveView::getNewBlocks()
279{
280 SipHash hash;
281 UInt128 key;
282 BlocksPtr new_blocks = std::make_shared<Blocks>();
283 BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
284 BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
285
286 InterpreterSelectQuery interpreter(inner_query->clone(), global_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
287 auto mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);
288
289 while (Block block = mergeable_stream->read())
290 new_mergeable_blocks->push_back(block);
291
292 mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
293 mergeable_blocks->push_back(new_mergeable_blocks);
294 BlockInputStreamPtr from = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(new_mergeable_blocks), mergeable_stream->getHeader());
295 auto proxy_storage = ProxyStorage::createProxyStorage(global_context.getTable(select_database_name, select_table_name), {from}, QueryProcessingStage::WithMergeableState);
296 InterpreterSelectQuery select(inner_query->clone(), global_context, proxy_storage, SelectQueryOptions(QueryProcessingStage::Complete));
297 BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
298
299 /// Squashing is needed here because the view query can generate a lot of blocks
300 /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
301 /// and two-level aggregation is triggered).
302 data = std::make_shared<SquashingBlockInputStream>(
303 data, global_context.getSettingsRef().min_insert_block_size_rows, global_context.getSettingsRef().min_insert_block_size_bytes);
304
305 while (Block block = data->read())
306 {
307 /// calculate hash before virtual column is added
308 block.updateHash(hash);
309 /// add result version meta column
310 block.insert({DataTypeUInt64().createColumnConst(
311 block.rows(), getBlocksVersion() + 1)->convertToFullColumnIfConst(),
312 std::make_shared<DataTypeUInt64>(),
313 "_version"});
314 new_blocks->push_back(block);
315 }
316
317 hash.get128(key.low, key.high);
318
319 /// Update blocks only if hash keys do not match
320 /// NOTE: hash could be different for the same result
321 /// if blocks are not in the same order
322 bool updated = false;
323 {
324 if (getBlocksHashKey() != key.toHexString())
325 {
326 if (new_blocks->empty())
327 {
328 new_blocks->push_back(getHeader());
329 }
330 new_blocks_metadata->hash = key.toHexString();
331 new_blocks_metadata->version = getBlocksVersion() + 1;
332 (*blocks_ptr) = new_blocks;
333 (*blocks_metadata_ptr) = new_blocks_metadata;
334 updated = true;
335 }
336 }
337 return updated;
338}
339
340void StorageLiveView::checkTableCanBeDropped() const
341{
342 Dependencies dependencies = global_context.getDependencies(database_name, table_name);
343 if (!dependencies.empty())
344 {
345 DatabaseAndTableName database_and_table_name = dependencies.front();
346 throw Exception("Table has dependency " + database_and_table_name.first + "." + database_and_table_name.second, ErrorCodes::TABLE_WAS_NOT_DROPPED);
347 }
348}
349
350void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout)
351{
352 bool drop_table = false;
353
354 if (storage->shutdown_called)
355 return;
356
357 {
358 while (1)
359 {
360 std::unique_lock lock(storage->no_users_thread_wakeup_mutex);
361 if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; }))
362 {
363 storage->no_users_thread_wakeup = false;
364 if (storage->shutdown_called)
365 return;
366 if (storage->hasUsers())
367 return;
368 if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty())
369 continue;
370 drop_table = true;
371 }
372 break;
373 }
374 }
375
376 if (drop_table)
377 {
378 if (storage->global_context.tryGetTable(storage->database_name, storage->table_name))
379 {
380 try
381 {
382 /// We create and execute `drop` query for this table
383 auto drop_query = std::make_shared<ASTDropQuery>();
384 drop_query->database = storage->database_name;
385 drop_query->table = storage->table_name;
386 drop_query->kind = ASTDropQuery::Kind::Drop;
387 ASTPtr ast_drop_query = drop_query;
388 InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context);
389 drop_interpreter.execute();
390 }
391 catch (...)
392 {
393 }
394 }
395 }
396}
397
398void StorageLiveView::startNoUsersThread(const UInt64 & timeout)
399{
400 bool expected = false;
401 if (!start_no_users_thread_called.compare_exchange_strong(expected, true))
402 return;
403
404 if (is_temporary)
405 {
406 std::lock_guard no_users_thread_lock(no_users_thread_mutex);
407
408 if (shutdown_called)
409 return;
410
411 if (no_users_thread.joinable())
412 {
413 {
414 std::lock_guard lock(no_users_thread_wakeup_mutex);
415 no_users_thread_wakeup = true;
416 no_users_thread_condition.notify_one();
417 }
418 no_users_thread.join();
419 }
420 {
421 std::lock_guard lock(no_users_thread_wakeup_mutex);
422 no_users_thread_wakeup = false;
423 }
424 if (!is_dropped)
425 no_users_thread = std::thread(&StorageLiveView::noUsersThread,
426 std::static_pointer_cast<StorageLiveView>(shared_from_this()), timeout);
427 }
428
429 start_no_users_thread_called = false;
430}
431
432void StorageLiveView::startup()
433{
434 startNoUsersThread(temporary_live_view_timeout);
435}
436
437void StorageLiveView::shutdown()
438{
439 bool expected = false;
440 if (!shutdown_called.compare_exchange_strong(expected, true))
441 return;
442
443 {
444 std::lock_guard no_users_thread_lock(no_users_thread_mutex);
445 if (no_users_thread.joinable())
446 {
447 {
448 std::lock_guard lock(no_users_thread_wakeup_mutex);
449 no_users_thread_wakeup = true;
450 no_users_thread_condition.notify_one();
451 }
452 }
453 }
454}
455
456StorageLiveView::~StorageLiveView()
457{
458 shutdown();
459
460 {
461 std::lock_guard lock(no_users_thread_mutex);
462 if (no_users_thread.joinable())
463 no_users_thread.detach();
464 }
465}
466
467void StorageLiveView::drop(TableStructureWriteLockHolder &)
468{
469 global_context.removeDependency(
470 DatabaseAndTableName(select_database_name, select_table_name),
471 DatabaseAndTableName(database_name, table_name));
472
473 std::lock_guard lock(mutex);
474 is_dropped = true;
475 condition.notify_all();
476}
477
478void StorageLiveView::refresh(const Context & context)
479{
480 auto alter_lock = lockAlterIntention(context.getCurrentQueryId());
481 {
482 std::lock_guard lock(mutex);
483 if (getNewBlocks())
484 condition.notify_all();
485 }
486}
487
488BlockInputStreams StorageLiveView::read(
489 const Names & /*column_names*/,
490 const SelectQueryInfo & /*query_info*/,
491 const Context & /*context*/,
492 QueryProcessingStage::Enum /*processed_stage*/,
493 const size_t /*max_block_size*/,
494 const unsigned /*num_streams*/)
495{
496 std::shared_ptr<BlocksBlockInputStream> stream;
497 {
498 std::lock_guard lock(mutex);
499 if (!(*blocks_ptr))
500 {
501 if (getNewBlocks())
502 condition.notify_all();
503 }
504 stream = std::make_shared<BlocksBlockInputStream>(blocks_ptr, getHeader());
505 }
506 return { stream };
507}
508
509BlockInputStreams StorageLiveView::watch(
510 const Names & /*column_names*/,
511 const SelectQueryInfo & query_info,
512 const Context & context,
513 QueryProcessingStage::Enum & processed_stage,
514 size_t /*max_block_size*/,
515 const unsigned /*num_streams*/)
516{
517 ASTWatchQuery & query = typeid_cast<ASTWatchQuery &>(*query_info.query);
518
519 bool has_limit = false;
520 UInt64 limit = 0;
521
522 if (query.limit_length)
523 {
524 has_limit = true;
525 limit = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*query.limit_length).value);
526 }
527
528 if (query.is_watch_events)
529 {
530 auto reader = std::make_shared<LiveViewEventsBlockInputStream>(
531 std::static_pointer_cast<StorageLiveView>(shared_from_this()),
532 blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
533 context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
534 context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
535
536 {
537 std::lock_guard no_users_thread_lock(no_users_thread_mutex);
538 if (no_users_thread.joinable())
539 {
540 std::lock_guard lock(no_users_thread_wakeup_mutex);
541 no_users_thread_wakeup = true;
542 no_users_thread_condition.notify_one();
543 }
544 }
545
546 {
547 std::lock_guard lock(mutex);
548 if (!(*blocks_ptr))
549 {
550 if (getNewBlocks())
551 condition.notify_all();
552 }
553 }
554
555 processed_stage = QueryProcessingStage::Complete;
556
557 return { reader };
558 }
559 else
560 {
561 auto reader = std::make_shared<LiveViewBlockInputStream>(
562 std::static_pointer_cast<StorageLiveView>(shared_from_this()),
563 blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit,
564 context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(),
565 context.getSettingsRef().temporary_live_view_timeout.totalSeconds());
566
567 {
568 std::lock_guard no_users_thread_lock(no_users_thread_mutex);
569 if (no_users_thread.joinable())
570 {
571 std::lock_guard lock(no_users_thread_wakeup_mutex);
572 no_users_thread_wakeup = true;
573 no_users_thread_condition.notify_one();
574 }
575 }
576
577 {
578 std::lock_guard lock(mutex);
579 if (!(*blocks_ptr))
580 {
581 if (getNewBlocks())
582 condition.notify_all();
583 }
584 }
585
586 processed_stage = QueryProcessingStage::Complete;
587
588 return { reader };
589 }
590}
591
592void registerStorageLiveView(StorageFactory & factory)
593{
594 factory.registerStorage("LiveView", [](const StorageFactory::Arguments & args)
595 {
596 if (!args.attach && !args.local_context.getSettingsRef().allow_experimental_live_view)
597 throw Exception("Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')", ErrorCodes::SUPPORT_IS_DISABLED);
598
599 return StorageLiveView::create(args.table_name, args.database_name, args.local_context, args.query, args.columns);
600 });
601}
602
603}
604