1#pragma once
2
#include <atomic>
#include <condition_variable>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

#include <ext/shared_ptr_helper.h>

#include <Core/Names.h>
#include <Storages/AlterCommands.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeMutationEntry.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Common/SimpleIncrement.h>
#include <Core/BackgroundSchedulePool.h>
19
20
21namespace DB
22{
23
24/** See the description of the data structure in MergeTreeData.
25 */
26class StorageMergeTree : public ext::shared_ptr_helper<StorageMergeTree>, public MergeTreeData
27{
28 friend struct ext::shared_ptr_helper<StorageMergeTree>;
29public:
30 void startup() override;
31 void shutdown() override;
32 ~StorageMergeTree() override;
33
34 std::string getName() const override { return merging_params.getModeName() + "MergeTree"; }
35 std::string getTableName() const override { return table_name; }
36 std::string getDatabaseName() const override { return database_name; }
37
38 bool supportsIndexForIn() const override { return true; }
39
40 Pipes readWithProcessors(
41 const Names & column_names,
42 const SelectQueryInfo & query_info,
43 const Context & context,
44 QueryProcessingStage::Enum processed_stage,
45 size_t max_block_size,
46 unsigned num_streams) override;
47
48 bool supportProcessorsPipeline() const override { return true; }
49
50 std::optional<UInt64> totalRows() const override;
51
52 BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
53
54 /** Perform the next step in combining the parts.
55 */
56 bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
57
58 void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & context) override;
59
60 void mutate(const MutationCommands & commands, const Context & context) override;
61
62 /// Return introspection information about currently processing or recently processed mutations.
63 std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
64
65 CancellationCode killMutation(const String & mutation_id) override;
66
67 void drop(TableStructureWriteLockHolder &) override;
68 void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
69
70 void alter(const AlterCommands & commands, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
71
72 void checkTableCanBeDropped() const override;
73
74 void checkPartitionCanBeDropped(const ASTPtr & partition) override;
75
76 ActionLock getActionLock(StorageActionBlockType action_type) override;
77
78 CheckResults checkData(const ASTPtr & query, const Context & context) override;
79
80private:
81
82 /// Mutex and condvar for synchronous mutations wait
83 std::mutex mutation_wait_mutex;
84 std::condition_variable mutation_wait_event;
85
86 MergeTreeDataSelectExecutor reader;
87 MergeTreeDataWriter writer;
88 MergeTreeDataMergerMutator merger_mutator;
89
90 /// For block numbers.
91 SimpleIncrement increment{0};
92
93 /// For clearOldParts, clearOldTemporaryDirectories.
94 AtomicStopwatch time_after_previous_cleanup;
95
96 /// Mutex for parts currently processing in background
97 /// merging (also with TTL), mutating or moving.
98 mutable std::mutex currently_processing_in_background_mutex;
99
100 /// Parts that currently participate in merge or mutation.
101 /// This set have to be used with `currently_processing_in_background_mutex`.
102 DataParts currently_merging_mutating_parts;
103
104
105 std::map<String, MergeTreeMutationEntry> current_mutations_by_id;
106 std::multimap<Int64, MergeTreeMutationEntry &> current_mutations_by_version;
107
108 std::atomic<bool> shutdown_called {false};
109
110 /// Task handler for merges, mutations and moves.
111 BackgroundProcessingPool::TaskHandle merging_mutating_task_handle;
112 BackgroundProcessingPool::TaskHandle moving_task_handle;
113
114 std::vector<MergeTreeData::AlterDataPartTransactionPtr> prepareAlterTransactions(
115 const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context);
116
117 void loadMutations();
118
119 /** Determines what parts should be merged and merges it.
120 * If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query).
121 * Returns true if merge is finished successfully.
122 */
123 bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr);
124
125 BackgroundProcessingPoolTaskResult movePartsTask();
126
127 /// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true.
128 bool tryMutatePart();
129
130 BackgroundProcessingPoolTaskResult mergeMutateTask();
131
132 Int64 getCurrentMutationVersion(
133 const DataPartPtr & part,
134 std::lock_guard<std::mutex> & /* currently_processing_in_background_mutex_lock */) const;
135
136 void clearOldMutations(bool truncate = false);
137
138 // Partition helpers
139 void dropPartition(const ASTPtr & partition, bool detach, const Context & context);
140 void clearColumnOrIndexInPartition(const ASTPtr & partition, const AlterCommand & alter_command, const Context & context);
141 void attachPartition(const ASTPtr & partition, bool part, const Context & context);
142 void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context);
143 bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
144
145 /// Just checks versions of each active data part
146 bool isMutationDone(Int64 mutation_version) const;
147
148 friend class MergeTreeBlockOutputStream;
149 friend class MergeTreeData;
150 friend struct CurrentlyMergingPartsTagger;
151
152protected:
153
154 /** Attach the table with the appropriate name, along the appropriate path (with / at the end),
155 * (correctness of names and paths are not checked)
156 * consisting of the specified columns.
157 *
158 * See MergeTreeData constructor for comments on parameters.
159 */
160 StorageMergeTree(
161 const String & database_name_,
162 const String & table_name_,
163 const String & relative_data_path_,
164 const StorageInMemoryMetadata & metadata,
165 bool attach,
166 Context & context_,
167 const String & date_column_name,
168 const MergingParams & merging_params_,
169 std::unique_ptr<MergeTreeSettings> settings_,
170 bool has_force_restore_data_flag);
171};
172
173}
174