1#include <Storages/ColumnsDescription.h>
2#include <Storages/System/StorageSystemPartsBase.h>
3#include <Common/escapeForFileName.h>
4#include <Columns/ColumnString.h>
5#include <DataTypes/DataTypeString.h>
6#include <DataTypes/DataTypesNumber.h>
7#include <DataTypes/DataTypeDateTime.h>
8#include <DataTypes/DataTypeDate.h>
9#include <DataStreams/OneBlockInputStream.h>
10#include <Storages/MergeTree/MergeTreeData.h>
11#include <Storages/VirtualColumnUtils.h>
12#include <Databases/IDatabase.h>
13#include <Parsers/queryToString.h>
14#include <Parsers/ASTIdentifier.h>
15
16
17namespace DB
18{
19
/// Error codes used by this file. LOGICAL_ERROR is thrown by StoragesInfoStream::next();
/// declare it here instead of relying on some transitively included header to do so.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int TABLE_IS_DROPPED;
}
24
25bool StorageSystemPartsBase::hasStateColumn(const Names & column_names) const
26{
27 bool has_state_column = false;
28 Names real_column_names;
29
30 for (const String & column_name : column_names)
31 {
32 if (column_name == "_state")
33 has_state_column = true;
34 else
35 real_column_names.emplace_back(column_name);
36 }
37
38 /// Do not check if only _state column is requested
39 if (!(has_state_column && real_column_names.empty()))
40 check(real_column_names);
41
42 return has_state_column;
43}
44
45MergeTreeData::DataPartsVector
46StoragesInfo::getParts(MergeTreeData::DataPartStateVector & state, bool has_state_column) const
47{
48 using State = MergeTreeData::DataPartState;
49 if (need_inactive_parts)
50 {
51 /// If has_state_column is requested, return all states.
52 if (!has_state_column)
53 return data->getDataPartsVector({State::Committed, State::Outdated}, &state);
54
55 return data->getAllDataPartsVector(&state);
56 }
57
58 return data->getDataPartsVector({State::Committed}, &state);
59}
60
61StoragesInfoStream::StoragesInfoStream(const SelectQueryInfo & query_info, const Context & context)
62 : query_id(context.getCurrentQueryId())
63{
64 /// Will apply WHERE to subset of columns and then add more columns.
65 /// This is kind of complicated, but we use WHERE to do less work.
66
67 Block block_to_filter;
68
69 MutableColumnPtr table_column_mut = ColumnString::create();
70 MutableColumnPtr engine_column_mut = ColumnString::create();
71 MutableColumnPtr active_column_mut = ColumnUInt8::create();
72
73 {
74 Databases databases = context.getDatabases();
75
76 /// Add column 'database'.
77 MutableColumnPtr database_column_mut = ColumnString::create();
78 for (const auto & database : databases)
79 {
80 /// Lazy database can not contain MergeTree tables
81 /// and it's unnecessary to load all tables of Lazy database just to filter all of them.
82 if (context.hasDatabaseAccessRights(database.first) && database.second->getEngineName() != "Lazy")
83 database_column_mut->insert(database.first);
84 }
85 block_to_filter.insert(ColumnWithTypeAndName(
86 std::move(database_column_mut), std::make_shared<DataTypeString>(), "database"));
87
88 /// Filter block_to_filter with column 'database'.
89 VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
90 rows = block_to_filter.rows();
91
92 /// Block contains new columns, update database_column.
93 ColumnPtr database_column_ = block_to_filter.getByName("database").column;
94
95 if (rows)
96 {
97 /// Add columns 'table', 'engine', 'active'
98
99 IColumn::Offsets offsets(rows);
100
101 for (size_t i = 0; i < rows; ++i)
102 {
103 String database_name = (*database_column_)[i].get<String>();
104 const DatabasePtr database = databases.at(database_name);
105
106 offsets[i] = i ? offsets[i - 1] : 0;
107 for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
108 {
109 String table_name = iterator->name();
110 StoragePtr storage = iterator->table();
111 String engine_name = storage->getName();
112
113 if (!dynamic_cast<MergeTreeData *>(storage.get()))
114 continue;
115
116 storages[std::make_pair(database_name, iterator->name())] = storage;
117
118 /// Add all combinations of flag 'active'.
119 for (UInt64 active : {0, 1})
120 {
121 table_column_mut->insert(table_name);
122 engine_column_mut->insert(engine_name);
123 active_column_mut->insert(active);
124 }
125
126 offsets[i] += 2;
127 }
128 }
129
130 for (size_t i = 0; i < block_to_filter.columns(); ++i)
131 {
132 ColumnPtr & column = block_to_filter.safeGetByPosition(i).column;
133 column = column->replicate(offsets);
134 }
135 }
136 }
137
138 block_to_filter.insert(ColumnWithTypeAndName(std::move(table_column_mut), std::make_shared<DataTypeString>(), "table"));
139 block_to_filter.insert(ColumnWithTypeAndName(std::move(engine_column_mut), std::make_shared<DataTypeString>(), "engine"));
140 block_to_filter.insert(ColumnWithTypeAndName(std::move(active_column_mut), std::make_shared<DataTypeUInt8>(), "active"));
141
142 if (rows)
143 {
144 /// Filter block_to_filter with columns 'database', 'table', 'engine', 'active'.
145 VirtualColumnUtils::filterBlockWithQuery(query_info.query, block_to_filter, context);
146 rows = block_to_filter.rows();
147 }
148
149 database_column = block_to_filter.getByName("database").column;
150 table_column = block_to_filter.getByName("table").column;
151 active_column = block_to_filter.getByName("active").column;
152
153 next_row = 0;
154}
155
156StoragesInfo StoragesInfoStream::next()
157{
158 while (next_row < rows)
159 {
160 StoragesInfo info;
161
162 info.database = (*database_column)[next_row].get<String>();
163 info.table = (*table_column)[next_row].get<String>();
164
165 auto isSameTable = [&info, this] (size_t row) -> bool
166 {
167 return (*database_column)[row].get<String>() == info.database &&
168 (*table_column)[row].get<String>() == info.table;
169 };
170
171 /// We may have two rows per table which differ in 'active' value.
172 /// If rows with 'active = 0' were not filtered out, this means we
173 /// must collect the inactive parts. Remember this fact in StoragesInfo.
174 for (; next_row < rows && isSameTable(next_row); ++next_row)
175 {
176 const auto active = (*active_column)[next_row].get<UInt64>();
177 if (active == 0)
178 info.need_inactive_parts = true;
179 }
180
181 info.storage = storages.at(std::make_pair(info.database, info.table));
182
183 try
184 {
185 /// For table not to be dropped and set of columns to remain constant.
186 info.table_lock = info.storage->lockStructureForShare(false, query_id);
187 }
188 catch (const Exception & e)
189 {
190 /** There are case when IStorage::drop was called,
191 * but we still own the object.
192 * Then table will throw exception at attempt to lock it.
193 * Just skip the table.
194 */
195 if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
196 continue;
197
198 throw;
199 }
200
201 info.engine = info.storage->getName();
202
203 info.data = dynamic_cast<MergeTreeData *>(info.storage.get());
204 if (!info.data)
205 throw Exception("Unknown engine " + info.engine, ErrorCodes::LOGICAL_ERROR);
206
207 return info;
208 }
209
210 return {};
211}
212
213BlockInputStreams StorageSystemPartsBase::read(
214 const Names & column_names,
215 const SelectQueryInfo & query_info,
216 const Context & context,
217 QueryProcessingStage::Enum /*processed_stage*/,
218 const size_t /*max_block_size*/,
219 const unsigned /*num_streams*/)
220{
221 bool has_state_column = hasStateColumn(column_names);
222
223 StoragesInfoStream stream(query_info, context);
224
225 /// Create the result.
226
227 MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
228 if (has_state_column)
229 res_columns.push_back(ColumnString::create());
230
231 while (StoragesInfo info = stream.next())
232 {
233 processNextStorage(res_columns, info, has_state_column);
234 }
235
236 Block block = getSampleBlock();
237 if (has_state_column)
238 block.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));
239
240 return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(block.cloneWithColumns(std::move(res_columns))));
241}
242
243NameAndTypePair StorageSystemPartsBase::getColumn(const String & column_name) const
244{
245 if (column_name == "_state")
246 return NameAndTypePair("_state", std::make_shared<DataTypeString>());
247
248 return IStorage::getColumn(column_name);
249}
250
251bool StorageSystemPartsBase::hasColumn(const String & column_name) const
252{
253 if (column_name == "_state")
254 return true;
255
256 return IStorage::hasColumn(column_name);
257}
258
259StorageSystemPartsBase::StorageSystemPartsBase(std::string name_, NamesAndTypesList && columns_)
260 : name(std::move(name_))
261{
262 ColumnsDescription tmp_columns(std::move(columns_));
263
264 auto add_alias = [&](const String & alias_name, const String & column_name)
265 {
266 ColumnDescription column(alias_name, tmp_columns.get(column_name).type, false);
267 column.default_desc.kind = ColumnDefaultKind::Alias;
268 column.default_desc.expression = std::make_shared<ASTIdentifier>(column_name);
269 tmp_columns.add(column);
270 };
271
272 /// Add aliases for old column names for backwards compatibility.
273 add_alias("bytes", "bytes_on_disk");
274 add_alias("marks_size", "marks_bytes");
275
276 setColumns(tmp_columns);
277}
278
279}
280