1#include <Storages/MergeTree/DataPartsExchange.h>
2#include <Storages/IStorage.h>
3#include <Common/CurrentMetrics.h>
4#include <Common/NetException.h>
5#include <Common/typeid_cast.h>
6#include <IO/HTTPCommon.h>
7#include <Poco/File.h>
8#include <ext/scope_guard.h>
9#include <Poco/Net/HTTPServerResponse.h>
10#include <Poco/Net/HTTPRequest.h>
11
12
13namespace CurrentMetrics
14{
15 extern const Metric ReplicatedSend;
16 extern const Metric ReplicatedFetch;
17}
18
19namespace DB
20{
21
/// Error codes thrown from this file. Every code used below is declared here explicitly
/// instead of relying on a transitively included header to declare it.
namespace ErrorCodes
{
    extern const int ABORTED;
    extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
    extern const int CANNOT_WRITE_TO_OSTREAM;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int UNKNOWN_TABLE;
    extern const int UNKNOWN_PROTOCOL;
    extern const int INSECURE_PATH;
    /// Thrown by Service::findPart when the requested part is not present locally.
    extern const int NO_SUCH_DATA_PART;
    /// Thrown by Fetcher::downloadPart when the temporary target directory already exists.
    extern const int DIRECTORY_ALREADY_EXISTS;
}
32
33namespace DataPartsExchange
34{
35
namespace
{

/// Protocol version of old replicas: the sender does not transmit the total size of the part.
constexpr auto REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE = "0";
/// Current protocol version: the sender transmits the total size of the part before the file list.
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = "1";

/// Returns the interserver endpoint id for `node_id`, i.e. "DataPartsExchange:" followed by `node_id`.
std::string getEndpointId(const std::string & node_id)
{
    std::string endpoint_id = "DataPartsExchange:";
    endpoint_id += node_id;
    return endpoint_id;
}

}
48
49std::string Service::getId(const std::string & node_id) const
50{
51 return getEndpointId(node_id);
52}
53
/// Serves a request from another replica that wants to fetch a part from this server,
/// streaming all files of the part to `out`.
///
/// Request parameters read from `params`:
///   - "part": the part name; validated with MergeTreePartInfo::fromPartName because it may come
///     from a malicious replica;
///   - "client_protocol_version": "0" (the default when absent) or "1"; any other value is rejected.
///
/// Data written to `out`:
///   - the total size from the part's checksums, only when the client uses protocol "1";
///   - the number of files;
///   - for every file: its name, its size, its contents and the hash of the contents.
/// The "server_protocol_version" cookie tells the client that this server sends the total size.
///
/// If the global (replicated_max_parallel_sends) or per-table (replicated_max_parallel_sends_for_table)
/// limit on concurrent sends is reached, responds with HTTP 429 and "Retry-After: 10" and returns
/// without sending part data.
///
/// Throws ABORTED when sending is cancelled through `blocker`, UNKNOWN_PROTOCOL for an unsupported
/// client protocol version, and UNKNOWN_TABLE when the table has already been dropped. Errors raised
/// while sending are rethrown. Every error except NetException, ABORTED and CANNOT_WRITE_TO_OSTREAM
/// first reports the part as broken.
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
    if (blocker.isCancelled())
        throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);

    /// Clients that do not send a version are treated as using the old protocol (no total size).
    String client_protocol_version = params.get("client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);


    String part_name = params.get("part");

    if (client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE && client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE)
        throw Exception("Unsupported fetch protocol version", ErrorCodes::UNKNOWN_PROTOCOL);

    const auto data_settings = data.getSettings();

    /// Validation of the input that may come from malicious replica.
    MergeTreePartInfo::fromPartName(part_name, data.format_version);

    /// Sends in progress in this process. A function-local static is shared by all Service instances.
    static std::atomic_uint total_sends {0};

    /// A limit of 0 disables the corresponding check.
    /// NOTE(review): the check below and the increments after it are separate steps, so concurrent
    /// requests can slightly exceed the limits. Confirm this is acceptable.
    if ((data_settings->replicated_max_parallel_sends && total_sends >= data_settings->replicated_max_parallel_sends)
        || (data_settings->replicated_max_parallel_sends_for_table && data.current_table_sends >= data_settings->replicated_max_parallel_sends_for_table))
    {
        /// Ask the client to retry later instead of sending the part.
        response.setStatus(std::to_string(HTTP_TOO_MANY_REQUESTS));
        response.setReason("Too many concurrent fetches, try again later");
        response.set("Retry-After", "10");
        response.setChunkedTransferEncoding(false);
        return;
    }
    /// Tells the client that this server writes the total size when it asks for protocol "1".
    response.addCookie({"server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE});

    /// Count this send in both limits until the function exits (normally or by exception).
    ++total_sends;
    SCOPE_EXIT({--total_sends;});

    ++data.current_table_sends;
    SCOPE_EXIT({--data.current_table_sends;});

    StoragePtr owned_storage = storage.lock();
    if (!owned_storage)
        throw Exception("The table was already dropped", ErrorCodes::UNKNOWN_TABLE);

    LOG_TRACE(log, "Sending part " << part_name);

    try
    {
        /// Shared structure lock on the table, held for the whole send.
        auto storage_lock = owned_storage->lockStructureForShare(false, RWLockImpl::NO_QUERY);

        /// Throws NO_SUCH_DATA_PART if the part is not present locally.
        MergeTreeData::DataPartPtr part = findPart(part_name);

        /// Shared lock on the part's columns_lock while its files are read.
        std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);

        CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};

        /// We'll take a list of files from the list of checksums.
        MergeTreeData::DataPart::Checksums checksums = part->checksums;
        /// Add files that are not in the checksum list.
        /// operator[] inserts entries for them so that they are transferred as well.
        checksums.files["checksums.txt"];
        checksums.files["columns.txt"];

        /// Checksums of the bytes actually sent, compared with the part's checksums at the end.
        MergeTreeData::DataPart::Checksums data_checksums;


        if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
            writeBinary(checksums.getTotalSizeOnDisk(), out);

        writeBinary(checksums.files.size(), out);
        for (const auto & it : checksums.files)
        {
            String file_name = it.first;

            String path = part->getFullPath() + file_name;

            UInt64 size = Poco::File(path).getSize();

            /// Per-file header: name, then size.
            writeStringBinary(it.first, out);
            writeBinary(size, out);

            /// Stream the file into `out` through a hashing buffer, so the hash covers exactly the sent bytes.
            ReadBufferFromFile file_in(path);
            HashingWriteBuffer hashing_out(out);
            copyData(file_in, hashing_out, blocker.getCounter());

            if (blocker.isCancelled())
                throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);

            /// The file changed size between the size query and the copy.
            if (hashing_out.count() != size)
                throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);

            /// Per-file trailer: hash of the contents, checked by the receiving replica.
            writePODBinary(hashing_out.getHash(), out);

            /// The two metadata files added above are not part of the part's own checksums.
            if (file_name != "checksums.txt" &&
                file_name != "columns.txt")
                data_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
        }

        /// Verify that the bytes sent match the checksums recorded for the part.
        part->checksums.checkEqual(data_checksums, false);
    }
    catch (const NetException &)
    {
        /// Network error or error on remote side. No need to enqueue part for check.
        throw;
    }
    catch (const Exception & e)
    {
        /// Cancellation and failures writing to the client say nothing about the local part.
        if (e.code() != ErrorCodes::ABORTED && e.code() != ErrorCodes::CANNOT_WRITE_TO_OSTREAM)
            data.reportBrokenPart(part_name);
        throw;
    }
    catch (...)
    {
        data.reportBrokenPart(part_name);
        throw;
    }
}
167
168MergeTreeData::DataPartPtr Service::findPart(const String & name)
169{
170 /// It is important to include PreCommitted and Outdated parts here because remote replicas cannot reliably
171 /// determine the local state of the part, so queries for the parts in these states are completely normal.
172 auto part = data.getPartIfExists(
173 name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
174 if (part)
175 return part;
176
177 throw Exception("No part " + name + " in table", ErrorCodes::NO_SUCH_DATA_PART);
178}
179
180MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
181 const String & part_name,
182 const String & replica_path,
183 const String & host,
184 int port,
185 const ConnectionTimeouts & timeouts,
186 const String & user,
187 const String & password,
188 const String & interserver_scheme,
189 bool to_detached,
190 const String & tmp_prefix_)
191{
192 /// Validation of the input that may come from malicious replica.
193 MergeTreePartInfo::fromPartName(part_name, data.format_version);
194 const auto data_settings = data.getSettings();
195
196 Poco::URI uri;
197 uri.setScheme(interserver_scheme);
198 uri.setHost(host);
199 uri.setPort(port);
200 uri.setQueryParameters(
201 {
202 {"endpoint", getEndpointId(replica_path)},
203 {"part", part_name},
204 {"client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE},
205 {"compress", "false"}
206 });
207
208 Poco::Net::HTTPBasicCredentials creds{};
209 if (!user.empty())
210 {
211 creds.setUsername(user);
212 creds.setPassword(password);
213 }
214
215 PooledReadWriteBufferFromHTTP in{
216 uri,
217 Poco::Net::HTTPRequest::HTTP_POST,
218 {},
219 timeouts,
220 creds,
221 DBMS_DEFAULT_BUFFER_SIZE,
222 0, /* no redirects */
223 data_settings->replicated_max_parallel_fetches_for_host
224 };
225
226 auto server_protocol_version = in.getResponseCookie("server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
227
228
229 ReservationPtr reservation;
230 if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
231 {
232 size_t sum_files_size;
233 readBinary(sum_files_size, in);
234 reservation = data.reserveSpace(sum_files_size);
235 }
236 else
237 {
238 /// We don't know real size of part because sender server version is too old
239 reservation = data.makeEmptyReservationOnLargestDisk();
240 }
241
242 return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, std::move(reservation), in);
243}
244
/// Receives the files of part `part_name` from `in` (the response written by Service::processQuery) and
/// writes them into a new directory `<tmp prefix><part_name>`. The directory is created inside "detached/"
/// when `to_detached` is set, on the disk of `reservation`. The optional total-size prefix of the response
/// must already have been consumed by the caller (fetchPart does this).
///
/// `tmp_prefix_` replaces the default "tmp_fetch_" prefix when it is non-empty. `replica_path` is only
/// used in error messages.
///
/// Returns the new part with is_temp set. Its columns, checksums and indexes have been loaded, and its
/// checksums have been compared with the checksums of the received files.
///
/// Throws:
///   - DIRECTORY_ALREADY_EXISTS if the target directory already exists;
///   - INSECURE_PATH if a received file name resolves outside that directory;
///   - ABORTED if fetching is cancelled (the directory is removed first);
///   - CHECKSUM_DOESNT_MATCH if a file's hash differs from the hash sent by the remote replica.
/// NOTE(review): on errors other than cancellation the directory is not removed in this function.
/// Cleanup presumably relies on the is_temp part's destructor; confirm.
MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
    const String & part_name,
    const String & replica_path,
    bool to_detached,
    const String & tmp_prefix_,
    const ReservationPtr reservation,
    PooledReadWriteBufferFromHTTP & in)
{

    /// Number of files that follow in the stream.
    size_t files;
    readBinary(files, in);

    static const String TMP_PREFIX = "tmp_fetch_";
    String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

    /// The trailing "/" makes the prefix check below match only paths inside this directory.
    String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
    String absolute_part_path = Poco::Path(data.getFullPathOnDisk(reservation->getDisk()) + relative_part_path + "/").absolute().toString();
    Poco::File part_file(absolute_part_path);

    if (part_file.exists())
        throw Exception("Directory " + absolute_part_path + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);

    CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};

    part_file.createDirectory();

    MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, reservation->getDisk(), part_name);
    new_data_part->relative_path = relative_part_path;
    new_data_part->is_temp = true;


    /// Checksums of the received data files, compared with the part's checksums.txt at the end.
    MergeTreeData::DataPart::Checksums checksums;
    for (size_t i = 0; i < files; ++i)
    {
        String file_name;
        UInt64 file_size;

        /// Per-file header: name, then size.
        readStringBinary(file_name, in);
        readBinary(file_size, in);

        /// File must be inside "absolute_part_path" directory.
        /// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
        String absolute_file_path = Poco::Path(absolute_part_path + file_name).absolute().toString();
        if (!startsWith(absolute_file_path, absolute_part_path))
            throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + absolute_part_path + ")."
                " This may happen if we are trying to download part from malicious replica or logical error.",
                ErrorCodes::INSECURE_PATH);

        /// Write file_size bytes of contents into the file through a hashing buffer, so the hash
        /// covers exactly the bytes written. copyData also receives the blocker's counter
        /// (presumably to stop early on cancellation).
        WriteBufferFromFile file_out(absolute_file_path);
        HashingWriteBuffer hashing_out(file_out);
        copyData(in, hashing_out, file_size, blocker.getCounter());

        if (blocker.isCancelled())
        {
            /// NOTE The is_cancelled flag also makes sense to check every time you read over the network, performing a poll with a not very large timeout.
            /// And now we check it only between read chunks (in the `copyData` function).
            part_file.remove(true);
            throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
        }

        /// Per-file trailer: the hash computed by the sender.
        MergeTreeDataPartChecksum::uint128 expected_hash;
        readPODBinary(expected_hash, in);

        if (expected_hash != hashing_out.getHash())
            throw Exception("Checksum mismatch for file " + absolute_part_path + file_name + " transferred from " + replica_path,
                ErrorCodes::CHECKSUM_DOESNT_MATCH);

        /// The sender likewise excludes these two metadata files from the checksums it verifies.
        if (file_name != "checksums.txt" &&
            file_name != "columns.txt")
            checksums.addFile(file_name, file_size, expected_hash);
    }

    /// Nothing may follow the last file.
    assertEOF(in);

    new_data_part->modification_time = time(nullptr);
    /// Load the part's metadata from the written files, then compare its checksums with the ones received.
    new_data_part->loadColumnsChecksumsIndexes(true, false);
    new_data_part->checksums.checkEqual(checksums, false);

    return new_data_part;
}
325
326}
327
328}
329