1#include <Storages/MergeTree/ReplicatedMergeTreeAlterThread.h>
2#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
3#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
4#include <Storages/ColumnsDescription.h>
5#include <Storages/StorageReplicatedMergeTree.h>
6#include <Common/setThreadName.h>
7#include <Common/ZooKeeper/KeeperException.h>
8#include <Interpreters/InterpreterAlterQuery.h>
9#include <Databases/IDatabase.h>
10
11#include <memory>
12
13
14namespace DB
15{
16
/// Error codes used by this file; they are defined elsewhere in the project.
namespace ErrorCodes
{
    extern const int NOT_FOUND_NODE;
}

/// Delay (in milliseconds) before the task is rescheduled after a failed run.
static constexpr auto ALTER_ERROR_SLEEP_MS = 10'000;
23
24
25ReplicatedMergeTreeAlterThread::ReplicatedMergeTreeAlterThread(StorageReplicatedMergeTree & storage_)
26 : storage(storage_)
27 , zk_node_cache([&] { return storage.getZooKeeper(); })
28 , log_name(storage.database_name + "." + storage.table_name + " (ReplicatedMergeTreeAlterThread)")
29 , log(&Logger::get(log_name))
30{
31 task = storage_.global_context.getSchedulePool().createTask(log_name, [this]{ run(); });
32}
33
34void ReplicatedMergeTreeAlterThread::run()
35{
36 try
37 {
38 /** We have a description of columns in ZooKeeper, common for all replicas (Example: /clickhouse/tables/02-06/visits/columns),
39 * as well as a description of columns in local file with metadata (storage.getColumnsList()).
40 *
41 * If these descriptions are different - you need to do ALTER.
42 *
43 * If stored version of the node (columns_version) differs from the version in ZK,
44 * then the description of the columns in ZK does not necessarily differ from the local
45 * - this can happen with a loop from ALTER-s, which as a whole, does not change anything.
46 * In this case, you need to update the stored version number,
47 * and also check the structure of parts, and, if necessary, make ALTER.
48 *
49 * Recorded version number needs to be updated after updating the metadata, under lock.
50 * This version number is checked against the current one for INSERT.
51 * That is, we make sure to insert blocks with the correct structure.
52 *
53 * When the server starts, previous ALTER might not have been completed.
54 * Therefore, for the first time, regardless of the changes, we check the structure of all parts,
55 * (Example: /clickhouse/tables/02-06/visits/replicas/example02-06-1.yandex.ru/parts/20140806_20140831_131664_134988_3296/columns)
56 * and do ALTER if necessary.
57 *
58 * TODO: Too complicated, rewrite everything.
59 */
60
61 auto zookeeper = storage.getZooKeeper();
62
63 String columns_path = storage.zookeeper_path + "/columns";
64 auto columns_znode = zk_node_cache.get(columns_path, task->getWatchCallback());
65 if (!columns_znode.exists)
66 throw Exception(columns_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
67 int32_t columns_version = columns_znode.stat.version;
68
69 String metadata_path = storage.zookeeper_path + "/metadata";
70 auto metadata_znode = zk_node_cache.get(metadata_path, task->getWatchCallback());
71 if (!metadata_znode.exists)
72 throw Exception(metadata_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
73 int32_t metadata_version = metadata_znode.stat.version;
74
75 const bool changed_columns_version = (columns_version != storage.columns_version);
76 const bool changed_metadata_version = (metadata_version != storage.metadata_version);
77
78 if (!(changed_columns_version || changed_metadata_version || force_recheck_parts))
79 return;
80
81 const String & columns_str = columns_znode.contents;
82 auto columns_in_zk = ColumnsDescription::parse(columns_str);
83
84 const String & metadata_str = metadata_znode.contents;
85 auto metadata_in_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
86 auto metadata_diff = ReplicatedMergeTreeTableMetadata(storage).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true);
87
88 /// If you need to lock table structure, then suspend merges and moves.
89 ActionLock merge_blocker = storage.merger_mutator.merges_blocker.cancel();
90 ActionLock moves_blocker = storage.parts_mover.moves_blocker.cancel();
91
92 MergeTreeData::DataParts parts;
93
94 /// If metadata nodes have changed, we will update table structure locally.
95 if (changed_columns_version || changed_metadata_version)
96 {
97 /// Temporarily cancel part checks to avoid locking for long time.
98 auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop();
99
100 /// Temporarily cancel parts sending
101 ActionLock data_parts_exchange_blocker;
102 if (storage.data_parts_exchange_endpoint_holder)
103 data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->getBlocker().cancel();
104
105 /// Temporarily cancel part fetches
106 auto fetches_blocker = storage.fetcher.blocker.cancel();
107
108 LOG_INFO(log, "Version of metadata nodes in ZooKeeper changed. Waiting for structure write lock.");
109
110 auto table_lock = storage.lockExclusively(RWLockImpl::NO_QUERY);
111
112 if (columns_in_zk == storage.getColumns() && metadata_diff.empty())
113 {
114 LOG_INFO(log, "Metadata nodes changed in ZooKeeper, but their contents didn't change. "
115 "Most probably it is a cyclic ALTER.");
116 }
117 else
118 {
119 LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally.");
120
121 storage.setTableStructure(std::move(columns_in_zk), metadata_diff);
122
123 LOG_INFO(log, "Applied changes to the metadata of the table.");
124 }
125
126 /// You need to get a list of parts under table lock to avoid race condition with merge.
127 parts = storage.getDataParts();
128
129 storage.columns_version = columns_version;
130 storage.metadata_version = metadata_version;
131 }
132
133 /// Update parts.
134 if (changed_columns_version || force_recheck_parts)
135 {
136 auto table_lock = storage.lockStructureForShare(false, RWLockImpl::NO_QUERY);
137
138 if (changed_columns_version)
139 LOG_INFO(log, "ALTER-ing parts");
140
141 int changed_parts = 0;
142
143 if (!changed_columns_version)
144 parts = storage.getDataParts();
145
146 const auto columns_for_parts = storage.getColumns().getAllPhysical();
147 const auto indices_for_parts = storage.getIndices();
148
149 for (const MergeTreeData::DataPartPtr & part : parts)
150 {
151 /// Update the part and write result to temporary files.
152 /// TODO: You can skip checking for too large changes if ZooKeeper has, for example,
153 /// node /flags/force_alter.
154 MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
155 storage.alterDataPart(columns_for_parts, indices_for_parts.indices, false, transaction);
156 if (!transaction->isValid())
157 continue;
158
159 storage.updatePartHeaderInZooKeeperAndCommit(zookeeper, *transaction);
160
161 ++changed_parts;
162 }
163
164 /// Columns sizes could be quietly changed in case of MODIFY/ADD COLUMN
165 storage.recalculateColumnSizes();
166
167 if (changed_columns_version)
168 {
169 if (changed_parts != 0)
170 LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
171 else
172 LOG_INFO(log, "No parts ALTER-ed");
173 }
174 }
175
176 /// Update metadata ZK nodes for a specific replica.
177 if (changed_columns_version || force_recheck_parts)
178 zookeeper->set(storage.replica_path + "/columns", columns_str);
179 if (changed_metadata_version || force_recheck_parts)
180 zookeeper->set(storage.replica_path + "/metadata", metadata_str);
181
182 force_recheck_parts = false;
183 }
184 catch (const Coordination::Exception & e)
185 {
186 tryLogCurrentException(log, __PRETTY_FUNCTION__);
187
188 if (e.code == Coordination::ZSESSIONEXPIRED)
189 return;
190
191 force_recheck_parts = true;
192 task->scheduleAfter(ALTER_ERROR_SLEEP_MS);
193 }
194 catch (...)
195 {
196 tryLogCurrentException(log, __PRETTY_FUNCTION__);
197
198 force_recheck_parts = true;
199 task->scheduleAfter(ALTER_ERROR_SLEEP_MS);
200 }
201}
202
203}
204