1#pragma once
2
3#include <Storages/MergeTree/MergeTreeData.h>
4#include <Storages/MutationCommands.h>
5#include <atomic>
6#include <functional>
7#include <Common/ActionBlocker.h>
8
9
10namespace DB
11{
12
/// Forward declarations: this header only mentions these types by name
/// (MergeListEntry is used by reference below), so their full definitions are not required here.
class MergeListEntry;
class MergeProgressCallback;
15
16/// Auxiliary struct holding metainformation for the future merged or mutated part.
17struct FutureMergedMutatedPart
18{
19 String name;
20 String path;
21 MergeTreePartInfo part_info;
22 MergeTreeData::DataPartsVector parts;
23
24 const MergeTreePartition & getPartition() const { return parts.front()->partition; }
25
26 FutureMergedMutatedPart() = default;
27 explicit FutureMergedMutatedPart(MergeTreeData::DataPartsVector parts_)
28 {
29 assign(std::move(parts_));
30 }
31
32 void assign(MergeTreeData::DataPartsVector parts_);
33 void updatePath(const MergeTreeData & storage, const ReservationPtr & reservation);
34};
35
36
37/** Can select parts for background processes and do them.
38 * Currently helps with merges, mutations and moves
39 */
40class MergeTreeDataMergerMutator
41{
42public:
43 using AllowedMergingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &, const MergeTreeData::DataPartPtr &, String * reason)>;
44
45public:
46 MergeTreeDataMergerMutator(MergeTreeData & data_, size_t background_pool_size);
47
48 /** Get maximum total size of parts to do merge, at current moment of time.
49 * It depends on number of free threads in background_pool and amount of free space in disk.
50 */
51 UInt64 getMaxSourcePartsSizeForMerge();
52
53 /** For explicitly passed size of pool and number of used tasks.
54 * This method could be used to calculate threshold depending on number of tasks in replication queue.
55 */
56 UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used);
57
58 /** Get maximum total size of parts to do mutation, at current moment of time.
59 * It depends only on amount of free space in disk.
60 */
61 UInt64 getMaxSourcePartSizeForMutation();
62
63 /** Selects which parts to merge. Uses a lot of heuristics.
64 *
65 * can_merge - a function that determines if it is possible to merge a pair of adjacent parts.
66 * This function must coordinate merge with inserts and other merges, ensuring that
67 * - Parts between which another part can still appear can not be merged. Refer to METR-7001.
68 * - A part that already merges with something in one place, you can not start to merge into something else in another place.
69 */
70 bool selectPartsToMerge(
71 FutureMergedMutatedPart & future_part,
72 bool aggressive,
73 size_t max_total_size_to_merge,
74 const AllowedMergingPredicate & can_merge,
75 String * out_disable_reason = nullptr);
76
77
78 /** Select all the parts in the specified partition for merge, if possible.
79 * final - choose to merge even a single part - that is, allow to merge one part "with itself".
80 */
81 bool selectAllPartsToMergeWithinPartition(
82 FutureMergedMutatedPart & future_part,
83 UInt64 & available_disk_space,
84 const AllowedMergingPredicate & can_merge,
85 const String & partition_id,
86 bool final,
87 String * out_disable_reason = nullptr);
88
89 /** Merge the parts.
90 * If `reservation != nullptr`, now and then reduces the size of the reserved space
91 * is approximately proportional to the amount of data already written.
92 *
93 * Creates and returns a temporary part.
94 * To end the merge, call the function renameMergedTemporaryPart.
95 *
96 * time_of_merge - the time when the merge was assigned.
97 * Important when using ReplicatedGraphiteMergeTree to provide the same merge on replicas.
98 */
99 MergeTreeData::MutableDataPartPtr mergePartsToTemporaryPart(
100 const FutureMergedMutatedPart & future_part,
101 MergeListEntry & merge_entry, TableStructureReadLockHolder & table_lock_holder, time_t time_of_merge,
102 const ReservationPtr & disk_reservation, bool deduplication, bool force_ttl);
103
104 /// Mutate a single data part with the specified commands. Will create and return a temporary part.
105 MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart(
106 const FutureMergedMutatedPart & future_part,
107 const std::vector<MutationCommand> & commands,
108 MergeListEntry & merge_entry, const Context & context,
109 const ReservationPtr & disk_reservation,
110 TableStructureReadLockHolder & table_lock_holder);
111
112 MergeTreeData::DataPartPtr renameMergedTemporaryPart(
113 MergeTreeData::MutableDataPartPtr & new_data_part,
114 const MergeTreeData::DataPartsVector & parts,
115 MergeTreeData::Transaction * out_transaction = nullptr);
116
117
118 /// The approximate amount of disk space needed for merge or mutation. With a surplus.
119 static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts);
120
121private:
122 /** Select all parts belonging to the same partition.
123 */
124 MergeTreeData::DataPartsVector selectAllPartsFromPartition(const String & partition_id);
125
126public:
127 /** Is used to cancel all merges and mutations. On cancel() call all currently running actions will throw exception soon.
128 * All new attempts to start a merge or mutation will throw an exception until all 'LockHolder' objects will be destroyed.
129 */
130 ActionBlocker merges_blocker;
131 ActionBlocker ttl_merges_blocker;
132
133 enum class MergeAlgorithm
134 {
135 Horizontal, /// per-row merge of all columns
136 Vertical /// per-row merge of PK and secondary indices columns, per-column gather for non-PK columns
137 };
138
139private:
140
141 MergeAlgorithm chooseMergeAlgorithm(
142 const MergeTreeData::DataPartsVector & parts,
143 size_t rows_upper_bound, const NamesAndTypesList & gathering_columns, bool deduplicate, bool need_remove_expired_values) const;
144
145private:
146 MergeTreeData & data;
147 const size_t background_pool_size;
148
149 Logger * log;
150
151 /// When the last time you wrote to the log that the disk space was running out (not to write about this too often).
152 time_t disk_space_warning_time = 0;
153
154 /// Last time when TTLMergeSelector has been used
155 time_t last_merge_with_ttl = 0;
156};
157
158
159}
160