1 | #include <Storages/StorageMaterializedView.h> |
2 | |
3 | #include <Parsers/ASTSelectQuery.h> |
4 | #include <Parsers/ASTSelectWithUnionQuery.h> |
5 | #include <Parsers/ASTCreateQuery.h> |
6 | #include <Parsers/ASTDropQuery.h> |
7 | |
8 | #include <Interpreters/Context.h> |
9 | #include <Interpreters/InterpreterCreateQuery.h> |
10 | #include <Interpreters/InterpreterDropQuery.h> |
11 | #include <Interpreters/InterpreterRenameQuery.h> |
12 | #include <Interpreters/getTableExpressions.h> |
13 | #include <Interpreters/AddDefaultDatabaseVisitor.h> |
14 | #include <DataStreams/IBlockInputStream.h> |
15 | #include <DataStreams/IBlockOutputStream.h> |
16 | |
17 | #include <Storages/StorageFactory.h> |
18 | #include <Storages/ReadInOrderOptimizer.h> |
19 | |
20 | #include <Common/typeid_cast.h> |
21 | |
22 | |
23 | namespace DB |
24 | { |
25 | |
/// Error codes thrown from this file. These are declarations only (extern); the values are defined elsewhere.
namespace ErrorCodes
{
    extern const int INCORRECT_QUERY;
    extern const int LOGICAL_ERROR;
    extern const int QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW;
}
32 | |
33 | static inline String generateInnerTableName(const String & table_name) |
34 | { |
35 | return ".inner." + table_name; |
36 | } |
37 | |
38 | static void (ASTSelectQuery & query, String & select_database_name, String & select_table_name) |
39 | { |
40 | auto db_and_table = getDatabaseAndTable(query, 0); |
41 | ASTPtr subquery = extractTableExpression(query, 0); |
42 | |
43 | if (!db_and_table && !subquery) |
44 | return; |
45 | |
46 | if (db_and_table) |
47 | { |
48 | select_table_name = db_and_table->table; |
49 | |
50 | if (db_and_table->database.empty()) |
51 | { |
52 | db_and_table->database = select_database_name; |
53 | AddDefaultDatabaseVisitor visitor(select_database_name); |
54 | visitor.visit(query); |
55 | } |
56 | else |
57 | select_database_name = db_and_table->database; |
58 | } |
59 | else if (auto * ast_select = subquery->as<ASTSelectWithUnionQuery>()) |
60 | { |
61 | if (ast_select->list_of_selects->children.size() != 1) |
62 | throw Exception("UNION is not supported for MATERIALIZED VIEW" , ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW); |
63 | |
64 | auto & inner_query = ast_select->list_of_selects->children.at(0); |
65 | |
66 | extractDependentTable(inner_query->as<ASTSelectQuery &>(), select_database_name, select_table_name); |
67 | } |
68 | else |
69 | throw Exception("Logical error while creating StorageMaterializedView." |
70 | " Could not retrieve table name from select query." , |
71 | DB::ErrorCodes::LOGICAL_ERROR); |
72 | } |
73 | |
74 | |
75 | static void checkAllowedQueries(const ASTSelectQuery & query) |
76 | { |
77 | if (query.prewhere() || query.final() || query.sample_size()) |
78 | throw Exception("MATERIALIZED VIEW cannot have PREWHERE, SAMPLE or FINAL." , DB::ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW); |
79 | |
80 | ASTPtr subquery = extractTableExpression(query, 0); |
81 | if (!subquery) |
82 | return; |
83 | |
84 | if (const auto * ast_select = subquery->as<ASTSelectWithUnionQuery>()) |
85 | { |
86 | if (ast_select->list_of_selects->children.size() != 1) |
87 | throw Exception("UNION is not supported for MATERIALIZED VIEW" , ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW); |
88 | |
89 | const auto & inner_query = ast_select->list_of_selects->children.at(0); |
90 | |
91 | checkAllowedQueries(inner_query->as<ASTSelectQuery &>()); |
92 | } |
93 | } |
94 | |
95 | |
96 | StorageMaterializedView::StorageMaterializedView( |
97 | const String & table_name_, |
98 | const String & database_name_, |
99 | Context & local_context, |
100 | const ASTCreateQuery & query, |
101 | const ColumnsDescription & columns_, |
102 | bool attach_) |
103 | : table_name(table_name_), |
104 | database_name(database_name_), global_context(local_context.getGlobalContext()) |
105 | { |
106 | setColumns(columns_); |
107 | |
108 | if (!query.select) |
109 | throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); |
110 | |
111 | if (!query.storage && query.to_table.empty()) |
112 | throw Exception( |
113 | "You must specify where to save results of a MaterializedView query: either ENGINE or an existing table in a TO clause" , |
114 | ErrorCodes::INCORRECT_QUERY); |
115 | |
116 | /// Default value, if only table name exist in the query |
117 | select_database_name = local_context.getCurrentDatabase(); |
118 | if (query.select->list_of_selects->children.size() != 1) |
119 | throw Exception("UNION is not supported for MATERIALIZED VIEW" , ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW); |
120 | |
121 | inner_query = query.select->list_of_selects->children.at(0); |
122 | |
123 | auto & select_query = inner_query->as<ASTSelectQuery &>(); |
124 | extractDependentTable(select_query, select_database_name, select_table_name); |
125 | checkAllowedQueries(select_query); |
126 | |
127 | if (!select_table_name.empty()) |
128 | global_context.addDependency( |
129 | DatabaseAndTableName(select_database_name, select_table_name), |
130 | DatabaseAndTableName(database_name, table_name)); |
131 | |
132 | // If the destination table is not set, use inner table |
133 | if (!query.to_table.empty()) |
134 | { |
135 | target_database_name = query.to_database; |
136 | target_table_name = query.to_table; |
137 | } |
138 | else |
139 | { |
140 | target_database_name = database_name; |
141 | target_table_name = generateInnerTableName(table_name); |
142 | has_inner_table = true; |
143 | } |
144 | |
145 | /// If there is an ATTACH request, then the internal table must already be connected. |
146 | if (!attach_ && has_inner_table) |
147 | { |
148 | /// We will create a query to create an internal table. |
149 | auto manual_create_query = std::make_shared<ASTCreateQuery>(); |
150 | manual_create_query->database = target_database_name; |
151 | manual_create_query->table = target_table_name; |
152 | |
153 | auto new_columns_list = std::make_shared<ASTColumns>(); |
154 | new_columns_list->set(new_columns_list->columns, query.columns_list->columns->ptr()); |
155 | |
156 | manual_create_query->set(manual_create_query->columns_list, new_columns_list); |
157 | manual_create_query->set(manual_create_query->storage, query.storage->ptr()); |
158 | |
159 | /// Execute the query. |
160 | try |
161 | { |
162 | InterpreterCreateQuery create_interpreter(manual_create_query, local_context); |
163 | create_interpreter.setInternal(true); |
164 | create_interpreter.execute(); |
165 | } |
166 | catch (...) |
167 | { |
168 | /// In case of any error we should remove dependency to the view. |
169 | if (!select_table_name.empty()) |
170 | global_context.removeDependency( |
171 | DatabaseAndTableName(select_database_name, select_table_name), |
172 | DatabaseAndTableName(database_name, table_name)); |
173 | |
174 | throw; |
175 | } |
176 | } |
177 | } |
178 | |
179 | NameAndTypePair StorageMaterializedView::getColumn(const String & column_name) const |
180 | { |
181 | return getTargetTable()->getColumn(column_name); |
182 | } |
183 | |
184 | bool StorageMaterializedView::hasColumn(const String & column_name) const |
185 | { |
186 | return getTargetTable()->hasColumn(column_name); |
187 | } |
188 | |
189 | QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context) const |
190 | { |
191 | return getTargetTable()->getQueryProcessingStage(context); |
192 | } |
193 | |
194 | BlockInputStreams StorageMaterializedView::read( |
195 | const Names & column_names, |
196 | const SelectQueryInfo & query_info, |
197 | const Context & context, |
198 | QueryProcessingStage::Enum processed_stage, |
199 | const size_t max_block_size, |
200 | const unsigned num_streams) |
201 | { |
202 | auto storage = getTargetTable(); |
203 | auto lock = storage->lockStructureForShare(false, context.getCurrentQueryId()); |
204 | if (query_info.order_by_optimizer) |
205 | query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage); |
206 | |
207 | auto streams = storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams); |
208 | for (auto & stream : streams) |
209 | stream->addTableLock(lock); |
210 | return streams; |
211 | } |
212 | |
213 | BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const Context & context) |
214 | { |
215 | auto storage = getTargetTable(); |
216 | auto lock = storage->lockStructureForShare(true, context.getCurrentQueryId()); |
217 | auto stream = storage->write(query, context); |
218 | stream->addTableLock(lock); |
219 | return stream; |
220 | } |
221 | |
222 | |
223 | static void executeDropQuery(ASTDropQuery::Kind kind, Context & global_context, const String & target_database_name, const String & target_table_name) |
224 | { |
225 | if (global_context.tryGetTable(target_database_name, target_table_name)) |
226 | { |
227 | /// We create and execute `drop` query for internal table. |
228 | auto drop_query = std::make_shared<ASTDropQuery>(); |
229 | drop_query->database = target_database_name; |
230 | drop_query->table = target_table_name; |
231 | drop_query->kind = kind; |
232 | ASTPtr ast_drop_query = drop_query; |
233 | InterpreterDropQuery drop_interpreter(ast_drop_query, global_context); |
234 | drop_interpreter.execute(); |
235 | } |
236 | } |
237 | |
238 | |
239 | void StorageMaterializedView::drop(TableStructureWriteLockHolder &) |
240 | { |
241 | global_context.removeDependency( |
242 | DatabaseAndTableName(select_database_name, select_table_name), |
243 | DatabaseAndTableName(database_name, table_name)); |
244 | |
245 | if (has_inner_table && tryGetTargetTable()) |
246 | executeDropQuery(ASTDropQuery::Kind::Drop, global_context, target_database_name, target_table_name); |
247 | } |
248 | |
249 | void StorageMaterializedView::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) |
250 | { |
251 | if (has_inner_table) |
252 | executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, target_database_name, target_table_name); |
253 | } |
254 | |
255 | void StorageMaterializedView::checkStatementCanBeForwarded() const |
256 | { |
257 | if (!has_inner_table) |
258 | throw Exception( |
259 | "MATERIALIZED VIEW targets existing table " + target_database_name + "." + target_table_name + ". " |
260 | + "Execute the statement directly on it." , ErrorCodes::INCORRECT_QUERY); |
261 | } |
262 | |
263 | bool StorageMaterializedView::optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) |
264 | { |
265 | checkStatementCanBeForwarded(); |
266 | return getTargetTable()->optimize(query, partition, final, deduplicate, context); |
267 | } |
268 | |
269 | void StorageMaterializedView::alterPartition(const ASTPtr & query, const PartitionCommands &commands, const Context &context) |
270 | { |
271 | checkStatementCanBeForwarded(); |
272 | getTargetTable()->alterPartition(query, commands, context); |
273 | } |
274 | |
275 | void StorageMaterializedView::mutate(const MutationCommands & commands, const Context & context) |
276 | { |
277 | checkStatementCanBeForwarded(); |
278 | getTargetTable()->mutate(commands, context); |
279 | } |
280 | |
281 | static void executeRenameQuery(Context & global_context, const String & database_name, const String & table_original_name, const String & new_table_name) |
282 | { |
283 | if (global_context.tryGetTable(database_name, table_original_name)) |
284 | { |
285 | auto rename = std::make_shared<ASTRenameQuery>(); |
286 | |
287 | ASTRenameQuery::Table from; |
288 | from.database = database_name; |
289 | from.table = table_original_name; |
290 | |
291 | ASTRenameQuery::Table to; |
292 | to.database = database_name; |
293 | to.table = new_table_name; |
294 | |
295 | ASTRenameQuery::Element elem; |
296 | elem.from = from; |
297 | elem.to = to; |
298 | |
299 | rename->elements.emplace_back(elem); |
300 | |
301 | InterpreterRenameQuery(rename, global_context).execute(); |
302 | } |
303 | } |
304 | |
305 | |
306 | void StorageMaterializedView::rename( |
307 | const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) |
308 | { |
309 | if (has_inner_table && tryGetTargetTable()) |
310 | { |
311 | String new_target_table_name = generateInnerTableName(new_table_name); |
312 | executeRenameQuery(global_context, target_database_name, target_table_name, new_target_table_name); |
313 | target_table_name = new_target_table_name; |
314 | } |
315 | |
316 | auto lock = global_context.getLock(); |
317 | |
318 | global_context.removeDependencyUnsafe( |
319 | DatabaseAndTableName(select_database_name, select_table_name), |
320 | DatabaseAndTableName(database_name, table_name)); |
321 | |
322 | table_name = new_table_name; |
323 | database_name = new_database_name; |
324 | |
325 | global_context.addDependencyUnsafe( |
326 | DatabaseAndTableName(select_database_name, select_table_name), |
327 | DatabaseAndTableName(database_name, table_name)); |
328 | } |
329 | |
330 | void StorageMaterializedView::shutdown() |
331 | { |
332 | /// Make sure the dependency is removed after DETACH TABLE |
333 | global_context.removeDependency( |
334 | DatabaseAndTableName(select_database_name, select_table_name), |
335 | DatabaseAndTableName(database_name, table_name)); |
336 | } |
337 | |
338 | StoragePtr StorageMaterializedView::getTargetTable() const |
339 | { |
340 | return global_context.getTable(target_database_name, target_table_name); |
341 | } |
342 | |
343 | StoragePtr StorageMaterializedView::tryGetTargetTable() const |
344 | { |
345 | return global_context.tryGetTable(target_database_name, target_table_name); |
346 | } |
347 | |
348 | Strings StorageMaterializedView::getDataPaths() const |
349 | { |
350 | if (auto table = tryGetTargetTable()) |
351 | return table->getDataPaths(); |
352 | return {}; |
353 | } |
354 | |
355 | void StorageMaterializedView::checkTableCanBeDropped() const |
356 | { |
357 | /// Don't drop the target table if it was created manually via 'TO inner_table' statement |
358 | if (!has_inner_table) |
359 | return; |
360 | |
361 | auto target_table = tryGetTargetTable(); |
362 | if (!target_table) |
363 | return; |
364 | |
365 | target_table->checkTableCanBeDropped(); |
366 | } |
367 | |
368 | void StorageMaterializedView::checkPartitionCanBeDropped(const ASTPtr & partition) |
369 | { |
370 | /// Don't drop the partition in target table if it was created manually via 'TO inner_table' statement |
371 | if (!has_inner_table) |
372 | return; |
373 | |
374 | auto target_table = tryGetTargetTable(); |
375 | if (!target_table) |
376 | return; |
377 | |
378 | target_table->checkPartitionCanBeDropped(partition); |
379 | } |
380 | |
381 | ActionLock StorageMaterializedView::getActionLock(StorageActionBlockType type) |
382 | { |
383 | return has_inner_table ? getTargetTable()->getActionLock(type) : ActionLock{}; |
384 | } |
385 | |
386 | void registerStorageMaterializedView(StorageFactory & factory) |
387 | { |
388 | factory.registerStorage("MaterializedView" , [](const StorageFactory::Arguments & args) |
389 | { |
390 | /// Pass local_context here to convey setting for inner table |
391 | return StorageMaterializedView::create( |
392 | args.table_name, args.database_name, args.local_context, args.query, |
393 | args.columns, args.attach); |
394 | }); |
395 | } |
396 | |
397 | } |
398 | |