#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Common/HashTable/HashMap.h>
#include <Common/Exception.h>
#include <Interpreters/AggregationCommon.h>
#include <IO/HashingWriteBuffer.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
#include <Common/typeid_cast.h>


namespace ProfileEvents
{
    extern const Event MergeTreeDataWriterBlocks;
    extern const Event MergeTreeDataWriterBlocksAlreadySorted;
    extern const Event MergeTreeDataWriterRows;
    extern const Event MergeTreeDataWriterUncompressedBytes;
    extern const Event MergeTreeDataWriterCompressedBytes;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int TOO_MANY_PARTS;
}

namespace
{
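/// Builds a scatter selector over the partition key columns: selector[i] is set to the
/// partition index of row i, and partition_num_to_first_row[p] records the first row
/// belonging to partition p (used later to extract the partition value itself).
/// E.g. for per-row partition keys [A, A, B, A, B] the result is selector = [0, 0, 1, 0, 1]
/// and partition_num_to_first_row = [0, 2]. If only one partition is found, the selector
/// is left empty (see the deferred initialization below).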
void buildScatterSelector(
    const ColumnRawPtrs & columns,
    PODArray<size_t> & partition_num_to_first_row,
    IColumn::Selector & selector,
    size_t max_parts)
{
    /// Use generic hashed variant since partitioning is unlikely to be a bottleneck.
    using Data = HashMap<UInt128, size_t, UInt128TrivialHash>;
    Data partitions_map;

    size_t num_rows = columns[0]->size();
    size_t partitions_count = 0;
    for (size_t i = 0; i < num_rows; ++i)
    {
        Data::key_type key = hash128(i, columns.size(), columns);
        Data::LookupResult it;
        bool inserted;
        partitions_map.emplace(key, it, inserted);
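        /// `inserted` is true iff this row's partition key was seen for the first time.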

        if (inserted)
        {
            if (max_parts && partitions_count >= max_parts)
                throw Exception("Too many partitions for single INSERT block (more than " + toString(max_parts) + "). The limit is controlled by the 'max_partitions_per_insert_block' setting. A large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. The recommended total number of partitions for a table is under 1000..10000. Please note that partitioning is not intended to speed up SELECT queries (the ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).", ErrorCodes::TOO_MANY_PARTS);

            partition_num_to_first_row.push_back(i);
            it->getMapped() = partitions_count;

            ++partitions_count;

            /// Optimization for the common case when there is only one partition: defer selector initialization.
            if (partitions_count == 2)
            {
                selector = IColumn::Selector(num_rows);
                std::fill(selector.begin(), selector.begin() + i, 0);
            }
        }

        if (partitions_count > 1)
            selector[i] = it->getMapped();
    }
}

/// Computes the TTL values for a block and updates the given TTL infos (min/max per TTL entry).
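/// For example, for `TTL d + INTERVAL 1 MONTH` the result column holds a per-row expiration
/// time; the minimum and maximum of these values become the TTL bounds stored for the part.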
void updateTTL(const MergeTreeData::TTLEntry & ttl_entry,
    MergeTreeDataPart::TTLInfos & ttl_infos,
    DB::MergeTreeDataPartTTLInfo & ttl_info,
    Block & block, bool update_part_min_max_ttls)
{
    bool remove_column = false;
    if (!block.has(ttl_entry.result_column))
    {
        ttl_entry.expression->execute(block);
        remove_column = true;
    }

    const auto & current = block.getByName(ttl_entry.result_column);

    const IColumn * column = current.column.get();
    if (const ColumnUInt16 * column_date = typeid_cast<const ColumnUInt16 *>(column))
    {
        const auto & date_lut = DateLUT::instance();
        for (const auto & val : column_date->getData())
            ttl_info.update(date_lut.fromDayNum(DayNum(val)));
    }
    else if (const ColumnUInt32 * column_date_time = typeid_cast<const ColumnUInt32 *>(column))
    {
        for (const auto & val : column_date_time->getData())
            ttl_info.update(val);
    }
    else if (const ColumnConst * column_const = typeid_cast<const ColumnConst *>(column))
    {
        if (typeid_cast<const ColumnUInt16 *>(&column_const->getDataColumn()))
        {
            const auto & date_lut = DateLUT::instance();
            ttl_info.update(date_lut.fromDayNum(DayNum(column_const->getValue<UInt16>())));
        }
        else if (typeid_cast<const ColumnUInt32 *>(&column_const->getDataColumn()))
        {
            ttl_info.update(column_const->getValue<UInt32>());
        }
        else
            throw Exception("Unexpected type of result TTL column", ErrorCodes::LOGICAL_ERROR);
    }
    else
        throw Exception("Unexpected type of result TTL column", ErrorCodes::LOGICAL_ERROR);

    if (update_part_min_max_ttls)
        ttl_infos.updatePartMinMaxTTL(ttl_info.min, ttl_info.max);

    if (remove_column)
        block.erase(ttl_entry.result_column);
}

}
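
/// Splits the inserted block into one block per partition. Each resulting
/// BlockWithPartition carries the partition value (the tuple of partition key column
/// values) taken from the first row of that partition.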
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts)
{
    BlocksWithPartition result;
    if (!block || !block.rows())
        return result;

    data.check(block, true);
    block.checkNumberOfRows();

    if (!data.partition_key_expr) /// Table is not partitioned.
    {
        result.emplace_back(Block(block), Row());
        return result;
    }

    Block block_copy = block;
    data.partition_key_expr->execute(block_copy);

    ColumnRawPtrs partition_columns;
    partition_columns.reserve(data.partition_key_sample.columns());
    for (const ColumnWithTypeAndName & element : data.partition_key_sample)
        partition_columns.emplace_back(block_copy.getByName(element.name).column.get());

    PODArray<size_t> partition_num_to_first_row;
    IColumn::Selector selector;
    buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts);

    size_t partitions_count = partition_num_to_first_row.size();
    result.reserve(partitions_count);
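
    /// Extracts the partition value for partition `num` from its first row.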
    auto get_partition = [&](size_t num)
    {
        Row partition(partition_columns.size());
        for (size_t i = 0; i < partition_columns.size(); ++i)
            partition[i] = Field((*partition_columns[i])[partition_num_to_first_row[num]]);
        return partition;
    };

    if (partitions_count == 1)
    {
        /// A typical case is when there is one partition (you do not need to split anything).
        /// NOTE: returning a copy of the original block so that calculated partition key columns
        /// do not interfere with possible calculated primary key columns of the same name.
        result.emplace_back(Block(block), get_partition(0));
        return result;
    }

    for (size_t i = 0; i < partitions_count; ++i)
        result.emplace_back(block.cloneEmpty(), get_partition(i));

    for (size_t col = 0; col < block.columns(); ++col)
    {
        MutableColumns scattered = block.getByPosition(col).column->scatter(partitions_count, selector);
        for (size_t i = 0; i < partitions_count; ++i)
            result[i].block.getByPosition(col).column = std::move(scattered[i]);
    }
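
    /// At this point result[i].block contains exactly the rows whose partition index is i.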

    return result;
}
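
/// Writes the given block into a new temporary part (directory name prefixed with
/// "tmp_insert_"); renaming the part into the active data set is left to the caller.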
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition)
{
    Block & block = block_with_partition.block;

    static const String TMP_PREFIX = "tmp_insert_";

    /// This will generate a unique name in the scope of the current server process.
    Int64 temp_index = data.insert_increment.get();

    MergeTreeDataPart::MinMaxIndex minmax_idx;
    minmax_idx.update(block, data.minmax_idx_columns);

    MergeTreePartition partition(std::move(block_with_partition.partition));

    MergeTreePartInfo new_part_info(partition.getID(data.partition_key_sample), temp_index, temp_index, 0);
    String part_name;
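    /// Old part name format (before custom partitioning): YYYYMMDD_YYYYMMDD_<min_block>_<max_block>_<level>,
    /// so a part must not span more than one month. New format: <partition_id>_<min_block>_<max_block>_<level>.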
    if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        DayNum min_date(minmax_idx.parallelogram[data.minmax_idx_date_column_pos].left.get<UInt64>());
        DayNum max_date(minmax_idx.parallelogram[data.minmax_idx_date_column_pos].right.get<UInt64>());

        const auto & date_lut = DateLUT::instance();

        DayNum min_month = date_lut.toFirstDayNumOfMonth(DayNum(min_date));
        DayNum max_month = date_lut.toFirstDayNumOfMonth(DayNum(max_date));

        if (min_month != max_month)
            throw Exception("Logical error: part spans more than one month.", ErrorCodes::LOGICAL_ERROR);

        part_name = new_part_info.getPartNameV0(min_date, max_date);
    }
    else
        part_name = new_part_info.getPartName();

    /// The size of the part will not be greater than block.bytes() plus a small epsilon.
    size_t expected_size = block.bytes();
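
    /// Compute TTL move infos up front so that the space reservation below can prefer
    /// the volume or disk dictated by TTL MOVE rules.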
    DB::MergeTreeDataPart::TTLInfos move_ttl_infos;
    for (const auto & ttl_entry : data.move_ttl_entries)
        updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);

    ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr));

    MergeTreeData::MutableDataPartPtr new_data_part =
        std::make_shared<MergeTreeData::DataPart>(data, reservation->getDisk(), part_name, new_part_info);

    new_data_part->partition = std::move(partition);
    new_data_part->minmax_idx = std::move(minmax_idx);
    new_data_part->relative_path = TMP_PREFIX + part_name;
    new_data_part->is_temp = true;

    /// The name could be non-unique in case of stale files from previous runs.
    String full_path = new_data_part->getFullPath();
    Poco::File dir(full_path);

    if (dir.exists())
    {
        LOG_WARNING(log, "Removing old temporary directory " + full_path);
        dir.remove(true);
    }

    dir.createDirectories();

    /// Calculate the sorting key and skip index columns if they are not already in the block.
    if (data.hasSortingKey() || data.hasSkipIndices())
        data.sorting_key_and_skip_indices_expr->execute(block);

    Names sort_columns = data.sorting_key_columns;
    SortDescription sort_description;
    size_t sort_columns_size = sort_columns.size();
    sort_description.reserve(sort_columns_size);
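
    /// Sort by each sorting key column in ascending order (direction = 1); the second `1`
    /// is nulls_direction, which treats NULLs and NaNs as greatest, i.e. they sort last here.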
    for (size_t i = 0; i < sort_columns_size; ++i)
        sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);

    ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);

    /// Sort.
    IColumn::Permutation * perm_ptr = nullptr;
    IColumn::Permutation perm;
    if (!sort_description.empty())
    {
        if (!isAlreadySorted(block, sort_description))
        {
            stableGetPermutation(block, sort_description, perm);
            perm_ptr = &perm;
        }
        else
            ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
    }

    if (data.hasTableTTL())
        updateTTL(data.ttl_table_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);

    for (const auto & [name, ttl_entry] : data.column_ttl_entries_by_name)
        updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);

    new_data_part->ttl_infos.update(move_ttl_infos);

    /// This effectively chooses the minimal compression method:
    /// either the default LZ4, or the compression method with zero thresholds on absolute and relative part size.
    auto compression_codec = data.global_context.chooseCompressionCodec(0, 0);

    NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames());
    MergedBlockOutputStream out(data, new_data_part->getFullPath(), columns, compression_codec);
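
    /// Write the data, then finalize the part: this flushes the files, writes checksums and
    /// the primary index, and fills new_data_part's metadata such as bytes_on_disk.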
    out.writePrefix();
    out.writeWithPermutation(block, perm_ptr);
    out.writeSuffixAndFinalizePart(new_data_part);

    ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
    ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes());
    ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->bytes_on_disk);

    return new_data_part;
}

}