#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/IStorage.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <Common/typeid_cast.h>
#include <IO/HTTPCommon.h>
#include <Poco/File.h>
#include <ext/scope_guard.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTTPRequest.h>


namespace CurrentMetrics
{
    extern const Metric ReplicatedSend;
    extern const Metric ReplicatedFetch;
}
namespace DB
{

namespace ErrorCodes
{
    extern const int ABORTED;
    extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
    extern const int CANNOT_WRITE_TO_OSTREAM;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int DIRECTORY_ALREADY_EXISTS;
    extern const int NO_SUCH_DATA_PART;
    extern const int UNKNOWN_TABLE;
    extern const int UNKNOWN_PROTOCOL;
    extern const int INSECURE_PATH;
}

namespace DataPartsExchange
{

namespace
{

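/// The fetch protocol is versioned via a query parameter: version "1" additionally
/// sends the total size of the part before the list of files, so the fetcher can
/// reserve disk space before downloading.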
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE = "0";
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = "1";

/// HTTP status used to ask the fetcher to back off and retry.
static constexpr auto HTTP_TOO_MANY_REQUESTS = 429;

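/// Endpoints are registered and looked up by this id; callers pass the replica path as node_id.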
std::string getEndpointId(const std::string & node_id)
{
    return "DataPartsExchange:" + node_id;
}

}

std::string Service::getId(const std::string & node_id) const
{
    return getEndpointId(node_id);
}

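/// Streams the requested part back to the fetcher. The response body is:
///   [total size on disk]  -- only for client protocol version "1"
///   [number of files]
/// and then, for every file: [name] [size] [raw data] [hash of the data].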
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
    if (blocker.isCancelled())
        throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);

    String client_protocol_version = params.get("client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);

    String part_name = params.get("part");

    if (client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE && client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE)
        throw Exception("Unsupported fetch protocol version", ErrorCodes::UNKNOWN_PROTOCOL);

    const auto data_settings = data.getSettings();

    /// Validation of the input that may come from malicious replica.
    MergeTreePartInfo::fromPartName(part_name, data.format_version);

    static std::atomic_uint total_sends {0};

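    /// Throttle concurrent sends, both server-wide and per table; the fetcher is
    /// told to retry after the delay advertised in the Retry-After header.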
    if ((data_settings->replicated_max_parallel_sends && total_sends >= data_settings->replicated_max_parallel_sends)
        || (data_settings->replicated_max_parallel_sends_for_table && data.current_table_sends >= data_settings->replicated_max_parallel_sends_for_table))
    {
        response.setStatus(std::to_string(HTTP_TOO_MANY_REQUESTS));
        response.setReason("Too many concurrent fetches, try again later");
        response.set("Retry-After", "10");
        response.setChunkedTransferEncoding(false);
        return;
    }
    response.addCookie({"server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE});

    ++total_sends;
    SCOPE_EXIT({--total_sends;});

    ++data.current_table_sends;
    SCOPE_EXIT({--data.current_table_sends;});

    StoragePtr owned_storage = storage.lock();
    if (!owned_storage)
        throw Exception("The table was already dropped", ErrorCodes::UNKNOWN_TABLE);

    LOG_TRACE(log, "Sending part " << part_name);

    try
    {
        auto storage_lock = owned_storage->lockStructureForShare(false, RWLockImpl::NO_QUERY);

        MergeTreeData::DataPartPtr part = findPart(part_name);

        std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);

        CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};

        /// We'll take a list of files from the list of checksums.
        MergeTreeData::DataPart::Checksums checksums = part->checksums;
        /// Add files that are not in the checksum list.
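        /// (operator[] default-constructs empty entries, so these two files are streamed
        /// by the loop below even though no precomputed hash exists for them.)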
        checksums.files["checksums.txt"];
        checksums.files["columns.txt"];

        MergeTreeData::DataPart::Checksums data_checksums;

        if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
            writeBinary(checksums.getTotalSizeOnDisk(), out);

        writeBinary(checksums.files.size(), out);
        for (const auto & it : checksums.files)
        {
            String file_name = it.first;

            String path = part->getFullPath() + file_name;

            UInt64 size = Poco::File(path).getSize();

            writeStringBinary(it.first, out);
            writeBinary(size, out);

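            /// Stream the file through a hashing buffer so the hash can be appended
            /// after the data and verified against the part's own checksums below.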
            ReadBufferFromFile file_in(path);
            HashingWriteBuffer hashing_out(out);
            copyData(file_in, hashing_out, blocker.getCounter());

            if (blocker.isCancelled())
                throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);

            if (hashing_out.count() != size)
                throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);

            writePODBinary(hashing_out.getHash(), out);

            if (file_name != "checksums.txt" &&
                file_name != "columns.txt")
                data_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
        }

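        /// Everything we just streamed must match the checksums stored in the part itself.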
        part->checksums.checkEqual(data_checksums, false);
    }
    catch (const NetException &)
    {
        /// Network error or error on remote side. No need to enqueue part for check.
        throw;
    }
    catch (const Exception & e)
    {
        if (e.code() != ErrorCodes::ABORTED && e.code() != ErrorCodes::CANNOT_WRITE_TO_OSTREAM)
            data.reportBrokenPart(part_name);
        throw;
    }
    catch (...)
    {
        data.reportBrokenPart(part_name);
        throw;
    }
}

MergeTreeData::DataPartPtr Service::findPart(const String & name)
{
    /// It is important to include PreCommitted and Outdated parts here because remote replicas cannot reliably
    /// determine the local state of the part, so queries for the parts in these states are completely normal.
    auto part = data.getPartIfExists(
        name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
    if (part)
        return part;

    throw Exception("No part " + name + " in table", ErrorCodes::NO_SUCH_DATA_PART);
}

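/// Asks the given replica for a part over HTTP(S): builds the endpoint URI, POSTs
/// the request, reads the sender's protocol version from a response cookie, reserves
/// disk space accordingly and hands the open stream to downloadPart().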
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
    const String & part_name,
    const String & replica_path,
    const String & host,
    int port,
    const ConnectionTimeouts & timeouts,
    const String & user,
    const String & password,
    const String & interserver_scheme,
    bool to_detached,
    const String & tmp_prefix_)
{
    /// Validation of the input that may come from malicious replica.
    MergeTreePartInfo::fromPartName(part_name, data.format_version);
    const auto data_settings = data.getSettings();

    Poco::URI uri;
    uri.setScheme(interserver_scheme);
    uri.setHost(host);
    uri.setPort(port);
    uri.setQueryParameters(
    {
        {"endpoint", getEndpointId(replica_path)},
        {"part", part_name},
        {"client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE},
        {"compress", "false"}
    });

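    /// Interserver credentials, if configured, are sent via HTTP Basic authentication.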
    Poco::Net::HTTPBasicCredentials creds{};
    if (!user.empty())
    {
        creds.setUsername(user);
        creds.setPassword(password);
    }

    PooledReadWriteBufferFromHTTP in{
        uri,
        Poco::Net::HTTPRequest::HTTP_POST,
        {},
        timeouts,
        creds,
        DBMS_DEFAULT_BUFFER_SIZE,
        0, /* no redirects */
        data_settings->replicated_max_parallel_fetches_for_host
    };

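    /// The sender advertises its protocol version in a response cookie; old servers
    /// don't set it, so fall back to the version without part sizes.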
    auto server_protocol_version = in.getResponseCookie("server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);

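    /// With protocol version "1" the sender streams the total size of the part first,
    /// which lets us reserve space on a suitable disk before downloading anything.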
    ReservationPtr reservation;
    if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
    {
        size_t sum_files_size;
        readBinary(sum_files_size, in);
        reservation = data.reserveSpace(sum_files_size);
    }
    else
    {
        /// We don't know real size of part because sender server version is too old
        reservation = data.makeEmptyReservationOnLargestDisk();
    }

    return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, std::move(reservation), in);
}

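/// Reads the part from an already-open HTTP stream into a temporary directory
/// ("tmp_fetch_<part>", under "detached/" if requested), verifying each file's
/// hash on the fly.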
MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
    const String & part_name,
    const String & replica_path,
    bool to_detached,
    const String & tmp_prefix_,
    const ReservationPtr reservation,
    PooledReadWriteBufferFromHTTP & in)
{
    size_t files;
    readBinary(files, in);

    static const String TMP_PREFIX = "tmp_fetch_";
    String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

    String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
    String absolute_part_path = Poco::Path(data.getFullPathOnDisk(reservation->getDisk()) + relative_part_path + "/").absolute().toString();
    Poco::File part_file(absolute_part_path);

    if (part_file.exists())
        throw Exception("Directory " + absolute_part_path + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);

    CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};

    part_file.createDirectory();

    MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, reservation->getDisk(), part_name);
    new_data_part->relative_path = relative_part_path;
    new_data_part->is_temp = true;

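    /// Receive the files in the same format the sender wrote them: [name] [size]
    /// [raw data] [hash], accumulating checksums for the final verification.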
    MergeTreeData::DataPart::Checksums checksums;
    for (size_t i = 0; i < files; ++i)
    {
        String file_name;
        UInt64 file_size;

        readStringBinary(file_name, in);
        readBinary(file_size, in);

        /// File must be inside "absolute_part_path" directory.
        /// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
        String absolute_file_path = Poco::Path(absolute_part_path + file_name).absolute().toString();
        if (!startsWith(absolute_file_path, absolute_part_path))
            throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + absolute_part_path + ")."
                " This may happen if we are trying to download part from malicious replica or logical error.",
                ErrorCodes::INSECURE_PATH);

        WriteBufferFromFile file_out(absolute_file_path);
        HashingWriteBuffer hashing_out(file_out);
        copyData(in, hashing_out, file_size, blocker.getCounter());

        if (blocker.isCancelled())
        {
            /// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
            /// performing a poll with a not very large timeout.
            /// And now we check it only between read chunks (in the `copyData` function).
            part_file.remove(true);
            throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
        }

        MergeTreeDataPartChecksum::uint128 expected_hash;
        readPODBinary(expected_hash, in);

        if (expected_hash != hashing_out.getHash())
            throw Exception("Checksum mismatch for file " + absolute_part_path + file_name + " transferred from " + replica_path,
                ErrorCodes::CHECKSUM_DOESNT_MATCH);

        if (file_name != "checksums.txt" &&
            file_name != "columns.txt")
            checksums.addFile(file_name, file_size, expected_hash);
    }

    assertEOF(in);

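    /// Load the metadata we just wrote and cross-check it against the checksums
    /// received over the wire.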
    new_data_part->modification_time = time(nullptr);
    new_data_part->loadColumnsChecksumsIndexes(true, false);
    new_data_part->checksums.checkEqual(checksums, false);

    return new_data_part;
}

}

}