1 | #include <Columns/ColumnString.h> |
2 | #include <Columns/ColumnsNumber.h> |
3 | #include <DataTypes/DataTypeString.h> |
4 | #include <DataTypes/DataTypeDateTime.h> |
5 | #include <DataStreams/OneBlockInputStream.h> |
6 | #include <Storages/System/StorageSystemTables.h> |
7 | #include <Storages/VirtualColumnUtils.h> |
8 | #include <Databases/IDatabase.h> |
9 | #include <Interpreters/Context.h> |
10 | #include <Parsers/ASTCreateQuery.h> |
11 | #include <Parsers/queryToString.h> |
12 | #include <Common/typeid_cast.h> |
13 | #include <Common/StringUtils/StringUtils.h> |
14 | #include <DataTypes/DataTypesNumber.h> |
15 | #include <DataTypes/DataTypeArray.h> |
16 | |
17 | |
18 | namespace DB |
19 | { |
20 | |
/// Error codes referenced by this file. They are only declared here (`extern`);
/// the definitions live in another translation unit.
namespace ErrorCodes
{
    /// NOTE(review): not referenced in the visible part of this file - confirm it is still needed.
    extern const int CANNOT_GET_CREATE_TABLE_QUERY;
    /// Compared against the exception code when taking a table's structure lock fails.
    extern const int TABLE_IS_DROPPED;
}
26 | |
27 | |
28 | StorageSystemTables::StorageSystemTables(const std::string & name_) |
29 | : name(name_) |
30 | { |
31 | setColumns(ColumnsDescription( |
32 | { |
33 | {"database" , std::make_shared<DataTypeString>()}, |
34 | {"name" , std::make_shared<DataTypeString>()}, |
35 | {"engine" , std::make_shared<DataTypeString>()}, |
36 | {"is_temporary" , std::make_shared<DataTypeUInt8>()}, |
37 | {"data_paths" , std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, |
38 | {"metadata_path" , std::make_shared<DataTypeString>()}, |
39 | {"metadata_modification_time" , std::make_shared<DataTypeDateTime>()}, |
40 | {"dependencies_database" , std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, |
41 | {"dependencies_table" , std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, |
42 | {"create_table_query" , std::make_shared<DataTypeString>()}, |
43 | {"engine_full" , std::make_shared<DataTypeString>()}, |
44 | {"partition_key" , std::make_shared<DataTypeString>()}, |
45 | {"sorting_key" , std::make_shared<DataTypeString>()}, |
46 | {"primary_key" , std::make_shared<DataTypeString>()}, |
47 | {"sampling_key" , std::make_shared<DataTypeString>()}, |
48 | {"storage_policy" , std::make_shared<DataTypeString>()}, |
49 | })); |
50 | } |
51 | |
52 | |
53 | static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & context) |
54 | { |
55 | MutableColumnPtr column = ColumnString::create(); |
56 | for (const auto & db : context.getDatabases()) |
57 | column->insert(db.first); |
58 | |
59 | Block block { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database" ) }; |
60 | VirtualColumnUtils::filterBlockWithQuery(query, block, context); |
61 | return block.getByPosition(0).column; |
62 | } |
63 | |
64 | /// Avoid heavy operation on tables if we only queried columns that we can get without table object. |
65 | /// Otherwise it will require table initialization for Lazy database. |
66 | static bool needLockStructure(const DatabasePtr & database, const Block & ) |
67 | { |
68 | if (database->getEngineName() != "Lazy" ) |
69 | return true; |
70 | |
71 | static const std::set<std::string> columns_without_lock = { "database" , "name" , "metadata_modification_time" }; |
72 | for (const auto & column : header.getColumnsWithTypeAndName()) |
73 | { |
74 | if (columns_without_lock.find(column.name) == columns_without_lock.end()) |
75 | return true; |
76 | } |
77 | return false; |
78 | } |
79 | |
80 | class TablesBlockInputStream : public IBlockInputStream |
81 | { |
82 | public: |
83 | TablesBlockInputStream( |
84 | std::vector<UInt8> columns_mask_, |
85 | Block , |
86 | UInt64 max_block_size_, |
87 | ColumnPtr databases_, |
88 | const Context & context_) |
89 | : columns_mask(std::move(columns_mask_)) |
90 | , header(std::move(header_)) |
91 | , max_block_size(max_block_size_) |
92 | , databases(std::move(databases_)) |
93 | , context(context_) {} |
94 | |
95 | String getName() const override { return "Tables" ; } |
96 | Block () const override { return header; } |
97 | |
98 | protected: |
99 | Block readImpl() override |
100 | { |
101 | if (done) |
102 | return {}; |
103 | |
104 | Block res = header; |
105 | MutableColumns res_columns = header.cloneEmptyColumns(); |
106 | |
107 | size_t rows_count = 0; |
108 | while (rows_count < max_block_size) |
109 | { |
110 | if (tables_it && !tables_it->isValid()) |
111 | ++database_idx; |
112 | |
113 | while (database_idx < databases->size() && (!tables_it || !tables_it->isValid())) |
114 | { |
115 | database_name = databases->getDataAt(database_idx).toString(); |
116 | database = context.tryGetDatabase(database_name); |
117 | |
118 | if (!database || !context.hasDatabaseAccessRights(database_name)) |
119 | { |
120 | /// Database was deleted just now or the user has no access. |
121 | ++database_idx; |
122 | continue; |
123 | } |
124 | |
125 | break; |
126 | } |
127 | |
128 | /// This is for temporary tables. They are output in single block regardless to max_block_size. |
129 | if (database_idx >= databases->size()) |
130 | { |
131 | if (context.hasSessionContext()) |
132 | { |
133 | Tables external_tables = context.getSessionContext().getExternalTables(); |
134 | |
135 | for (auto table : external_tables) |
136 | { |
137 | size_t src_index = 0; |
138 | size_t res_index = 0; |
139 | |
140 | if (columns_mask[src_index++]) |
141 | res_columns[res_index++]->insertDefault(); |
142 | |
143 | if (columns_mask[src_index++]) |
144 | res_columns[res_index++]->insert(table.first); |
145 | |
146 | if (columns_mask[src_index++]) |
147 | res_columns[res_index++]->insert(table.second->getName()); |
148 | |
149 | if (columns_mask[src_index++]) |
150 | res_columns[res_index++]->insert(1u); |
151 | |
152 | if (columns_mask[src_index++]) |
153 | res_columns[res_index++]->insertDefault(); |
154 | |
155 | if (columns_mask[src_index++]) |
156 | res_columns[res_index++]->insertDefault(); |
157 | |
158 | if (columns_mask[src_index++]) |
159 | res_columns[res_index++]->insertDefault(); |
160 | |
161 | if (columns_mask[src_index++]) |
162 | res_columns[res_index++]->insertDefault(); |
163 | |
164 | if (columns_mask[src_index++]) |
165 | res_columns[res_index++]->insertDefault(); |
166 | |
167 | if (columns_mask[src_index++]) |
168 | res_columns[res_index++]->insertDefault(); |
169 | |
170 | if (columns_mask[src_index++]) |
171 | res_columns[res_index++]->insert(table.second->getName()); |
172 | |
173 | if (columns_mask[src_index++]) |
174 | res_columns[res_index++]->insertDefault(); |
175 | |
176 | if (columns_mask[src_index++]) |
177 | res_columns[res_index++]->insertDefault(); |
178 | |
179 | if (columns_mask[src_index++]) |
180 | res_columns[res_index++]->insertDefault(); |
181 | |
182 | if (columns_mask[src_index++]) |
183 | res_columns[res_index++]->insertDefault(); |
184 | |
185 | if (columns_mask[src_index++]) |
186 | res_columns[res_index++]->insertDefault(); |
187 | } |
188 | } |
189 | |
190 | res.setColumns(std::move(res_columns)); |
191 | done = true; |
192 | return res; |
193 | } |
194 | |
195 | if (!tables_it || !tables_it->isValid()) |
196 | tables_it = database->getTablesWithDictionaryTablesIterator(context); |
197 | |
198 | const bool need_lock_structure = needLockStructure(database, header); |
199 | |
200 | for (; rows_count < max_block_size && tables_it->isValid(); tables_it->next()) |
201 | { |
202 | auto table_name = tables_it->name(); |
203 | StoragePtr table = nullptr; |
204 | |
205 | TableStructureReadLockHolder lock; |
206 | |
207 | try |
208 | { |
209 | if (need_lock_structure) |
210 | { |
211 | if (!table) |
212 | table = tables_it->table(); |
213 | lock = table->lockStructureForShare(false, context.getCurrentQueryId()); |
214 | } |
215 | } |
216 | catch (const Exception & e) |
217 | { |
218 | if (e.code() == ErrorCodes::TABLE_IS_DROPPED) |
219 | continue; |
220 | throw; |
221 | } |
222 | |
223 | ++rows_count; |
224 | |
225 | size_t src_index = 0; |
226 | size_t res_index = 0; |
227 | |
228 | if (columns_mask[src_index++]) |
229 | res_columns[res_index++]->insert(database_name); |
230 | |
231 | if (columns_mask[src_index++]) |
232 | res_columns[res_index++]->insert(table_name); |
233 | |
234 | if (columns_mask[src_index++]) |
235 | { |
236 | if (!table) |
237 | table = tables_it->table(); |
238 | res_columns[res_index++]->insert(table->getName()); |
239 | } |
240 | |
241 | if (columns_mask[src_index++]) |
242 | res_columns[res_index++]->insert(0u); // is_temporary |
243 | |
244 | if (columns_mask[src_index++]) |
245 | { |
246 | if (!table) |
247 | table = tables_it->table(); |
248 | |
249 | Array table_paths_array; |
250 | auto paths = table->getDataPaths(); |
251 | table_paths_array.reserve(paths.size()); |
252 | for (const String & path : paths) |
253 | table_paths_array.push_back(path); |
254 | res_columns[res_index++]->insert(table_paths_array); |
255 | } |
256 | |
257 | if (columns_mask[src_index++]) |
258 | res_columns[res_index++]->insert(database->getObjectMetadataPath(table_name)); |
259 | |
260 | if (columns_mask[src_index++]) |
261 | res_columns[res_index++]->insert(static_cast<UInt64>(database->getObjectMetadataModificationTime(table_name))); |
262 | |
263 | { |
264 | Array dependencies_table_name_array; |
265 | Array dependencies_database_name_array; |
266 | if (columns_mask[src_index] || columns_mask[src_index + 1]) |
267 | { |
268 | const auto dependencies = context.getDependencies(database_name, table_name); |
269 | |
270 | dependencies_table_name_array.reserve(dependencies.size()); |
271 | dependencies_database_name_array.reserve(dependencies.size()); |
272 | for (const auto & dependency : dependencies) |
273 | { |
274 | dependencies_table_name_array.push_back(dependency.second); |
275 | dependencies_database_name_array.push_back(dependency.first); |
276 | } |
277 | } |
278 | |
279 | if (columns_mask[src_index++]) |
280 | res_columns[res_index++]->insert(dependencies_database_name_array); |
281 | |
282 | if (columns_mask[src_index++]) |
283 | res_columns[res_index++]->insert(dependencies_table_name_array); |
284 | } |
285 | |
286 | if (columns_mask[src_index] || columns_mask[src_index + 1]) |
287 | { |
288 | ASTPtr ast = database->tryGetCreateTableQuery(context, table_name); |
289 | |
290 | if (columns_mask[src_index++]) |
291 | res_columns[res_index++]->insert(ast ? queryToString(ast) : "" ); |
292 | |
293 | if (columns_mask[src_index++]) |
294 | { |
295 | String engine_full; |
296 | |
297 | if (ast) |
298 | { |
299 | const auto & ast_create = ast->as<ASTCreateQuery &>(); |
300 | if (ast_create.storage) |
301 | { |
302 | engine_full = queryToString(*ast_create.storage); |
303 | |
304 | static const char * const = " ENGINE = " ; |
305 | if (startsWith(engine_full, extra_head)) |
306 | engine_full = engine_full.substr(strlen(extra_head)); |
307 | } |
308 | } |
309 | |
310 | res_columns[res_index++]->insert(engine_full); |
311 | } |
312 | } |
313 | else |
314 | src_index += 2; |
315 | |
316 | ASTPtr expression_ptr; |
317 | if (columns_mask[src_index++]) |
318 | { |
319 | if (!table) |
320 | table = tables_it->table(); |
321 | if ((expression_ptr = table->getPartitionKeyAST())) |
322 | res_columns[res_index++]->insert(queryToString(expression_ptr)); |
323 | else |
324 | res_columns[res_index++]->insertDefault(); |
325 | } |
326 | |
327 | if (columns_mask[src_index++]) |
328 | { |
329 | if (!table) |
330 | table = tables_it->table(); |
331 | if ((expression_ptr = table->getSortingKeyAST())) |
332 | res_columns[res_index++]->insert(queryToString(expression_ptr)); |
333 | else |
334 | res_columns[res_index++]->insertDefault(); |
335 | } |
336 | |
337 | if (columns_mask[src_index++]) |
338 | { |
339 | if (!table) |
340 | table = tables_it->table(); |
341 | if ((expression_ptr = table->getPrimaryKeyAST())) |
342 | res_columns[res_index++]->insert(queryToString(expression_ptr)); |
343 | else |
344 | res_columns[res_index++]->insertDefault(); |
345 | } |
346 | |
347 | if (columns_mask[src_index++]) |
348 | { |
349 | if (!table) |
350 | table = tables_it->table(); |
351 | if ((expression_ptr = table->getSamplingKeyAST())) |
352 | res_columns[res_index++]->insert(queryToString(expression_ptr)); |
353 | else |
354 | res_columns[res_index++]->insertDefault(); |
355 | } |
356 | |
357 | if (columns_mask[src_index++]) |
358 | { |
359 | if (!table) |
360 | table = tables_it->table(); |
361 | auto policy = table->getStoragePolicy(); |
362 | if (policy) |
363 | res_columns[res_index++]->insert(policy->getName()); |
364 | else |
365 | res_columns[res_index++]->insertDefault(); |
366 | } |
367 | } |
368 | } |
369 | |
370 | res.setColumns(std::move(res_columns)); |
371 | return res; |
372 | } |
373 | private: |
374 | std::vector<UInt8> columns_mask; |
375 | Block ; |
376 | UInt64 max_block_size; |
377 | ColumnPtr databases; |
378 | size_t database_idx = 0; |
379 | DatabaseTablesIteratorPtr tables_it; |
380 | const Context context; |
381 | bool done = false; |
382 | DatabasePtr database; |
383 | std::string database_name; |
384 | }; |
385 | |
386 | |
387 | BlockInputStreams StorageSystemTables::read( |
388 | const Names & column_names, |
389 | const SelectQueryInfo & query_info, |
390 | const Context & context, |
391 | QueryProcessingStage::Enum /*processed_stage*/, |
392 | const size_t max_block_size, |
393 | const unsigned /*num_streams*/) |
394 | { |
395 | check(column_names); |
396 | |
397 | /// Create a mask of what columns are needed in the result. |
398 | |
399 | NameSet names_set(column_names.begin(), column_names.end()); |
400 | |
401 | Block sample_block = getSampleBlock(); |
402 | Block res_block; |
403 | |
404 | std::vector<UInt8> columns_mask(sample_block.columns()); |
405 | for (size_t i = 0, size = columns_mask.size(); i < size; ++i) |
406 | { |
407 | if (names_set.count(sample_block.getByPosition(i).name)) |
408 | { |
409 | columns_mask[i] = 1; |
410 | res_block.insert(sample_block.getByPosition(i)); |
411 | } |
412 | } |
413 | |
414 | ColumnPtr filtered_databases_column = getFilteredDatabases(query_info.query, context); |
415 | return {std::make_shared<TablesBlockInputStream>( |
416 | std::move(columns_mask), std::move(res_block), max_block_size, std::move(filtered_databases_column), context)}; |
417 | } |
418 | |
419 | } |
420 | |