1#include <optional>
2#include <Storages/System/StorageSystemColumns.h>
3#include <Storages/MergeTree/MergeTreeData.h>
4#include <Columns/ColumnsNumber.h>
5#include <Columns/ColumnString.h>
6#include <DataTypes/DataTypeString.h>
7#include <DataTypes/DataTypesNumber.h>
8#include <DataStreams/NullBlockInputStream.h>
9#include <Storages/VirtualColumnUtils.h>
10#include <Parsers/queryToString.h>
11#include <Parsers/ASTSelectQuery.h>
12#include <Databases/IDatabase.h>
13
14
15namespace DB
16{
17
/// Error codes referenced by this file. They are only declared here (`extern`);
/// the definitions live in another translation unit.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    /// Compared with the exception code when taking a table's structure lock fails,
    /// so that such tables can be skipped instead of failing the whole query.
    extern const int TABLE_IS_DROPPED;
}
23
24StorageSystemColumns::StorageSystemColumns(const std::string & name_)
25 : name(name_)
26{
27 setColumns(ColumnsDescription(
28 {
29 { "database", std::make_shared<DataTypeString>() },
30 { "table", std::make_shared<DataTypeString>() },
31 { "name", std::make_shared<DataTypeString>() },
32 { "type", std::make_shared<DataTypeString>() },
33 { "default_kind", std::make_shared<DataTypeString>() },
34 { "default_expression", std::make_shared<DataTypeString>() },
35 { "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
36 { "data_uncompressed_bytes", std::make_shared<DataTypeUInt64>() },
37 { "marks_bytes", std::make_shared<DataTypeUInt64>() },
38 { "comment", std::make_shared<DataTypeString>() },
39 { "is_in_partition_key", std::make_shared<DataTypeUInt8>() },
40 { "is_in_sorting_key", std::make_shared<DataTypeUInt8>() },
41 { "is_in_primary_key", std::make_shared<DataTypeUInt8>() },
42 { "is_in_sampling_key", std::make_shared<DataTypeUInt8>() },
43 { "compression_codec", std::make_shared<DataTypeString>() },
44 }));
45}
46
47
namespace
{
    /// Storage objects keyed by the pair (database name, table name).
    /// Filled while enumerating tables in StorageSystemColumns::read and
    /// looked up again by ColumnsBlockInputStream when it produces rows.
    using Storages = std::map<std::pair<std::string, std::string>, StoragePtr>;
}
52
53
54class ColumnsBlockInputStream : public IBlockInputStream
55{
56public:
57 ColumnsBlockInputStream(
58 const std::vector<UInt8> & columns_mask_,
59 const Block & header_,
60 UInt64 max_block_size_,
61 ColumnPtr databases_,
62 ColumnPtr tables_,
63 Storages storages_,
64 String query_id_)
65 : columns_mask(columns_mask_), header(header_), max_block_size(max_block_size_)
66 , databases(databases_), tables(tables_), storages(std::move(storages_))
67 , query_id(std::move(query_id_)), total_tables(tables->size())
68 {
69 }
70
71 String getName() const override { return "Columns"; }
72 Block getHeader() const override { return header; }
73
74protected:
75 Block readImpl() override
76 {
77 if (db_table_num >= total_tables)
78 return {};
79
80 Block res = header;
81 MutableColumns res_columns = header.cloneEmptyColumns();
82 size_t rows_count = 0;
83
84 while (rows_count < max_block_size && db_table_num < total_tables)
85 {
86 const std::string database_name = (*databases)[db_table_num].get<std::string>();
87 const std::string table_name = (*tables)[db_table_num].get<std::string>();
88 ++db_table_num;
89
90 ColumnsDescription columns;
91 Names cols_required_for_partition_key;
92 Names cols_required_for_sorting_key;
93 Names cols_required_for_primary_key;
94 Names cols_required_for_sampling;
95 MergeTreeData::ColumnSizeByName column_sizes;
96
97 {
98 StoragePtr storage = storages.at(std::make_pair(database_name, table_name));
99 TableStructureReadLockHolder table_lock;
100
101 try
102 {
103 table_lock = storage->lockStructureForShare(false, query_id);
104 }
105 catch (const Exception & e)
106 {
107 /** There are case when IStorage::drop was called,
108 * but we still own the object.
109 * Then table will throw exception at attempt to lock it.
110 * Just skip the table.
111 */
112 if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
113 continue;
114 else
115 throw;
116 }
117
118 columns = storage->getColumns();
119
120 cols_required_for_partition_key = storage->getColumnsRequiredForPartitionKey();
121 cols_required_for_sorting_key = storage->getColumnsRequiredForSortingKey();
122 cols_required_for_primary_key = storage->getColumnsRequiredForPrimaryKey();
123 cols_required_for_sampling = storage->getColumnsRequiredForSampling();
124
125 column_sizes = storage->getColumnSizes();
126 }
127
128 for (const auto & column : columns)
129 {
130 if (column.is_virtual)
131 continue;
132
133 size_t src_index = 0;
134 size_t res_index = 0;
135
136 if (columns_mask[src_index++])
137 res_columns[res_index++]->insert(database_name);
138 if (columns_mask[src_index++])
139 res_columns[res_index++]->insert(table_name);
140 if (columns_mask[src_index++])
141 res_columns[res_index++]->insert(column.name);
142 if (columns_mask[src_index++])
143 res_columns[res_index++]->insert(column.type->getName());
144
145 if (column.default_desc.expression)
146 {
147 if (columns_mask[src_index++])
148 res_columns[res_index++]->insert(toString(column.default_desc.kind));
149 if (columns_mask[src_index++])
150 res_columns[res_index++]->insert(queryToString(column.default_desc.expression));
151 }
152 else
153 {
154 if (columns_mask[src_index++])
155 res_columns[res_index++]->insertDefault();
156 if (columns_mask[src_index++])
157 res_columns[res_index++]->insertDefault();
158 }
159
160 {
161 const auto it = column_sizes.find(column.name);
162 if (it == std::end(column_sizes))
163 {
164 if (columns_mask[src_index++])
165 res_columns[res_index++]->insertDefault();
166 if (columns_mask[src_index++])
167 res_columns[res_index++]->insertDefault();
168 if (columns_mask[src_index++])
169 res_columns[res_index++]->insertDefault();
170 }
171 else
172 {
173 if (columns_mask[src_index++])
174 res_columns[res_index++]->insert(it->second.data_compressed);
175 if (columns_mask[src_index++])
176 res_columns[res_index++]->insert(it->second.data_uncompressed);
177 if (columns_mask[src_index++])
178 res_columns[res_index++]->insert(it->second.marks);
179 }
180 }
181
182 if (columns_mask[src_index++])
183 res_columns[res_index++]->insert(column.comment);
184
185 {
186 auto find_in_vector = [&key = column.name](const Names& names)
187 {
188 return std::find(names.cbegin(), names.cend(), key) != names.end();
189 };
190
191 if (columns_mask[src_index++])
192 res_columns[res_index++]->insert(find_in_vector(cols_required_for_partition_key));
193 if (columns_mask[src_index++])
194 res_columns[res_index++]->insert(find_in_vector(cols_required_for_sorting_key));
195 if (columns_mask[src_index++])
196 res_columns[res_index++]->insert(find_in_vector(cols_required_for_primary_key));
197 if (columns_mask[src_index++])
198 res_columns[res_index++]->insert(find_in_vector(cols_required_for_sampling));
199 }
200
201 if (columns_mask[src_index++])
202 {
203 if (column.codec)
204 res_columns[res_index++]->insert("CODEC(" + column.codec->getCodecDesc() + ")");
205 else
206 res_columns[res_index++]->insertDefault();
207 }
208
209 ++rows_count;
210 }
211 }
212
213 res.setColumns(std::move(res_columns));
214 return res;
215 }
216
217private:
218 std::vector<UInt8> columns_mask;
219 Block header;
220 UInt64 max_block_size;
221 ColumnPtr databases;
222 ColumnPtr tables;
223 Storages storages;
224 String query_id;
225 size_t db_table_num = 0;
226 size_t total_tables;
227};
228
229
230BlockInputStreams StorageSystemColumns::read(
231 const Names & column_names,
232 const SelectQueryInfo & query_info,
233 const Context & context,
234 QueryProcessingStage::Enum /*processed_stage*/,
235 const size_t max_block_size,
236 const unsigned /*num_streams*/)
237{
238 check(column_names);
239
240 /// Create a mask of what columns are needed in the result.
241
242 NameSet names_set(column_names.begin(), column_names.end());
243
244 Block sample_block = getSampleBlock();
245 Block res_block;
246
247 std::vector<UInt8> columns_mask(sample_block.columns());
248 for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
249 {
250 if (names_set.count(sample_block.getByPosition(i).name))
251 {
252 columns_mask[i] = 1;
253 res_block.insert(sample_block.getByPosition(i));
254 }
255 }
256
257 Block block_to_filter;
258 Storages storages;
259
260 {
261 Databases databases = context.getDatabases();
262
263 /// Add `database` column.
264 MutableColumnPtr database_column_mut = ColumnString::create();
265 for (const auto & database : databases)
266 {
267 /// We are skipping "Lazy" database because we cannot afford initialization of all its tables.
268 /// This should be documented.
269
270 if (context.hasDatabaseAccessRights(database.first)
271 && database.second->getEngineName() != "Lazy")
272 database_column_mut->insert(database.first);
273 }
274
275 block_to_filter.insert(ColumnWithTypeAndName(std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
276
277 /// Filter block with `database` column.
278 VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
279
280 if (!block_to_filter.rows())
281 return {std::make_shared<NullBlockInputStream>(res_block)};
282
283 ColumnPtr & database_column = block_to_filter.getByName("database").column;
284 size_t rows = database_column->size();
285
286 /// Add `table` column.
287 MutableColumnPtr table_column_mut = ColumnString::create();
288 IColumn::Offsets offsets(rows);
289 for (size_t i = 0; i < rows; ++i)
290 {
291 const std::string database_name = (*database_column)[i].get<std::string>();
292 const DatabasePtr database = databases.at(database_name);
293 offsets[i] = i ? offsets[i - 1] : 0;
294
295 for (auto iterator = database->getTablesWithDictionaryTablesIterator(context); iterator->isValid(); iterator->next())
296 {
297 const String & table_name = iterator->name();
298 storages.emplace(std::piecewise_construct,
299 std::forward_as_tuple(database_name, table_name),
300 std::forward_as_tuple(iterator->table()));
301 table_column_mut->insert(table_name);
302 ++offsets[i];
303 }
304 }
305
306 database_column = database_column->replicate(offsets);
307 block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), "table"));
308 }
309
310 /// Filter block with `database` and `table` columns.
311 VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
312
313 if (!block_to_filter.rows())
314 return {std::make_shared<NullBlockInputStream>(res_block)};
315
316 ColumnPtr filtered_database_column = block_to_filter.getByName("database").column;
317 ColumnPtr filtered_table_column = block_to_filter.getByName("table").column;
318
319 return {std::make_shared<ColumnsBlockInputStream>(
320 std::move(columns_mask), std::move(res_block), max_block_size,
321 std::move(filtered_database_column), std::move(filtered_table_column), std::move(storages),
322 context.getCurrentQueryId())};
323}
324
325}
326