1 | #include <Storages/ColumnsDescription.h> |
2 | #include <Storages/System/StorageSystemPartsBase.h> |
3 | #include <Common/escapeForFileName.h> |
4 | #include <Columns/ColumnString.h> |
5 | #include <DataTypes/DataTypeString.h> |
6 | #include <DataTypes/DataTypesNumber.h> |
7 | #include <DataTypes/DataTypeDateTime.h> |
8 | #include <DataTypes/DataTypeDate.h> |
9 | #include <DataStreams/OneBlockInputStream.h> |
10 | #include <Storages/MergeTree/MergeTreeData.h> |
11 | #include <Storages/VirtualColumnUtils.h> |
12 | #include <Databases/IDatabase.h> |
13 | #include <Parsers/queryToString.h> |
14 | #include <Parsers/ASTIdentifier.h> |
15 | |
16 | |
17 | namespace DB |
18 | { |
19 | |
namespace ErrorCodes
{
    /// Every error code referenced in this file is declared here explicitly,
    /// instead of relying on a declaration that happens to arrive via some included header.
    extern const int LOGICAL_ERROR;
    extern const int TABLE_IS_DROPPED;
}
24 | |
25 | bool StorageSystemPartsBase::hasStateColumn(const Names & column_names) const |
26 | { |
27 | bool has_state_column = false; |
28 | Names real_column_names; |
29 | |
30 | for (const String & column_name : column_names) |
31 | { |
32 | if (column_name == "_state" ) |
33 | has_state_column = true; |
34 | else |
35 | real_column_names.emplace_back(column_name); |
36 | } |
37 | |
38 | /// Do not check if only _state column is requested |
39 | if (!(has_state_column && real_column_names.empty())) |
40 | check(real_column_names); |
41 | |
42 | return has_state_column; |
43 | } |
44 | |
45 | MergeTreeData::DataPartsVector |
46 | StoragesInfo::getParts(MergeTreeData::DataPartStateVector & state, bool has_state_column) const |
47 | { |
48 | using State = MergeTreeData::DataPartState; |
49 | if (need_inactive_parts) |
50 | { |
51 | /// If has_state_column is requested, return all states. |
52 | if (!has_state_column) |
53 | return data->getDataPartsVector({State::Committed, State::Outdated}, &state); |
54 | |
55 | return data->getAllDataPartsVector(&state); |
56 | } |
57 | |
58 | return data->getDataPartsVector({State::Committed}, &state); |
59 | } |
60 | |
61 | StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const Context & context) |
62 | : query_id(context.getCurrentQueryId()) |
63 | { |
64 | /// Will apply WHERE to subset of columns and then add more columns. |
65 | /// This is kind of complicated, but we use WHERE to do less work. |
66 | |
67 | Block block_to_filter; |
68 | |
69 | MutableColumnPtr table_column_mut = ColumnString::create(); |
70 | MutableColumnPtr engine_column_mut = ColumnString::create(); |
71 | MutableColumnPtr active_column_mut = ColumnUInt8::create(); |
72 | |
73 | { |
74 | Databases databases = context.getDatabases(); |
75 | |
76 | /// Add column 'database'. |
77 | MutableColumnPtr database_column_mut = ColumnString::create(); |
78 | for (const auto & database : databases) |
79 | { |
80 | /// Lazy database can not contain MergeTree tables |
81 | /// and it's unnecessary to load all tables of Lazy database just to filter all of them. |
82 | if (context.hasDatabaseAccessRights(database.first) && database.second->getEngineName() != "Lazy" ) |
83 | database_column_mut->insert(database.first); |
84 | } |
85 | block_to_filter.insert(ColumnWithTypeAndName( |
86 | std::move(database_column_mut), std::make_shared<DataTypeString>(), "database" )); |
87 | |
88 | /// Filter block_to_filter with column 'database'. |
89 | VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context); |
90 | rows = block_to_filter.rows(); |
91 | |
92 | /// Block contains new columns, update database_column. |
93 | ColumnPtr database_column_ = block_to_filter.getByName("database" ).column; |
94 | |
95 | if (rows) |
96 | { |
97 | /// Add columns 'table', 'engine', 'active' |
98 | |
99 | IColumn::Offsets offsets(rows); |
100 | |
101 | for (size_t i = 0; i < rows; ++i) |
102 | { |
103 | String database_name = (*database_column_)[i].get<String>(); |
104 | const DatabasePtr database = databases.at(database_name); |
105 | |
106 | offsets[i] = i ? offsets[i - 1] : 0; |
107 | for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next()) |
108 | { |
109 | String table_name = iterator->name(); |
110 | StoragePtr storage = iterator->table(); |
111 | String engine_name = storage->getName(); |
112 | |
113 | if (!dynamic_cast<MergeTreeData *>(storage.get())) |
114 | continue; |
115 | |
116 | storages[std::make_pair(database_name, iterator->name())] = storage; |
117 | |
118 | /// Add all combinations of flag 'active'. |
119 | for (UInt64 active : {0, 1}) |
120 | { |
121 | table_column_mut->insert(table_name); |
122 | engine_column_mut->insert(engine_name); |
123 | active_column_mut->insert(active); |
124 | } |
125 | |
126 | offsets[i] += 2; |
127 | } |
128 | } |
129 | |
130 | for (size_t i = 0; i < block_to_filter.columns(); ++i) |
131 | { |
132 | ColumnPtr & column = block_to_filter.safeGetByPosition(i).column; |
133 | column = column->replicate(offsets); |
134 | } |
135 | } |
136 | } |
137 | |
138 | block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), "table" )); |
139 | block_to_filter.insert(ColumnWithTypeAndName(std::move(engine_column_mut), std::make_shared<DataTypeString>(), "engine" )); |
140 | block_to_filter.insert(ColumnWithTypeAndName(std::move(active_column_mut), std::make_shared<DataTypeUInt8>(), "active" )); |
141 | |
142 | if (rows) |
143 | { |
144 | /// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'. |
145 | VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context); |
146 | rows = block_to_filter.rows(); |
147 | } |
148 | |
149 | database_column = block_to_filter.getByName("database" ).column; |
150 | table_column = block_to_filter.getByName("table" ).column; |
151 | active_column = block_to_filter.getByName("active" ).column; |
152 | |
153 | next_row = 0; |
154 | } |
155 | |
156 | StoragesInfo StoragesInfoStream::next() |
157 | { |
158 | while (next_row < rows) |
159 | { |
160 | StoragesInfo info; |
161 | |
162 | info.database = (*database_column)[next_row].get<String>(); |
163 | info.table = (*table_column)[next_row].get<String>(); |
164 | |
165 | auto isSameTable = [&info, this] (size_t row) -> bool |
166 | { |
167 | return (*database_column)[row].get<String>() == info.database && |
168 | (*table_column)[row].get<String>() == info.table; |
169 | }; |
170 | |
171 | /// We may have two rows per table which differ in 'active' value. |
172 | /// If rows with 'active = 0' were not filtered out, this means we |
173 | /// must collect the inactive parts. Remember this fact in StoragesInfo. |
174 | for (; next_row < rows && isSameTable(next_row); ++next_row) |
175 | { |
176 | const auto active = (*active_column)[next_row].get<UInt64>(); |
177 | if (active == 0) |
178 | info.need_inactive_parts = true; |
179 | } |
180 | |
181 | info.storage = storages.at(std::make_pair(info.database, info.table)); |
182 | |
183 | try |
184 | { |
185 | /// For table not to be dropped and set of columns to remain constant. |
186 | info.table_lock = info.storage->lockStructureForShare(false, query_id); |
187 | } |
188 | catch (const Exception & e) |
189 | { |
190 | /** There are case when IStorage::drop was called, |
191 | * but we still own the object. |
192 | * Then table will throw exception at attempt to lock it. |
193 | * Just skip the table. |
194 | */ |
195 | if (e.code() == ErrorCodes::TABLE_IS_DROPPED) |
196 | continue; |
197 | |
198 | throw; |
199 | } |
200 | |
201 | info.engine = info.storage->getName(); |
202 | |
203 | info.data = dynamic_cast<MergeTreeData *>(info.storage.get()); |
204 | if (!info.data) |
205 | throw Exception("Unknown engine " + info.engine, ErrorCodes::LOGICAL_ERROR); |
206 | |
207 | return info; |
208 | } |
209 | |
210 | return {}; |
211 | } |
212 | |
213 | BlockInputStreams StorageSystemPartsBase::read( |
214 | const Names & column_names, |
215 | const SelectQueryInfo & query_info, |
216 | const Context & context, |
217 | QueryProcessingStage::Enum /*processed_stage*/, |
218 | const size_t /*max_block_size*/, |
219 | const unsigned /*num_streams*/) |
220 | { |
221 | bool has_state_column = hasStateColumn(column_names); |
222 | |
223 | StoragesInfoStream stream(query_info, context); |
224 | |
225 | /// Create the result. |
226 | |
227 | MutableColumns res_columns = getSampleBlock().cloneEmptyColumns(); |
228 | if (has_state_column) |
229 | res_columns.push_back(ColumnString::create()); |
230 | |
231 | while (StoragesInfo info = stream.next()) |
232 | { |
233 | processNextStorage(res_columns, info, has_state_column); |
234 | } |
235 | |
236 | Block block = getSampleBlock(); |
237 | if (has_state_column) |
238 | block.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state" )); |
239 | |
240 | return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block.cloneWithColumns(std::move(res_columns)))); |
241 | } |
242 | |
243 | NameAndTypePair StorageSystemPartsBase::getColumn(const String & column_name) const |
244 | { |
245 | if (column_name == "_state" ) |
246 | return NameAndTypePair("_state" , std::make_shared<DataTypeString>()); |
247 | |
248 | return IStorage::getColumn(column_name); |
249 | } |
250 | |
251 | bool StorageSystemPartsBase::hasColumn(const String & column_name) const |
252 | { |
253 | if (column_name == "_state" ) |
254 | return true; |
255 | |
256 | return IStorage::hasColumn(column_name); |
257 | } |
258 | |
259 | StorageSystemPartsBase::StorageSystemPartsBase(std::string name_, NamesAndTypesList && columns_) |
260 | : name(std::move(name_)) |
261 | { |
262 | ColumnsDescription tmp_columns(std::move(columns_)); |
263 | |
264 | auto add_alias = [&](const String & alias_name, const String & column_name) |
265 | { |
266 | ColumnDescription column(alias_name, tmp_columns.get(column_name).type, false); |
267 | column.default_desc.kind = ColumnDefaultKind::Alias; |
268 | column.default_desc.expression = std::make_shared<ASTIdentifier>(column_name); |
269 | tmp_columns.add(column); |
270 | }; |
271 | |
272 | /// Add aliases for old column names for backwards compatibility. |
273 | add_alias("bytes" , "bytes_on_disk" ); |
274 | add_alias("marks_size" , "marks_bytes" ); |
275 | |
276 | setColumns(tmp_columns); |
277 | } |
278 | |
279 | } |
280 | |