1/* Copyright (c) 2018 BlackBerry Limited
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6http://www.apache.org/licenses/LICENSE-2.0
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License. */
12#include <Core/Settings.h>
13#include <Common/typeid_cast.h>
14#include <Parsers/ASTWatchQuery.h>
15#include <Interpreters/InterpreterWatchQuery.h>
16#include <DataStreams/IBlockInputStream.h>
17#include <DataStreams/OneBlockInputStream.h>
18
19
20namespace DB
21{
22
/// Error codes used when throwing Exception from this file.
/// These are only declarations (extern); the definitions are not in this file.
namespace ErrorCodes
{
 /// NOTE(review): not referenced anywhere in the visible part of this file - confirm whether it is still needed.
 extern const int UNKNOWN_STORAGE;
 /// Thrown by execute() when the table to watch cannot be found.
 extern const int UNKNOWN_TABLE;
 /// Thrown by execute() when the number of columns to read exceeds the max_columns_to_read setting.
 extern const int TOO_MANY_COLUMNS;
 /// Thrown by execute() when the allow_experimental_live_view setting is not enabled.
 extern const int SUPPORT_IS_DISABLED;
}
30
31BlockInputStreamPtr InterpreterWatchQuery::executeImpl()
32{
33 return std::make_shared<OneBlockInputStream>(Block());
34}
35
36BlockIO InterpreterWatchQuery::execute()
37{
38 if (!context.getSettingsRef().allow_experimental_live_view)
39 throw Exception("Experimental LIVE VIEW feature is not enabled (the setting 'allow_experimental_live_view')", ErrorCodes::SUPPORT_IS_DISABLED);
40
41 BlockIO res;
42 const ASTWatchQuery & query = typeid_cast<const ASTWatchQuery &>(*query_ptr);
43 String database;
44 String table;
45 /// Get database
46 if (!query.database.empty())
47 database = query.database;
48 else
49 database = context.getCurrentDatabase();
50
51 /// Get table
52 table = query.table;
53
54 /// Get storage
55 storage = context.tryGetTable(database, table);
56
57 if (!storage)
58 throw Exception("Table " + backQuoteIfNeed(database) + "." +
59 backQuoteIfNeed(table) + " doesn't exist.",
60 ErrorCodes::UNKNOWN_TABLE);
61
62 /// List of columns to read to execute the query.
63 Names required_columns = storage->getColumns().getNamesOfPhysical();
64
65 /// Get context settings for this query
66 const Settings & settings = context.getSettingsRef();
67
68 /// Limitation on the number of columns to read.
69 if (settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
70 throw Exception("Limit for number of columns to read exceeded. "
71 "Requested: " + std::to_string(required_columns.size())
72 + ", maximum: " + settings.max_columns_to_read.toString(),
73 ErrorCodes::TOO_MANY_COLUMNS);
74
75 size_t max_block_size = settings.max_block_size;
76 size_t max_streams = 1;
77
78 /// Define query info
79 SelectQueryInfo query_info;
80 query_info.query = query_ptr;
81
82 /// From stage
83 QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
84 QueryProcessingStage::Enum to_stage = QueryProcessingStage::Complete;
85
86 /// Watch storage
87 streams = storage->watch(required_columns, query_info, context, from_stage, max_block_size, max_streams);
88
89 /// Constraints on the result, the quota on the result, and also callback for progress.
90 if (IBlockInputStream * stream = dynamic_cast<IBlockInputStream *>(streams[0].get()))
91 {
92 /// Constraints apply only to the final result.
93 if (to_stage == QueryProcessingStage::Complete)
94 {
95 IBlockInputStream::LocalLimits limits;
96 limits.mode = IBlockInputStream::LIMITS_CURRENT;
97 limits.size_limits.max_rows = settings.max_result_rows;
98 limits.size_limits.max_bytes = settings.max_result_bytes;
99 limits.size_limits.overflow_mode = settings.result_overflow_mode;
100
101 stream->setLimits(limits);
102 stream->setQuota(context.getQuota());
103 }
104 }
105
106 res.in = streams[0];
107
108 return res;
109}
110
111
112}
113