1#include <Columns/ColumnsNumber.h>
2#include <Columns/ColumnArray.h>
3#include <Columns/ColumnString.h>
4#include <DataTypes/DataTypeArray.h>
5#include <DataTypes/DataTypesNumber.h>
6#include <DataTypes/DataTypeDateTime.h>
7#include <DataTypes/DataTypeDate.h>
8#include <DataTypes/DataTypeString.h>
9#include <DataTypes/DataTypeEnum.h>
10#include <Storages/MergeTree/MergeTreeDataPart.h>
11#include <Storages/MergeTree/MergeTreeData.h>
12#include <Interpreters/PartLog.h>
13
14
15namespace DB
16{
17
18Block PartLogElement::createBlock()
19{
20 auto event_type_datatype = std::make_shared<DataTypeEnum8>(
21 DataTypeEnum8::Values
22 {
23 {"NewPart", static_cast<Int8>(NEW_PART)},
24 {"MergeParts", static_cast<Int8>(MERGE_PARTS)},
25 {"DownloadPart", static_cast<Int8>(DOWNLOAD_PART)},
26 {"RemovePart", static_cast<Int8>(REMOVE_PART)},
27 {"MutatePart", static_cast<Int8>(MUTATE_PART)},
28 {"MovePart", static_cast<Int8>(MOVE_PART)},
29 }
30 );
31
32 return
33 {
34 {ColumnInt8::create(), std::move(event_type_datatype), "event_type"},
35 {ColumnUInt16::create(), std::make_shared<DataTypeDate>(), "event_date"},
36 {ColumnUInt32::create(), std::make_shared<DataTypeDateTime>(), "event_time"},
37 {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "duration_ms"},
38
39 {ColumnString::create(), std::make_shared<DataTypeString>(), "database"},
40 {ColumnString::create(), std::make_shared<DataTypeString>(), "table"},
41 {ColumnString::create(), std::make_shared<DataTypeString>(), "part_name"},
42 {ColumnString::create(), std::make_shared<DataTypeString>(), "partition_id"},
43 {ColumnString::create(), std::make_shared<DataTypeString>(), "path_on_disk"},
44
45 {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "rows"},
46 {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "size_in_bytes"}, // On disk
47
48 /// Merge-specific info
49 {ColumnArray::create(ColumnString::create()), std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "merged_from"},
50 {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "bytes_uncompressed"}, // Result bytes
51 {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "read_rows"},
52 {ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "read_bytes"},
53
54 /// Is there an error during the execution or commit
55 {ColumnUInt16::create(), std::make_shared<DataTypeUInt16>(), "error"},
56 {ColumnString::create(), std::make_shared<DataTypeString>(), "exception"},
57 };
58}
59
60void PartLogElement::appendToBlock(Block & block) const
61{
62 MutableColumns columns = block.mutateColumns();
63
64 size_t i = 0;
65
66 columns[i++]->insert(event_type);
67 columns[i++]->insert(DateLUT::instance().toDayNum(event_time));
68 columns[i++]->insert(event_time);
69 columns[i++]->insert(duration_ms);
70
71 columns[i++]->insert(database_name);
72 columns[i++]->insert(table_name);
73 columns[i++]->insert(part_name);
74 columns[i++]->insert(partition_id);
75 columns[i++]->insert(path_on_disk);
76
77 columns[i++]->insert(rows);
78 columns[i++]->insert(bytes_compressed_on_disk);
79
80 Array source_part_names_array;
81 source_part_names_array.reserve(source_part_names.size());
82 for (const auto & name : source_part_names)
83 source_part_names_array.push_back(name);
84
85 columns[i++]->insert(source_part_names_array);
86
87 columns[i++]->insert(bytes_uncompressed);
88 columns[i++]->insert(rows_read);
89 columns[i++]->insert(bytes_read_uncompressed);
90
91 columns[i++]->insert(error);
92 columns[i++]->insert(exception);
93
94 block.setColumns(std::move(columns));
95}
96
97
98bool PartLog::addNewPart(Context & current_context, const MutableDataPartPtr & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status)
99{
100 return addNewParts(current_context, {part}, elapsed_ns, execution_status);
101}
102
103bool PartLog::addNewParts(Context & current_context, const PartLog::MutableDataPartsVector & parts, UInt64 elapsed_ns,
104 const ExecutionStatus & execution_status)
105{
106 if (parts.empty())
107 return true;
108
109 std::shared_ptr<PartLog> part_log;
110
111 try
112 {
113 part_log = current_context.getPartLog(parts.front()->storage.getDatabaseName()); // assume parts belong to the same table
114 if (!part_log)
115 return false;
116
117 for (const auto & part : parts)
118 {
119 PartLogElement elem;
120
121 elem.event_type = PartLogElement::NEW_PART;
122 elem.event_time = time(nullptr);
123 elem.duration_ms = elapsed_ns / 1000000;
124
125 elem.database_name = part->storage.getDatabaseName();
126 elem.table_name = part->storage.getTableName();
127 elem.partition_id = part->info.partition_id;
128 elem.part_name = part->name;
129 elem.path_on_disk = part->getFullPath();
130
131 elem.bytes_compressed_on_disk = part->bytes_on_disk;
132 elem.rows = part->rows_count;
133
134 elem.error = static_cast<UInt16>(execution_status.code);
135 elem.exception = execution_status.message;
136
137 part_log->add(elem);
138 }
139 }
140 catch (...)
141 {
142 tryLogCurrentException(part_log ? part_log->log : &Logger::get("PartLog"), __PRETTY_FUNCTION__);
143 return false;
144 }
145
146 return true;
147}
148
149}
150