| 1 | #include <Columns/ColumnsNumber.h> | 
|---|
| 2 | #include <Columns/ColumnArray.h> | 
|---|
| 3 | #include <Columns/ColumnString.h> | 
|---|
| 4 | #include <DataTypes/DataTypeArray.h> | 
|---|
| 5 | #include <DataTypes/DataTypesNumber.h> | 
|---|
| 6 | #include <DataTypes/DataTypeDateTime.h> | 
|---|
| 7 | #include <DataTypes/DataTypeDate.h> | 
|---|
| 8 | #include <DataTypes/DataTypeString.h> | 
|---|
| 9 | #include <DataTypes/DataTypeEnum.h> | 
|---|
| 10 | #include <Storages/MergeTree/MergeTreeDataPart.h> | 
|---|
| 11 | #include <Storages/MergeTree/MergeTreeData.h> | 
|---|
| 12 | #include <Interpreters/PartLog.h> | 
|---|
| 13 |  | 
|---|
| 14 |  | 
|---|
| 15 | namespace DB | 
|---|
| 16 | { | 
|---|
| 17 |  | 
|---|
| 18 | Block PartLogElement::createBlock() | 
|---|
| 19 | { | 
|---|
| 20 | auto event_type_datatype = std::make_shared<DataTypeEnum8>( | 
|---|
| 21 | DataTypeEnum8::Values | 
|---|
| 22 | { | 
|---|
| 23 | { "NewPart",       static_cast<Int8>(NEW_PART)}, | 
|---|
| 24 | { "MergeParts",    static_cast<Int8>(MERGE_PARTS)}, | 
|---|
| 25 | { "DownloadPart",  static_cast<Int8>(DOWNLOAD_PART)}, | 
|---|
| 26 | { "RemovePart",    static_cast<Int8>(REMOVE_PART)}, | 
|---|
| 27 | { "MutatePart",    static_cast<Int8>(MUTATE_PART)}, | 
|---|
| 28 | { "MovePart",      static_cast<Int8>(MOVE_PART)}, | 
|---|
| 29 | } | 
|---|
| 30 | ); | 
|---|
| 31 |  | 
|---|
| 32 | return | 
|---|
| 33 | { | 
|---|
| 34 | {ColumnInt8::create(),   std::move(event_type_datatype), "event_type"}, | 
|---|
| 35 | {ColumnUInt16::create(), std::make_shared<DataTypeDate>(), "event_date"}, | 
|---|
| 36 | {ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(), "event_time"}, | 
|---|
| 37 | {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "duration_ms"}, | 
|---|
| 38 |  | 
|---|
| 39 | {ColumnString::create(), std::make_shared<DataTypeString>(), "database"}, | 
|---|
| 40 | {ColumnString::create(), std::make_shared<DataTypeString>(), "table"}, | 
|---|
| 41 | {ColumnString::create(), std::make_shared<DataTypeString>(), "part_name"}, | 
|---|
| 42 | {ColumnString::create(), std::make_shared<DataTypeString>(), "partition_id"}, | 
|---|
| 43 | {ColumnString::create(), std::make_shared<DataTypeString>(), "path_on_disk"}, | 
|---|
| 44 |  | 
|---|
| 45 | {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "rows"}, | 
|---|
| 46 | {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "size_in_bytes"}, // On disk | 
|---|
| 47 |  | 
|---|
| 48 | /// Merge-specific info | 
|---|
| 49 | {ColumnArray::create(ColumnString::create()), std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "merged_from"}, | 
|---|
| 50 | {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "bytes_uncompressed"}, // Result bytes | 
|---|
| 51 | {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "read_rows"}, | 
|---|
| 52 | {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "read_bytes"}, | 
|---|
| 53 |  | 
|---|
| 54 | /// Is there an error during the execution or commit | 
|---|
| 55 | {ColumnUInt16::create(), std::make_shared<DataTypeUInt16>(), "error"}, | 
|---|
| 56 | {ColumnString::create(), std::make_shared<DataTypeString>(), "exception"}, | 
|---|
| 57 | }; | 
|---|
| 58 | } | 
|---|
| 59 |  | 
|---|
| 60 | void PartLogElement::appendToBlock(Block & block) const | 
|---|
| 61 | { | 
|---|
| 62 | MutableColumns columns = block.mutateColumns(); | 
|---|
| 63 |  | 
|---|
| 64 | size_t i = 0; | 
|---|
| 65 |  | 
|---|
| 66 | columns[i++]->insert(event_type); | 
|---|
| 67 | columns[i++]->insert(DateLUT::instance().toDayNum(event_time)); | 
|---|
| 68 | columns[i++]->insert(event_time); | 
|---|
| 69 | columns[i++]->insert(duration_ms); | 
|---|
| 70 |  | 
|---|
| 71 | columns[i++]->insert(database_name); | 
|---|
| 72 | columns[i++]->insert(table_name); | 
|---|
| 73 | columns[i++]->insert(part_name); | 
|---|
| 74 | columns[i++]->insert(partition_id); | 
|---|
| 75 | columns[i++]->insert(path_on_disk); | 
|---|
| 76 |  | 
|---|
| 77 | columns[i++]->insert(rows); | 
|---|
| 78 | columns[i++]->insert(bytes_compressed_on_disk); | 
|---|
| 79 |  | 
|---|
| 80 | Array source_part_names_array; | 
|---|
| 81 | source_part_names_array.reserve(source_part_names.size()); | 
|---|
| 82 | for (const auto & name : source_part_names) | 
|---|
| 83 | source_part_names_array.push_back(name); | 
|---|
| 84 |  | 
|---|
| 85 | columns[i++]->insert(source_part_names_array); | 
|---|
| 86 |  | 
|---|
| 87 | columns[i++]->insert(bytes_uncompressed); | 
|---|
| 88 | columns[i++]->insert(rows_read); | 
|---|
| 89 | columns[i++]->insert(bytes_read_uncompressed); | 
|---|
| 90 |  | 
|---|
| 91 | columns[i++]->insert(error); | 
|---|
| 92 | columns[i++]->insert(exception); | 
|---|
| 93 |  | 
|---|
| 94 | block.setColumns(std::move(columns)); | 
|---|
| 95 | } | 
|---|
| 96 |  | 
|---|
| 97 |  | 
|---|
| 98 | bool PartLog::addNewPart(Context & current_context, const MutableDataPartPtr & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status) | 
|---|
| 99 | { | 
|---|
| 100 | return addNewParts(current_context, {part}, elapsed_ns, execution_status); | 
|---|
| 101 | } | 
|---|
| 102 |  | 
|---|
| 103 | bool PartLog::addNewParts(Context & current_context, const PartLog::MutableDataPartsVector & parts, UInt64 elapsed_ns, | 
|---|
| 104 | const ExecutionStatus & execution_status) | 
|---|
| 105 | { | 
|---|
| 106 | if (parts.empty()) | 
|---|
| 107 | return true; | 
|---|
| 108 |  | 
|---|
| 109 | std::shared_ptr<PartLog> part_log; | 
|---|
| 110 |  | 
|---|
| 111 | try | 
|---|
| 112 | { | 
|---|
| 113 | part_log = current_context.getPartLog(parts.front()->storage.getDatabaseName()); // assume parts belong to the same table | 
|---|
| 114 | if (!part_log) | 
|---|
| 115 | return false; | 
|---|
| 116 |  | 
|---|
| 117 | for (const auto & part : parts) | 
|---|
| 118 | { | 
|---|
| 119 | PartLogElement elem; | 
|---|
| 120 |  | 
|---|
| 121 | elem.event_type = PartLogElement::NEW_PART; | 
|---|
| 122 | elem.event_time = time(nullptr); | 
|---|
| 123 | elem.duration_ms = elapsed_ns / 1000000; | 
|---|
| 124 |  | 
|---|
| 125 | elem.database_name = part->storage.getDatabaseName(); | 
|---|
| 126 | elem.table_name = part->storage.getTableName(); | 
|---|
| 127 | elem.partition_id = part->info.partition_id; | 
|---|
| 128 | elem.part_name = part->name; | 
|---|
| 129 | elem.path_on_disk = part->getFullPath(); | 
|---|
| 130 |  | 
|---|
| 131 | elem.bytes_compressed_on_disk = part->bytes_on_disk; | 
|---|
| 132 | elem.rows = part->rows_count; | 
|---|
| 133 |  | 
|---|
| 134 | elem.error = static_cast<UInt16>(execution_status.code); | 
|---|
| 135 | elem.exception = execution_status.message; | 
|---|
| 136 |  | 
|---|
| 137 | part_log->add(elem); | 
|---|
| 138 | } | 
|---|
| 139 | } | 
|---|
| 140 | catch (...) | 
|---|
| 141 | { | 
|---|
| 142 | tryLogCurrentException(part_log ? part_log->log : &Logger::get( "PartLog"), __PRETTY_FUNCTION__); | 
|---|
| 143 | return false; | 
|---|
| 144 | } | 
|---|
| 145 |  | 
|---|
| 146 | return true; | 
|---|
| 147 | } | 
|---|
| 148 |  | 
|---|
| 149 | } | 
|---|
| 150 |  | 
|---|