1 | /* Copyright (c) 2018 BlackBerry Limited |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | http://www.apache.org/licenses/LICENSE-2.0 |
7 | Unless required by applicable law or agreed to in writing, software |
8 | distributed under the License is distributed on an "AS IS" BASIS, |
9 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
10 | See the License for the specific language governing permissions and |
11 | limitations under the License. */ |
12 | |
13 | #include <Parsers/ASTSelectQuery.h> |
14 | #include <Parsers/ASTCreateQuery.h> |
15 | #include <Parsers/ASTWatchQuery.h> |
16 | #include <Parsers/ASTDropQuery.h> |
17 | #include <Parsers/ASTIdentifier.h> |
18 | #include <Parsers/ASTLiteral.h> |
19 | #include <Interpreters/Context.h> |
20 | #include <Interpreters/InterpreterDropQuery.h> |
21 | #include <Interpreters/InterpreterSelectQuery.h> |
22 | #include <DataStreams/IBlockOutputStream.h> |
23 | #include <DataStreams/OneBlockInputStream.h> |
24 | #include <DataStreams/BlocksBlockInputStream.h> |
25 | #include <DataStreams/MaterializingBlockInputStream.h> |
26 | #include <DataStreams/SquashingBlockInputStream.h> |
27 | #include <DataStreams/copyData.h> |
28 | #include <Common/typeid_cast.h> |
29 | #include <Common/SipHash.h> |
30 | |
31 | #include <Storages/LiveView/StorageLiveView.h> |
32 | #include <Storages/LiveView/LiveViewBlockInputStream.h> |
33 | #include <Storages/LiveView/LiveViewBlockOutputStream.h> |
34 | #include <Storages/LiveView/LiveViewEventsBlockInputStream.h> |
35 | #include <Storages/LiveView/ProxyStorage.h> |
36 | |
37 | #include <Storages/StorageFactory.h> |
38 | #include <Parsers/ASTTablesInSelectQuery.h> |
39 | #include <Parsers/ASTSubquery.h> |
40 | #include <Interpreters/getTableExpressions.h> |
41 | #include <Interpreters/AddDefaultDatabaseVisitor.h> |
42 | |
43 | namespace DB |
44 | { |
45 | |
46 | namespace ErrorCodes |
47 | { |
48 | extern const int LOGICAL_ERROR; |
49 | extern const int INCORRECT_QUERY; |
50 | extern const int TABLE_WAS_NOT_DROPPED; |
51 | extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW; |
52 | extern const int SUPPORT_IS_DISABLED; |
53 | } |
54 | |
55 | static void (ASTSelectQuery & query, String & select_database_name, String & select_table_name) |
56 | { |
57 | auto db_and_table = getDatabaseAndTable(query, 0); |
58 | ASTPtr subquery = extractTableExpression(query, 0); |
59 | |
60 | if (!db_and_table && !subquery) |
61 | return; |
62 | |
63 | if (db_and_table) |
64 | { |
65 | select_table_name = db_and_table->table; |
66 | |
67 | if (db_and_table->database.empty()) |
68 | { |
69 | db_and_table->database = select_database_name; |
70 | AddDefaultDatabaseVisitor visitor(select_database_name); |
71 | visitor.visit(query); |
72 | } |
73 | else |
74 | select_database_name = db_and_table->database; |
75 | } |
76 | else if (auto * ast_select = subquery->as<ASTSelectWithUnionQuery>()) |
77 | { |
78 | if (ast_select->list_of_selects->children.size() != 1) |
79 | throw Exception("UNION is not supported for LIVE VIEW" , ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); |
80 | |
81 | auto & inner_query = ast_select->list_of_selects->children.at(0); |
82 | |
83 | extractDependentTable(inner_query->as<ASTSelectQuery &>(), select_database_name, select_table_name); |
84 | } |
85 | else |
86 | throw Exception("Logical error while creating StorageLiveView." |
87 | " Could not retrieve table name from select query." , |
88 | DB::ErrorCodes::LOGICAL_ERROR); |
89 | } |
90 | |
91 | |
92 | void StorageLiveView::writeIntoLiveView( |
93 | StorageLiveView & live_view, |
94 | const Block & block, |
95 | const Context & context) |
96 | { |
97 | BlockOutputStreamPtr output = std::make_shared<LiveViewBlockOutputStream>(live_view); |
98 | |
99 | /// Check if live view has any readers if not |
100 | /// just reset blocks to empty and do nothing else |
101 | /// When first reader comes the blocks will be read. |
102 | { |
103 | std::lock_guard lock(live_view.mutex); |
104 | if (!live_view.hasActiveUsers()) |
105 | { |
106 | live_view.reset(); |
107 | return; |
108 | } |
109 | } |
110 | |
111 | bool is_block_processed = false; |
112 | BlockInputStreams from; |
113 | BlocksPtrs mergeable_blocks; |
114 | BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>(); |
115 | |
116 | { |
117 | std::lock_guard lock(live_view.mutex); |
118 | |
119 | mergeable_blocks = live_view.getMergeableBlocks(); |
120 | if (!mergeable_blocks || mergeable_blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh) |
121 | { |
122 | mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>(); |
123 | BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>(); |
124 | InterpreterSelectQuery interpreter(live_view.getInnerQuery(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); |
125 | auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>( |
126 | interpreter.execute().in); |
127 | while (Block this_block = view_mergeable_stream->read()) |
128 | base_mergeable_blocks->push_back(this_block); |
129 | mergeable_blocks->push_back(base_mergeable_blocks); |
130 | live_view.setMergeableBlocks(mergeable_blocks); |
131 | |
132 | /// Create from streams |
133 | for (auto & blocks_ : *mergeable_blocks) |
134 | { |
135 | if (blocks_->empty()) |
136 | continue; |
137 | auto sample_block = blocks_->front().cloneEmpty(); |
138 | BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block); |
139 | from.push_back(std::move(stream)); |
140 | } |
141 | |
142 | is_block_processed = true; |
143 | } |
144 | } |
145 | |
146 | if (!is_block_processed) |
147 | { |
148 | auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName()); |
149 | BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)}; |
150 | auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(streams), QueryProcessingStage::FetchColumns); |
151 | InterpreterSelectQuery select_block(live_view.getInnerQuery(), |
152 | context, proxy_storage, |
153 | QueryProcessingStage::WithMergeableState); |
154 | auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>( |
155 | select_block.execute().in); |
156 | while (Block this_block = data_mergeable_stream->read()) |
157 | new_mergeable_blocks->push_back(this_block); |
158 | |
159 | if (new_mergeable_blocks->empty()) |
160 | return; |
161 | |
162 | { |
163 | std::lock_guard lock(live_view.mutex); |
164 | |
165 | mergeable_blocks = live_view.getMergeableBlocks(); |
166 | mergeable_blocks->push_back(new_mergeable_blocks); |
167 | |
168 | /// Create from streams |
169 | for (auto & blocks_ : *mergeable_blocks) |
170 | { |
171 | if (blocks_->empty()) |
172 | continue; |
173 | auto sample_block = blocks_->front().cloneEmpty(); |
174 | BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block); |
175 | from.push_back(std::move(stream)); |
176 | } |
177 | } |
178 | } |
179 | |
180 | auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName()); |
181 | auto proxy_storage = std::make_shared<ProxyStorage>(parent_storage, std::move(from), QueryProcessingStage::WithMergeableState); |
182 | InterpreterSelectQuery select(live_view.getInnerQuery(), context, proxy_storage, QueryProcessingStage::Complete); |
183 | BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in); |
184 | |
185 | /// Squashing is needed here because the view query can generate a lot of blocks |
186 | /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY |
187 | /// and two-level aggregation is triggered). |
188 | data = std::make_shared<SquashingBlockInputStream>( |
189 | data, context.getGlobalContext().getSettingsRef().min_insert_block_size_rows, context.getGlobalContext().getSettingsRef().min_insert_block_size_bytes); |
190 | |
191 | copyData(*data, *output); |
192 | } |
193 | |
194 | |
195 | StorageLiveView::StorageLiveView( |
196 | const String & table_name_, |
197 | const String & database_name_, |
198 | Context & local_context, |
199 | const ASTCreateQuery & query, |
200 | const ColumnsDescription & columns_) |
201 | : table_name(table_name_), |
202 | database_name(database_name_), global_context(local_context.getGlobalContext()) |
203 | { |
204 | setColumns(columns_); |
205 | |
206 | if (!query.select) |
207 | throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); |
208 | |
209 | /// Default value, if only table name exist in the query |
210 | select_database_name = local_context.getCurrentDatabase(); |
211 | if (query.select->list_of_selects->children.size() != 1) |
212 | throw Exception("UNION is not supported for LIVE VIEW" , ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); |
213 | |
214 | inner_query = query.select->list_of_selects->children.at(0); |
215 | |
216 | ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*inner_query); |
217 | extractDependentTable(select_query, select_database_name, select_table_name); |
218 | |
219 | /// If the table is not specified - use the table `system.one` |
220 | if (select_table_name.empty()) |
221 | { |
222 | select_database_name = "system" ; |
223 | select_table_name = "one" ; |
224 | } |
225 | |
226 | global_context.addDependency( |
227 | DatabaseAndTableName(select_database_name, select_table_name), |
228 | DatabaseAndTableName(database_name, table_name)); |
229 | |
230 | is_temporary = query.temporary; |
231 | temporary_live_view_timeout = local_context.getSettingsRef().temporary_live_view_timeout.totalSeconds(); |
232 | |
233 | blocks_ptr = std::make_shared<BlocksPtr>(); |
234 | blocks_metadata_ptr = std::make_shared<BlocksMetadataPtr>(); |
235 | active_ptr = std::make_shared<bool>(true); |
236 | } |
237 | |
238 | NameAndTypePair StorageLiveView::getColumn(const String & column_name) const |
239 | { |
240 | if (column_name == "_version" ) |
241 | return NameAndTypePair("_version" , std::make_shared<DataTypeUInt64>()); |
242 | |
243 | return IStorage::getColumn(column_name); |
244 | } |
245 | |
246 | bool StorageLiveView::hasColumn(const String & column_name) const |
247 | { |
248 | if (column_name == "_version" ) |
249 | return true; |
250 | |
251 | return IStorage::hasColumn(column_name); |
252 | } |
253 | |
254 | Block StorageLiveView::() const |
255 | { |
256 | std::lock_guard lock(sample_block_lock); |
257 | |
258 | if (!sample_block) |
259 | { |
260 | auto storage = global_context.getTable(select_database_name, select_table_name); |
261 | sample_block = InterpreterSelectQuery(inner_query, global_context, storage, |
262 | SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock(); |
263 | sample_block.insert({DataTypeUInt64().createColumnConst( |
264 | sample_block.rows(), 0)->convertToFullColumnIfConst(), |
265 | std::make_shared<DataTypeUInt64>(), |
266 | "_version" }); |
267 | /// convert all columns to full columns |
268 | /// in case some of them are constant |
269 | for (size_t i = 0; i < sample_block.columns(); ++i) |
270 | { |
271 | sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst(); |
272 | } |
273 | } |
274 | |
275 | return sample_block; |
276 | } |
277 | |
278 | bool StorageLiveView::getNewBlocks() |
279 | { |
280 | SipHash hash; |
281 | UInt128 key; |
282 | BlocksPtr new_blocks = std::make_shared<Blocks>(); |
283 | BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>(); |
284 | BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>(); |
285 | |
286 | InterpreterSelectQuery interpreter(inner_query->clone(), global_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); |
287 | auto mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in); |
288 | |
289 | while (Block block = mergeable_stream->read()) |
290 | new_mergeable_blocks->push_back(block); |
291 | |
292 | mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>(); |
293 | mergeable_blocks->push_back(new_mergeable_blocks); |
294 | BlockInputStreamPtr from = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(new_mergeable_blocks), mergeable_stream->getHeader()); |
295 | auto proxy_storage = ProxyStorage::createProxyStorage(global_context.getTable(select_database_name, select_table_name), {from}, QueryProcessingStage::WithMergeableState); |
296 | InterpreterSelectQuery select(inner_query->clone(), global_context, proxy_storage, SelectQueryOptions(QueryProcessingStage::Complete)); |
297 | BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in); |
298 | |
299 | /// Squashing is needed here because the view query can generate a lot of blocks |
300 | /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY |
301 | /// and two-level aggregation is triggered). |
302 | data = std::make_shared<SquashingBlockInputStream>( |
303 | data, global_context.getSettingsRef().min_insert_block_size_rows, global_context.getSettingsRef().min_insert_block_size_bytes); |
304 | |
305 | while (Block block = data->read()) |
306 | { |
307 | /// calculate hash before virtual column is added |
308 | block.updateHash(hash); |
309 | /// add result version meta column |
310 | block.insert({DataTypeUInt64().createColumnConst( |
311 | block.rows(), getBlocksVersion() + 1)->convertToFullColumnIfConst(), |
312 | std::make_shared<DataTypeUInt64>(), |
313 | "_version" }); |
314 | new_blocks->push_back(block); |
315 | } |
316 | |
317 | hash.get128(key.low, key.high); |
318 | |
319 | /// Update blocks only if hash keys do not match |
320 | /// NOTE: hash could be different for the same result |
321 | /// if blocks are not in the same order |
322 | bool updated = false; |
323 | { |
324 | if (getBlocksHashKey() != key.toHexString()) |
325 | { |
326 | if (new_blocks->empty()) |
327 | { |
328 | new_blocks->push_back(getHeader()); |
329 | } |
330 | new_blocks_metadata->hash = key.toHexString(); |
331 | new_blocks_metadata->version = getBlocksVersion() + 1; |
332 | (*blocks_ptr) = new_blocks; |
333 | (*blocks_metadata_ptr) = new_blocks_metadata; |
334 | updated = true; |
335 | } |
336 | } |
337 | return updated; |
338 | } |
339 | |
340 | void StorageLiveView::checkTableCanBeDropped() const |
341 | { |
342 | Dependencies dependencies = global_context.getDependencies(database_name, table_name); |
343 | if (!dependencies.empty()) |
344 | { |
345 | DatabaseAndTableName database_and_table_name = dependencies.front(); |
346 | throw Exception("Table has dependency " + database_and_table_name.first + "." + database_and_table_name.second, ErrorCodes::TABLE_WAS_NOT_DROPPED); |
347 | } |
348 | } |
349 | |
350 | void StorageLiveView::noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout) |
351 | { |
352 | bool drop_table = false; |
353 | |
354 | if (storage->shutdown_called) |
355 | return; |
356 | |
357 | { |
358 | while (1) |
359 | { |
360 | std::unique_lock lock(storage->no_users_thread_wakeup_mutex); |
361 | if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; })) |
362 | { |
363 | storage->no_users_thread_wakeup = false; |
364 | if (storage->shutdown_called) |
365 | return; |
366 | if (storage->hasUsers()) |
367 | return; |
368 | if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty()) |
369 | continue; |
370 | drop_table = true; |
371 | } |
372 | break; |
373 | } |
374 | } |
375 | |
376 | if (drop_table) |
377 | { |
378 | if (storage->global_context.tryGetTable(storage->database_name, storage->table_name)) |
379 | { |
380 | try |
381 | { |
382 | /// We create and execute `drop` query for this table |
383 | auto drop_query = std::make_shared<ASTDropQuery>(); |
384 | drop_query->database = storage->database_name; |
385 | drop_query->table = storage->table_name; |
386 | drop_query->kind = ASTDropQuery::Kind::Drop; |
387 | ASTPtr ast_drop_query = drop_query; |
388 | InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context); |
389 | drop_interpreter.execute(); |
390 | } |
391 | catch (...) |
392 | { |
393 | } |
394 | } |
395 | } |
396 | } |
397 | |
398 | void StorageLiveView::startNoUsersThread(const UInt64 & timeout) |
399 | { |
400 | bool expected = false; |
401 | if (!start_no_users_thread_called.compare_exchange_strong(expected, true)) |
402 | return; |
403 | |
404 | if (is_temporary) |
405 | { |
406 | std::lock_guard no_users_thread_lock(no_users_thread_mutex); |
407 | |
408 | if (shutdown_called) |
409 | return; |
410 | |
411 | if (no_users_thread.joinable()) |
412 | { |
413 | { |
414 | std::lock_guard lock(no_users_thread_wakeup_mutex); |
415 | no_users_thread_wakeup = true; |
416 | no_users_thread_condition.notify_one(); |
417 | } |
418 | no_users_thread.join(); |
419 | } |
420 | { |
421 | std::lock_guard lock(no_users_thread_wakeup_mutex); |
422 | no_users_thread_wakeup = false; |
423 | } |
424 | if (!is_dropped) |
425 | no_users_thread = std::thread(&StorageLiveView::noUsersThread, |
426 | std::static_pointer_cast<StorageLiveView>(shared_from_this()), timeout); |
427 | } |
428 | |
429 | start_no_users_thread_called = false; |
430 | } |
431 | |
432 | void StorageLiveView::startup() |
433 | { |
434 | startNoUsersThread(temporary_live_view_timeout); |
435 | } |
436 | |
437 | void StorageLiveView::shutdown() |
438 | { |
439 | bool expected = false; |
440 | if (!shutdown_called.compare_exchange_strong(expected, true)) |
441 | return; |
442 | |
443 | { |
444 | std::lock_guard no_users_thread_lock(no_users_thread_mutex); |
445 | if (no_users_thread.joinable()) |
446 | { |
447 | { |
448 | std::lock_guard lock(no_users_thread_wakeup_mutex); |
449 | no_users_thread_wakeup = true; |
450 | no_users_thread_condition.notify_one(); |
451 | } |
452 | } |
453 | } |
454 | } |
455 | |
456 | StorageLiveView::~StorageLiveView() |
457 | { |
458 | shutdown(); |
459 | |
460 | { |
461 | std::lock_guard lock(no_users_thread_mutex); |
462 | if (no_users_thread.joinable()) |
463 | no_users_thread.detach(); |
464 | } |
465 | } |
466 | |
467 | void StorageLiveView::drop(TableStructureWriteLockHolder &) |
468 | { |
469 | global_context.removeDependency( |
470 | DatabaseAndTableName(select_database_name, select_table_name), |
471 | DatabaseAndTableName(database_name, table_name)); |
472 | |
473 | std::lock_guard lock(mutex); |
474 | is_dropped = true; |
475 | condition.notify_all(); |
476 | } |
477 | |
478 | void StorageLiveView::refresh(const Context & context) |
479 | { |
480 | auto alter_lock = lockAlterIntention(context.getCurrentQueryId()); |
481 | { |
482 | std::lock_guard lock(mutex); |
483 | if (getNewBlocks()) |
484 | condition.notify_all(); |
485 | } |
486 | } |
487 | |
488 | BlockInputStreams StorageLiveView::read( |
489 | const Names & /*column_names*/, |
490 | const SelectQueryInfo & /*query_info*/, |
491 | const Context & /*context*/, |
492 | QueryProcessingStage::Enum /*processed_stage*/, |
493 | const size_t /*max_block_size*/, |
494 | const unsigned /*num_streams*/) |
495 | { |
496 | std::shared_ptr<BlocksBlockInputStream> stream; |
497 | { |
498 | std::lock_guard lock(mutex); |
499 | if (!(*blocks_ptr)) |
500 | { |
501 | if (getNewBlocks()) |
502 | condition.notify_all(); |
503 | } |
504 | stream = std::make_shared<BlocksBlockInputStream>(blocks_ptr, getHeader()); |
505 | } |
506 | return { stream }; |
507 | } |
508 | |
509 | BlockInputStreams StorageLiveView::watch( |
510 | const Names & /*column_names*/, |
511 | const SelectQueryInfo & query_info, |
512 | const Context & context, |
513 | QueryProcessingStage::Enum & processed_stage, |
514 | size_t /*max_block_size*/, |
515 | const unsigned /*num_streams*/) |
516 | { |
517 | ASTWatchQuery & query = typeid_cast<ASTWatchQuery &>(*query_info.query); |
518 | |
519 | bool has_limit = false; |
520 | UInt64 limit = 0; |
521 | |
522 | if (query.limit_length) |
523 | { |
524 | has_limit = true; |
525 | limit = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*query.limit_length).value); |
526 | } |
527 | |
528 | if (query.is_watch_events) |
529 | { |
530 | auto reader = std::make_shared<LiveViewEventsBlockInputStream>( |
531 | std::static_pointer_cast<StorageLiveView>(shared_from_this()), |
532 | blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, |
533 | context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(), |
534 | context.getSettingsRef().temporary_live_view_timeout.totalSeconds()); |
535 | |
536 | { |
537 | std::lock_guard no_users_thread_lock(no_users_thread_mutex); |
538 | if (no_users_thread.joinable()) |
539 | { |
540 | std::lock_guard lock(no_users_thread_wakeup_mutex); |
541 | no_users_thread_wakeup = true; |
542 | no_users_thread_condition.notify_one(); |
543 | } |
544 | } |
545 | |
546 | { |
547 | std::lock_guard lock(mutex); |
548 | if (!(*blocks_ptr)) |
549 | { |
550 | if (getNewBlocks()) |
551 | condition.notify_all(); |
552 | } |
553 | } |
554 | |
555 | processed_stage = QueryProcessingStage::Complete; |
556 | |
557 | return { reader }; |
558 | } |
559 | else |
560 | { |
561 | auto reader = std::make_shared<LiveViewBlockInputStream>( |
562 | std::static_pointer_cast<StorageLiveView>(shared_from_this()), |
563 | blocks_ptr, blocks_metadata_ptr, active_ptr, has_limit, limit, |
564 | context.getSettingsRef().live_view_heartbeat_interval.totalSeconds(), |
565 | context.getSettingsRef().temporary_live_view_timeout.totalSeconds()); |
566 | |
567 | { |
568 | std::lock_guard no_users_thread_lock(no_users_thread_mutex); |
569 | if (no_users_thread.joinable()) |
570 | { |
571 | std::lock_guard lock(no_users_thread_wakeup_mutex); |
572 | no_users_thread_wakeup = true; |
573 | no_users_thread_condition.notify_one(); |
574 | } |
575 | } |
576 | |
577 | { |
578 | std::lock_guard lock(mutex); |
579 | if (!(*blocks_ptr)) |
580 | { |
581 | if (getNewBlocks()) |
582 | condition.notify_all(); |
583 | } |
584 | } |
585 | |
586 | processed_stage = QueryProcessingStage::Complete; |
587 | |
588 | return { reader }; |
589 | } |
590 | } |
591 | |
592 | void registerStorageLiveView(StorageFactory & factory) |
593 | { |
594 | factory.registerStorage("LiveView" , [](const StorageFactory::Arguments & args) |
595 | { |
596 | if (!args.attach && !args.local_context.getSettingsRef().allow_experimental_live_view) |
597 | throw Exception("Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')" , ErrorCodes::SUPPORT_IS_DISABLED); |
598 | |
599 | return StorageLiveView::create(args.table_name, args.database_name, args.local_context, args.query, args.columns); |
600 | }); |
601 | } |
602 | |
603 | } |
604 | |