| 1 | #include <Storages/System/StorageSystemMutations.h> | 
|---|
| 2 | #include <DataTypes/DataTypeString.h> | 
|---|
| 3 | #include <DataTypes/DataTypesNumber.h> | 
|---|
| 4 | #include <DataTypes/DataTypeDateTime.h> | 
|---|
| 5 | #include <DataTypes/DataTypeArray.h> | 
|---|
| 6 | #include <DataStreams/OneBlockInputStream.h> | 
|---|
| 7 | #include <Storages/MergeTree/MergeTreeData.h> | 
|---|
| 8 | #include <Storages/MergeTree/MergeTreeMutationStatus.h> | 
|---|
| 9 | #include <Storages/VirtualColumnUtils.h> | 
|---|
| 10 | #include <Databases/IDatabase.h> | 
|---|
| 11 |  | 
|---|
| 12 |  | 
|---|
| 13 | namespace DB | 
|---|
| 14 | { | 
|---|
| 15 |  | 
|---|
| 16 |  | 
|---|
| 17 | NamesAndTypesList StorageSystemMutations::getNamesAndTypes() | 
|---|
| 18 | { | 
|---|
| 19 | return { | 
|---|
| 20 | { "database",                   std::make_shared<DataTypeString>() }, | 
|---|
| 21 | { "table",                      std::make_shared<DataTypeString>() }, | 
|---|
| 22 | { "mutation_id",                std::make_shared<DataTypeString>() }, | 
|---|
| 23 | { "command",                    std::make_shared<DataTypeString>() }, | 
|---|
| 24 | { "create_time",                std::make_shared<DataTypeDateTime>() }, | 
|---|
| 25 | { "block_numbers.partition_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) }, | 
|---|
| 26 | { "block_numbers.number",       std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>()) }, | 
|---|
| 27 | { "parts_to_do_names",          std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) }, | 
|---|
| 28 | { "parts_to_do",                std::make_shared<DataTypeInt64>() }, | 
|---|
| 29 | { "is_done",                    std::make_shared<DataTypeUInt8>() }, | 
|---|
| 30 | { "latest_failed_part",         std::make_shared<DataTypeString>() }, | 
|---|
| 31 | { "latest_fail_time",           std::make_shared<DataTypeDateTime>() }, | 
|---|
| 32 | { "latest_fail_reason",         std::make_shared<DataTypeString>() }, | 
|---|
| 33 | }; | 
|---|
| 34 | } | 
|---|
| 35 |  | 
|---|
| 36 |  | 
|---|
| 37 | void StorageSystemMutations::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const | 
|---|
| 38 | { | 
|---|
| 39 | /// Collect a set of *MergeTree tables. | 
|---|
| 40 | std::map<String, std::map<String, StoragePtr>> merge_tree_tables; | 
|---|
| 41 | for (const auto & db : context.getDatabases()) | 
|---|
| 42 | { | 
|---|
| 43 | /// Lazy database can not contain MergeTree tables | 
|---|
| 44 | if (db.second->getEngineName() == "Lazy") | 
|---|
| 45 | continue; | 
|---|
| 46 | if (context.hasDatabaseAccessRights(db.first)) | 
|---|
| 47 | for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) | 
|---|
| 48 | if (dynamic_cast<const MergeTreeData *>(iterator->table().get())) | 
|---|
| 49 | merge_tree_tables[db.first][iterator->name()] = iterator->table(); | 
|---|
| 50 | } | 
|---|
| 51 |  | 
|---|
| 52 | MutableColumnPtr col_database_mut = ColumnString::create(); | 
|---|
| 53 | MutableColumnPtr col_table_mut = ColumnString::create(); | 
|---|
| 54 |  | 
|---|
| 55 | for (auto & db : merge_tree_tables) | 
|---|
| 56 | { | 
|---|
| 57 | for (auto & table : db.second) | 
|---|
| 58 | { | 
|---|
| 59 | col_database_mut->insert(db.first); | 
|---|
| 60 | col_table_mut->insert(table.first); | 
|---|
| 61 | } | 
|---|
| 62 | } | 
|---|
| 63 |  | 
|---|
| 64 | ColumnPtr col_database = std::move(col_database_mut); | 
|---|
| 65 | ColumnPtr col_table = std::move(col_table_mut); | 
|---|
| 66 |  | 
|---|
| 67 | /// Determine what tables are needed by the conditions in the query. | 
|---|
| 68 | { | 
|---|
| 69 | Block filtered_block | 
|---|
| 70 | { | 
|---|
| 71 | { col_database, std::make_shared<DataTypeString>(), "database"}, | 
|---|
| 72 | { col_table, std::make_shared<DataTypeString>(), "table"}, | 
|---|
| 73 | }; | 
|---|
| 74 |  | 
|---|
| 75 | VirtualColumnUtils::filterBlockWithQuery(query_info.query, filtered_block, context); | 
|---|
| 76 |  | 
|---|
| 77 | if (!filtered_block.rows()) | 
|---|
| 78 | return; | 
|---|
| 79 |  | 
|---|
| 80 | col_database = filtered_block.getByName( "database").column; | 
|---|
| 81 | col_table = filtered_block.getByName( "table").column; | 
|---|
| 82 | } | 
|---|
| 83 |  | 
|---|
| 84 | for (size_t i_storage = 0; i_storage < col_database->size(); ++i_storage) | 
|---|
| 85 | { | 
|---|
| 86 | auto database = (*col_database)[i_storage].safeGet<String>(); | 
|---|
| 87 | auto table = (*col_table)[i_storage].safeGet<String>(); | 
|---|
| 88 |  | 
|---|
| 89 | std::vector<MergeTreeMutationStatus> statuses; | 
|---|
| 90 | { | 
|---|
| 91 | const IStorage * storage = merge_tree_tables[database][table].get(); | 
|---|
| 92 | if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage)) | 
|---|
| 93 | statuses = merge_tree->getMutationsStatus(); | 
|---|
| 94 | } | 
|---|
| 95 |  | 
|---|
| 96 | for (const MergeTreeMutationStatus & status : statuses) | 
|---|
| 97 | { | 
|---|
| 98 | Array block_partition_ids; | 
|---|
| 99 | block_partition_ids.reserve(status.block_numbers.size()); | 
|---|
| 100 | Array block_numbers; | 
|---|
| 101 | block_numbers.reserve(status.block_numbers.size()); | 
|---|
| 102 | for (const auto & pair : status.block_numbers) | 
|---|
| 103 | { | 
|---|
| 104 | block_partition_ids.emplace_back(pair.first); | 
|---|
| 105 | block_numbers.emplace_back(pair.second); | 
|---|
| 106 | } | 
|---|
| 107 | Array parts_to_do_names; | 
|---|
| 108 | parts_to_do_names.reserve(status.parts_to_do_names.size()); | 
|---|
| 109 | for (const String & part_name : status.parts_to_do_names) | 
|---|
| 110 | parts_to_do_names.emplace_back(part_name); | 
|---|
| 111 |  | 
|---|
| 112 | size_t col_num = 0; | 
|---|
| 113 | res_columns[col_num++]->insert(database); | 
|---|
| 114 | res_columns[col_num++]->insert(table); | 
|---|
| 115 |  | 
|---|
| 116 | res_columns[col_num++]->insert(status.id); | 
|---|
| 117 | res_columns[col_num++]->insert(status.command); | 
|---|
| 118 | res_columns[col_num++]->insert(UInt64(status.create_time)); | 
|---|
| 119 | res_columns[col_num++]->insert(block_partition_ids); | 
|---|
| 120 | res_columns[col_num++]->insert(block_numbers); | 
|---|
| 121 | res_columns[col_num++]->insert(parts_to_do_names); | 
|---|
| 122 | res_columns[col_num++]->insert(parts_to_do_names.size()); | 
|---|
| 123 | res_columns[col_num++]->insert(status.is_done); | 
|---|
| 124 | res_columns[col_num++]->insert(status.latest_failed_part); | 
|---|
| 125 | res_columns[col_num++]->insert(UInt64(status.latest_fail_time)); | 
|---|
| 126 | res_columns[col_num++]->insert(status.latest_fail_reason); | 
|---|
| 127 | } | 
|---|
| 128 | } | 
|---|
| 129 | } | 
|---|
| 130 |  | 
|---|
| 131 | } | 
|---|
| 132 |  | 
|---|