1#include <Storages/System/StorageSystemMutations.h>
2#include <DataTypes/DataTypeString.h>
3#include <DataTypes/DataTypesNumber.h>
4#include <DataTypes/DataTypeDateTime.h>
5#include <DataTypes/DataTypeArray.h>
6#include <DataStreams/OneBlockInputStream.h>
7#include <Storages/MergeTree/MergeTreeData.h>
8#include <Storages/MergeTree/MergeTreeMutationStatus.h>
9#include <Storages/VirtualColumnUtils.h>
10#include <Databases/IDatabase.h>
11
12
13namespace DB
14{
15
16
17NamesAndTypesList StorageSystemMutations::getNamesAndTypes()
18{
19 return {
20 { "database", std::make_shared<DataTypeString>() },
21 { "table", std::make_shared<DataTypeString>() },
22 { "mutation_id", std::make_shared<DataTypeString>() },
23 { "command", std::make_shared<DataTypeString>() },
24 { "create_time", std::make_shared<DataTypeDateTime>() },
25 { "block_numbers.partition_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
26 { "block_numbers.number", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>()) },
27 { "parts_to_do_names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
28 { "parts_to_do", std::make_shared<DataTypeInt64>() },
29 { "is_done", std::make_shared<DataTypeUInt8>() },
30 { "latest_failed_part", std::make_shared<DataTypeString>() },
31 { "latest_fail_time", std::make_shared<DataTypeDateTime>() },
32 { "latest_fail_reason", std::make_shared<DataTypeString>() },
33 };
34}
35
36
37void StorageSystemMutations::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const
38{
39 /// Collect a set of *MergeTree tables.
40 std::map<String, std::map<String, StoragePtr>> merge_tree_tables;
41 for (const auto & db : context.getDatabases())
42 {
43 /// Lazy database can not contain MergeTree tables
44 if (db.second->getEngineName() == "Lazy")
45 continue;
46 if (context.hasDatabaseAccessRights(db.first))
47 for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
48 if (dynamic_cast<const MergeTreeData *>(iterator->table().get()))
49 merge_tree_tables[db.first][iterator->name()] = iterator->table();
50 }
51
52 MutableColumnPtr col_database_mut = ColumnString::create();
53 MutableColumnPtr col_table_mut = ColumnString::create();
54
55 for (auto & db : merge_tree_tables)
56 {
57 for (auto & table : db.second)
58 {
59 col_database_mut->insert(db.first);
60 col_table_mut->insert(table.first);
61 }
62 }
63
64 ColumnPtr col_database = std::move(col_database_mut);
65 ColumnPtr col_table = std::move(col_table_mut);
66
67 /// Determine what tables are needed by the conditions in the query.
68 {
69 Block filtered_block
70 {
71 { col_database, std::make_shared<DataTypeString>(), "database" },
72 { col_table, std::make_shared<DataTypeString>(), "table" },
73 };
74
75 VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context);
76
77 if (!filtered_block.rows())
78 return;
79
80 col_database = filtered_block.getByName("database").column;
81 col_table = filtered_block.getByName("table").column;
82 }
83
84 for (size_t i_storage = 0; i_storage < col_database->size(); ++i_storage)
85 {
86 auto database = (*col_database)[i_storage].safeGet<String>();
87 auto table = (*col_table)[i_storage].safeGet<String>();
88
89 std::vector<MergeTreeMutationStatus> statuses;
90 {
91 const IStorage * storage = merge_tree_tables[database][table].get();
92 if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage))
93 statuses = merge_tree->getMutationsStatus();
94 }
95
96 for (const MergeTreeMutationStatus & status : statuses)
97 {
98 Array block_partition_ids;
99 block_partition_ids.reserve(status.block_numbers.size());
100 Array block_numbers;
101 block_numbers.reserve(status.block_numbers.size());
102 for (const auto & pair : status.block_numbers)
103 {
104 block_partition_ids.emplace_back(pair.first);
105 block_numbers.emplace_back(pair.second);
106 }
107 Array parts_to_do_names;
108 parts_to_do_names.reserve(status.parts_to_do_names.size());
109 for (const String & part_name : status.parts_to_do_names)
110 parts_to_do_names.emplace_back(part_name);
111
112 size_t col_num = 0;
113 res_columns[col_num++]->insert(database);
114 res_columns[col_num++]->insert(table);
115
116 res_columns[col_num++]->insert(status.id);
117 res_columns[col_num++]->insert(status.command);
118 res_columns[col_num++]->insert(UInt64(status.create_time));
119 res_columns[col_num++]->insert(block_partition_ids);
120 res_columns[col_num++]->insert(block_numbers);
121 res_columns[col_num++]->insert(parts_to_do_names);
122 res_columns[col_num++]->insert(parts_to_do_names.size());
123 res_columns[col_num++]->insert(status.is_done);
124 res_columns[col_num++]->insert(status.latest_failed_part);
125 res_columns[col_num++]->insert(UInt64(status.latest_fail_time));
126 res_columns[col_num++]->insert(status.latest_fail_reason);
127 }
128 }
129}
130
131}
132