| 1 | /* Copyright (c) 2018 BlackBerry Limited |
| 2 | |
| 3 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | you may not use this file except in compliance with the License. |
| 5 | You may obtain a copy of the License at |
| 6 | http://www.apache.org/licenses/LICENSE-2.0 |
| 7 | Unless required by applicable law or agreed to in writing, software |
| 8 | distributed under the License is distributed on an "AS IS" BASIS, |
| 9 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 10 | See the License for the specific language governing permissions and |
| 11 | limitations under the License. */ |
| 12 | #include <Core/Settings.h> |
| 13 | #include <Common/typeid_cast.h> |
| 14 | #include <Parsers/ASTWatchQuery.h> |
| 15 | #include <Interpreters/InterpreterWatchQuery.h> |
| 16 | #include <DataStreams/IBlockInputStream.h> |
| 17 | #include <DataStreams/OneBlockInputStream.h> |
| 18 | |
| 19 | |
| 20 | namespace DB |
| 21 | { |
| 22 | |
/// Error codes referenced by this translation unit; only declared here, the values are defined elsewhere.
namespace ErrorCodes
{
    extern const int SUPPORT_IS_DISABLED;
    extern const int TOO_MANY_COLUMNS;
    extern const int UNKNOWN_STORAGE;
    extern const int UNKNOWN_TABLE;
}
| 30 | |
| 31 | BlockInputStreamPtr InterpreterWatchQuery::executeImpl() |
| 32 | { |
| 33 | return std::make_shared<OneBlockInputStream>(Block()); |
| 34 | } |
| 35 | |
| 36 | BlockIO InterpreterWatchQuery::execute() |
| 37 | { |
| 38 | if (!context.getSettingsRef().allow_experimental_live_view) |
| 39 | throw Exception("Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')" , ErrorCodes::SUPPORT_IS_DISABLED); |
| 40 | |
| 41 | BlockIO res; |
| 42 | const ASTWatchQuery & query = typeid_cast<const ASTWatchQuery &>(*query_ptr); |
| 43 | String database; |
| 44 | String table; |
| 45 | /// Get database |
| 46 | if (!query.database.empty()) |
| 47 | database = query.database; |
| 48 | else |
| 49 | database = context.getCurrentDatabase(); |
| 50 | |
| 51 | /// Get table |
| 52 | table = query.table; |
| 53 | |
| 54 | /// Get storage |
| 55 | storage = context.tryGetTable(database, table); |
| 56 | |
| 57 | if (!storage) |
| 58 | throw Exception("Table " + backQuoteIfNeed(database) + "." + |
| 59 | backQuoteIfNeed(table) + " doesn't exist." , |
| 60 | ErrorCodes::UNKNOWN_TABLE); |
| 61 | |
| 62 | /// List of columns to read to execute the query. |
| 63 | Names required_columns = storage->getColumns().getNamesOfPhysical(); |
| 64 | |
| 65 | /// Get context settings for this query |
| 66 | const Settings & settings = context.getSettingsRef(); |
| 67 | |
| 68 | /// Limitation on the number of columns to read. |
| 69 | if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read) |
| 70 | throw Exception("Limit for number of columns to read exceeded. " |
| 71 | "Requested: " + std::to_string(required_columns.size()) |
| 72 | + ", maximum: " + settings.max_columns_to_read.toString(), |
| 73 | ErrorCodes::TOO_MANY_COLUMNS); |
| 74 | |
| 75 | size_t max_block_size = settings.max_block_size; |
| 76 | size_t max_streams = 1; |
| 77 | |
| 78 | /// Define query info |
| 79 | SelectQueryInfo query_info; |
| 80 | query_info.query = query_ptr; |
| 81 | |
| 82 | /// From stage |
| 83 | QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; |
| 84 | QueryProcessingStage::Enum to_stage = QueryProcessingStage::Complete; |
| 85 | |
| 86 | /// Watch storage |
| 87 | streams = storage->watch(required_columns, query_info, context, from_stage, max_block_size, max_streams); |
| 88 | |
| 89 | /// Constraints on the result, the quota on the result, and also callback for progress. |
| 90 | if (IBlockInputStream * stream = dynamic_cast<IBlockInputStream *>(streams[0].get())) |
| 91 | { |
| 92 | /// Constraints apply only to the final result. |
| 93 | if (to_stage == QueryProcessingStage::Complete) |
| 94 | { |
| 95 | IBlockInputStream::LocalLimits limits; |
| 96 | limits.mode = IBlockInputStream::LIMITS_CURRENT; |
| 97 | limits.size_limits.max_rows = settings.max_result_rows; |
| 98 | limits.size_limits.max_bytes = settings.max_result_bytes; |
| 99 | limits.size_limits.overflow_mode = settings.result_overflow_mode; |
| 100 | |
| 101 | stream->setLimits(limits); |
| 102 | stream->setQuota(context.getQuota()); |
| 103 | } |
| 104 | } |
| 105 | |
| 106 | res.in = streams[0]; |
| 107 | |
| 108 | return res; |
| 109 | } |
| 110 | |
| 111 | |
| 112 | } |
| 113 | |