1 | /* Copyright (c) 2018 BlackBerry Limited |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | http://www.apache.org/licenses/LICENSE-2.0 |
7 | Unless required by applicable law or agreed to in writing, software |
8 | distributed under the License is distributed on an "AS IS" BASIS, |
9 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
10 | See the License for the specific language governing permissions and |
11 | limitations under the License. */ |
12 | #include <Core/Settings.h> |
13 | #include <Common/typeid_cast.h> |
14 | #include <Parsers/ASTWatchQuery.h> |
15 | #include <Interpreters/InterpreterWatchQuery.h> |
16 | #include <DataStreams/IBlockInputStream.h> |
17 | #include <DataStreams/OneBlockInputStream.h> |
18 | |
19 | |
20 | namespace DB |
21 | { |
22 | |
/// Error codes referenced by this translation unit; they are only declared here
/// (extern), the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int SUPPORT_IS_DISABLED;
    extern const int TOO_MANY_COLUMNS;
    extern const int UNKNOWN_STORAGE;
    extern const int UNKNOWN_TABLE;
}
30 | |
31 | BlockInputStreamPtr InterpreterWatchQuery::executeImpl() |
32 | { |
33 | return std::make_shared<OneBlockInputStream>(Block()); |
34 | } |
35 | |
36 | BlockIO InterpreterWatchQuery::execute() |
37 | { |
38 | if (!context.getSettingsRef().allow_experimental_live_view) |
39 | throw Exception("Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')" , ErrorCodes::SUPPORT_IS_DISABLED); |
40 | |
41 | BlockIO res; |
42 | const ASTWatchQuery & query = typeid_cast<const ASTWatchQuery &>(*query_ptr); |
43 | String database; |
44 | String table; |
45 | /// Get database |
46 | if (!query.database.empty()) |
47 | database = query.database; |
48 | else |
49 | database = context.getCurrentDatabase(); |
50 | |
51 | /// Get table |
52 | table = query.table; |
53 | |
54 | /// Get storage |
55 | storage = context.tryGetTable(database, table); |
56 | |
57 | if (!storage) |
58 | throw Exception("Table " + backQuoteIfNeed(database) + "." + |
59 | backQuoteIfNeed(table) + " doesn't exist." , |
60 | ErrorCodes::UNKNOWN_TABLE); |
61 | |
62 | /// List of columns to read to execute the query. |
63 | Names required_columns = storage->getColumns().getNamesOfPhysical(); |
64 | |
65 | /// Get context settings for this query |
66 | const Settings & settings = context.getSettingsRef(); |
67 | |
68 | /// Limitation on the number of columns to read. |
69 | if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read) |
70 | throw Exception("Limit for number of columns to read exceeded. " |
71 | "Requested: " + std::to_string(required_columns.size()) |
72 | + ", maximum: " + settings.max_columns_to_read.toString(), |
73 | ErrorCodes::TOO_MANY_COLUMNS); |
74 | |
75 | size_t max_block_size = settings.max_block_size; |
76 | size_t max_streams = 1; |
77 | |
78 | /// Define query info |
79 | SelectQueryInfo query_info; |
80 | query_info.query = query_ptr; |
81 | |
82 | /// From stage |
83 | QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; |
84 | QueryProcessingStage::Enum to_stage = QueryProcessingStage::Complete; |
85 | |
86 | /// Watch storage |
87 | streams = storage->watch(required_columns, query_info, context, from_stage, max_block_size, max_streams); |
88 | |
89 | /// Constraints on the result, the quota on the result, and also callback for progress. |
90 | if (IBlockInputStream * stream = dynamic_cast<IBlockInputStream *>(streams[0].get())) |
91 | { |
92 | /// Constraints apply only to the final result. |
93 | if (to_stage == QueryProcessingStage::Complete) |
94 | { |
95 | IBlockInputStream::LocalLimits limits; |
96 | limits.mode = IBlockInputStream::LIMITS_CURRENT; |
97 | limits.size_limits.max_rows = settings.max_result_rows; |
98 | limits.size_limits.max_bytes = settings.max_result_bytes; |
99 | limits.size_limits.overflow_mode = settings.result_overflow_mode; |
100 | |
101 | stream->setLimits(limits); |
102 | stream->setQuota(context.getQuota()); |
103 | } |
104 | } |
105 | |
106 | res.in = streams[0]; |
107 | |
108 | return res; |
109 | } |
110 | |
111 | |
112 | } |
113 | |