1#include <DataStreams/RemoteBlockOutputStream.h>
2#include <DataStreams/NativeBlockInputStream.h>
3#include <Common/escapeForFileName.h>
4#include <Common/setThreadName.h>
5#include <Common/CurrentMetrics.h>
6#include <Common/StringUtils/StringUtils.h>
7#include <Common/ClickHouseRevision.h>
8#include <Common/SipHash.h>
9#include <Common/quoteString.h>
10#include <Common/hex.h>
11#include <common/StringRef.h>
12#include <Interpreters/Context.h>
13#include <Storages/Distributed/DirectoryMonitor.h>
14#include <IO/ReadBufferFromFile.h>
15#include <IO/ReadBufferFromString.h>
16#include <IO/WriteBufferFromFile.h>
17#include <Compression/CompressedReadBuffer.h>
18#include <IO/ConnectionTimeouts.h>
19#include <IO/Operators.h>
20
21#include <boost/algorithm/string/find_iterator.hpp>
22#include <boost/algorithm/string/finder.hpp>
23
24#include <Poco/DirectoryIterator.h>
25
26
27namespace CurrentMetrics
28{
29 extern const Metric DistributedSend;
30 extern const Metric DistributedFilesToInsert;
31}
32
33namespace DB
34{
35
namespace ErrorCodes
{
    extern const int ABORTED;
    extern const int UNKNOWN_CODEC;
    extern const int CANNOT_DECOMPRESS;
    extern const int INCORRECT_FILE_NAME;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int TOO_LARGE_SIZE_COMPRESSED;
    extern const int ATTEMPT_TO_READ_AFTER_EOF;
    /// Referenced by isFileBrokenErrorCode(); must be declared here for this translation unit to compile.
    extern const int CANNOT_READ_ALL_DATA;
    extern const int CORRUPTED_DATA;
}
47
48
49namespace
50{
/// Interval after which run() halves the accumulated error counter.
constexpr std::chrono::minutes decrease_error_count_period{5};
52
53 template <typename PoolFactory>
54 ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory)
55 {
56 ConnectionPoolPtrs pools;
57
58 for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
59 {
60 Cluster::Address address = Cluster::Address::fromFullString(boost::copy_range<std::string>(*it));
61 pools.emplace_back(factory(address));
62 }
63
64 return pools;
65 }
66
67 void assertChecksum(CityHash_v1_0_2::uint128 expected, CityHash_v1_0_2::uint128 calculated)
68 {
69 if (expected != calculated)
70 {
71 String message = "Checksum of extra info doesn't match: corrupted data."
72 " Reference: " + getHexUIntLowercase(expected.first) + getHexUIntLowercase(expected.second)
73 + ". Actual: " + getHexUIntLowercase(calculated.first) + getHexUIntLowercase(calculated.second)
74 + ".";
75 throw Exception(message, ErrorCodes::CHECKSUM_DOESNT_MATCH);
76 }
77 }
78
79}
80
81
/// Creates a monitor for the directory `storage.path + name + '/'`. Files found there are later sent
/// through `pool`.
///
/// Batching mode, minimal batch sizes (rows/bytes) and the default/maximal sleep intervals are read once,
/// here, from the settings of the storage's global context. `sleep_time` starts at `default_sleep_time`.
///
/// NOTE(review): several initializers read members set earlier in this list (`path` uses `name`;
/// `current_batch_file_path` uses `path`; `sleep_time` uses `default_sleep_time`). This is only correct
/// if the members are declared in that order in the header, which is not visible here.
/// NOTE(review): the comment below refers to a `thread` member that is not initialized in this list.
/// Presumably it is started by a default member initializer in the header; confirm.
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
    StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_)
    /// It's important to initialize members before `thread` to avoid race.
    : storage(storage_)
    , pool(std::move(pool_))
    , name(std::move(name_))
    , path{storage.path + name + '/'}
    , should_batch_inserts(storage.global_context.getSettingsRef().distributed_directory_monitor_batch_inserts)
    , min_batched_block_size_rows(storage.global_context.getSettingsRef().min_insert_block_size_rows)
    , min_batched_block_size_bytes(storage.global_context.getSettingsRef().min_insert_block_size_bytes)
    , current_batch_file_path{path + "current_batch.txt"}
    , default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
    , sleep_time{default_sleep_time}
    , max_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
    , log{&Logger::get(getLoggerName())}
    , monitor_blocker(monitor_blocker_)
{
}
100
101
/// Requests shutdown (unless already requested), wakes the background loop and waits for its thread.
StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
{
    if (!quit)
    {
        {
            /// Set the flag first, then briefly take the mutex. run() holds the mutex except while it waits
            /// on `cond`. Once we own the lock, run() is therefore either inside the wait (and will get the
            /// notification below) or will see `quit` in its wait predicate. No wake-up can be lost.
            quit = true;
            std::lock_guard lock{mutex};
        }
        cond.notify_one();
        thread.join();
    }
}
114
115void StorageDistributedDirectoryMonitor::flushAllData()
116{
117 if (!quit)
118 {
119 std::unique_lock lock{mutex};
120 processFiles();
121 }
122}
123
/// Stops the background thread in the same way as the destructor. It then recursively removes the
/// monitor directory `path`, including every pending file still in it.
void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
{
    if (!quit)
    {
        {
            /// Same quit-then-lock handshake as in the destructor, so run() cannot miss the wake-up.
            quit = true;
            std::lock_guard lock{mutex};
        }
        cond.notify_one();
        thread.join();
    }

    /// `true` = recursive removal.
    Poco::File(path).remove(true);
}
138
139
/// Background loop. Until shutdown is requested it sends pending files, unless sending is blocked via
/// `monitor_blocker`, and sleeps between rounds.
///
/// Back-off: every exception from processFiles() is logged and increments `error_count`. It also sets
/// `sleep_time` to default_sleep_time * 2^error_count, capped at `max_sleep_time`. Whenever more than
/// `decrease_error_count_period` has passed since the last halving, `error_count` is halved.
/// NOTE(review): `sleep_time` is only recomputed on the error path. Once errors stop, it keeps its last
/// backed-off value; confirm whether that is intended.
///
/// The monitor mutex is held for the whole loop and released only while waiting on `cond`. A shutdown
/// request (quit + notify) therefore cuts the sleep short. processFiles() itself checks `quit` between files.
void StorageDistributedDirectoryMonitor::run()
{
    setThreadName("DistrDirMonitor");

    std::unique_lock lock{mutex};

    const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); };

    while (!quit_requested())
    {
        /// Sleep unless processFiles() reports that it found files to send.
        auto do_sleep = true;

        if (!monitor_blocker.isCancelled())
        {
            try
            {
                do_sleep = !processFiles();
            }
            catch (...)
            {
                do_sleep = true;
                ++error_count;
                /// Exponential back-off, capped at max_sleep_time.
                sleep_time = std::min(
                    std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
                    max_sleep_time);
                tryLogCurrentException(getLoggerName().data());
            }
        }
        else
        {
            LOG_DEBUG(log, "Skipping send data over distributed table.");
        }

        /// wait_for releases the mutex while waiting and returns early once quit_requested() is true.
        if (do_sleep)
            cond.wait_for(lock, sleep_time, quit_requested);

        /// Let the error counter, and with it the next back-off interval, decay over time.
        const auto now = std::chrono::system_clock::now();
        if (now - last_decrease_time > decrease_error_count_period)
        {
            error_count /= 2;
            last_decrease_time = now;
        }
    }
}
184
185
186ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
187{
188 const auto pool_factory = [&storage] (const Cluster::Address & address) -> ConnectionPoolPtr
189 {
190 const auto & cluster = storage.getCluster();
191 const auto & shards_info = cluster->getShardsInfo();
192 const auto & shards_addresses = cluster->getShardsAddresses();
193
194 /// existing connections pool have a higher priority
195 for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
196 {
197 const Cluster::Addresses & replicas_addresses = shards_addresses[shard_index];
198
199 for (size_t replica_index = 0; replica_index < replicas_addresses.size(); ++replica_index)
200 {
201 const Cluster::Address & replica_address = replicas_addresses[replica_index];
202
203 if (address == replica_address)
204 return shards_info[shard_index].per_replica_pools[replica_index];
205 }
206 }
207
208 return std::make_shared<ConnectionPool>(
209 1, address.host_name, address.port, address.default_database, address.user, address.password,
210 storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);
211 };
212
213 auto pools = createPoolsForAddresses(name, pool_factory);
214
215 const auto settings = storage.global_context.getSettings();
216 return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM,
217 settings.distributed_replica_error_half_life.totalSeconds(), settings.distributed_replica_error_cap);
218}
219
220
221bool StorageDistributedDirectoryMonitor::processFiles()
222{
223 std::map<UInt64, std::string> files;
224
225 Poco::DirectoryIterator end;
226 for (Poco::DirectoryIterator it{path}; it != end; ++it)
227 {
228 const auto & file_path_str = it->path();
229 Poco::Path file_path{file_path_str};
230
231 if (!it->isDirectory() && startsWith(file_path.getExtension().data(), "bin"))
232 files[parse<UInt64>(file_path.getBaseName())] = file_path_str;
233 }
234
235 if (files.empty())
236 return false;
237
238 CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedFilesToInsert, CurrentMetrics::Value(files.size())};
239
240 if (should_batch_inserts)
241 {
242 processFilesWithBatching(files);
243 }
244 else
245 {
246 for (const auto & file : files)
247 {
248 if (quit)
249 return true;
250
251 processFile(file.second);
252 }
253 }
254
255 return true;
256}
257
258void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
259{
260 LOG_TRACE(log, "Started processing `" << file_path << '`');
261 auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
262 auto connection = pool->get(timeouts);
263
264 try
265 {
266 CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
267
268 ReadBufferFromFile in{file_path};
269
270 Settings insert_settings;
271 std::string insert_query;
272 readHeader(in, insert_settings, insert_query);
273
274 RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings};
275
276 remote.writePrefix();
277 remote.writePrepared(in);
278 remote.writeSuffix();
279 }
280 catch (const Exception & e)
281 {
282 maybeMarkAsBroken(file_path, e);
283 throw;
284 }
285
286 Poco::File{file_path}.remove();
287
288 LOG_TRACE(log, "Finished processing `" << file_path << '`');
289}
290
/// Parses the header at the start of a pending file and leaves `in` positioned right after it.
///
/// The leading VarUInt selects one of three layouts:
///  - DBMS_DISTRIBUTED_SIGNATURE_HEADER: a header string (read with readStringBinary) followed by its
///    CityHash128 checksum. The header contains the initiator revision (VarUInt), the INSERT query and
///    the settings.
///  - DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT: settings in SettingsBinaryFormat::OLD, then the query.
///  - any other value: the byte length of the raw query text that follows. `insert_settings` is left
///    untouched in this case.
/// Throws CHECKSUM_DOESNT_MATCH (via assertChecksum) when the header checksum is wrong.
void StorageDistributedDirectoryMonitor::readHeader(
    ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const
{
    UInt64 query_size;
    readVarUInt(query_size, in);

    if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER)
    {
        /// Read the header as a string.
        String header;
        readStringBinary(header, in);

        /// Check the checksum of the header.
        CityHash_v1_0_2::uint128 checksum;
        readPODBinary(checksum, in);
        assertChecksum(checksum, CityHash_v1_0_2::CityHash128(header.data(), header.size()));

        /// Read the parts of the header.
        ReadBufferFromString header_buf(header);

        UInt64 initiator_revision;
        readVarUInt(initiator_revision, header_buf);
        /// Only a warning: the file is still processed when the initiator is newer than this server.
        if (ClickHouseRevision::get() < initiator_revision)
        {
            LOG_WARNING(
                log,
                "ClickHouse shard version is older than ClickHouse initiator version. "
                    << "It may lack support for new features.");
        }

        readStringBinary(insert_query, header_buf);
        insert_settings.deserialize(header_buf);

        /// Add handling new data here, for example:
        /// if (initiator_revision >= DBMS_MIN_REVISION_WITH_MY_NEW_DATA)
        ///    readVarUInt(my_new_data, header_buf);

        return;
    }

    if (query_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT)
    {
        insert_settings.deserialize(in, SettingsBinaryFormat::OLD);
        readStringBinary(insert_query, in);
        return;
    }

    /// Oldest format: `query_size` is the length of the query text itself.
    /// NOTE(review): `query_size` is not bounds-checked before resize(). A corrupted file could request a huge
    /// allocation that fails with a non-DB exception and so is not treated as a broken file; confirm.
    insert_query.resize(query_size);
    in.readStrict(insert_query.data(), query_size);
}
341
342struct StorageDistributedDirectoryMonitor::BatchHeader
343{
344 Settings settings;
345 String query;
346 Block sample_block;
347
348 BatchHeader(Settings settings_, String query_, Block sample_block_)
349 : settings(std::move(settings_))
350 , query(std::move(query_))
351 , sample_block(std::move(sample_block_))
352 {
353 }
354
355 bool operator==(const BatchHeader & other) const
356 {
357 return settings == other.settings && query == other.query &&
358 blocksHaveEqualStructure(sample_block, other.sample_block);
359 }
360
361 struct Hash
362 {
363 size_t operator()(const BatchHeader & batch_header) const
364 {
365 SipHash hash_state;
366 hash_state.update(batch_header.query.data(), batch_header.query.size());
367
368 size_t num_columns = batch_header.sample_block.columns();
369 for (size_t i = 0; i < num_columns; ++i)
370 {
371 const String & type_name = batch_header.sample_block.getByPosition(i).type->getName();
372 hash_state.update(type_name.data(), type_name.size());
373 }
374
375 return hash_state.get64();
376 }
377 };
378};
379
/// A group of pending files, identified by their numeric indices, that is sent over one connection as a
/// single INSERT stream.
///
/// Before a batch is sent, its list of indices is persisted to `parent.current_batch_file_path`. After a
/// failure, exactly the same batch can then be re-sent. A batch loaded back with readText() is marked
/// `recovered` and is not persisted again.
struct StorageDistributedDirectoryMonitor::Batch
{
    std::vector<UInt64> file_indices;
    size_t total_rows = 0;
    size_t total_bytes = 0;
    /// True when `file_indices` were loaded from current_batch_file_path by readText().
    bool recovered = false;

    StorageDistributedDirectoryMonitor & parent;
    /// Index -> path of the pending files. Held by reference, so it must outlive the batch.
    const std::map<UInt64, String> & file_index_to_path;

    Batch(
        StorageDistributedDirectoryMonitor & parent_,
        const std::map<UInt64, String> & file_index_to_path_)
        : parent(parent_), file_index_to_path(file_index_to_path_)
    {}

    /// True when either configured minimum (rows or bytes) is reached. When both minimums are zero,
    /// every batch counts as big enough.
    bool isEnoughSize() const
    {
        return (!parent.min_batched_block_size_rows && !parent.min_batched_block_size_bytes)
            || (parent.min_batched_block_size_rows && total_rows >= parent.min_batched_block_size_rows)
            || (parent.min_batched_block_size_bytes && total_bytes >= parent.min_batched_block_size_bytes);
    }

    /// Sends every file of the batch. Afterwards it resets the batch and truncates current_batch_file_path
    /// to zero size. Outcomes:
    ///  - success: all files are deleted;
    ///  - an index missing from `file_index_to_path`, or an exception whose code isFileBrokenErrorCode()
    ///    accepts: every file of the batch still present in the map is passed to markAsBroken();
    ///  - any other exception propagates before the reset, leaving the files and the persisted batch
    ///    in place for a retry.
    /// An empty batch is a no-op.
    void send()
    {
        if (file_indices.empty())
            return;

        CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};

        if (!recovered)
        {
            /// For deduplication in Replicated tables to work, in case of error
            /// we must try to re-send exactly the same batches.
            /// So we save contents of the current batch into the current_batch_file_path file
            /// and truncate it afterwards if all went well.

            /// Temporary file is required for atomicity.
            String tmp_file{parent.current_batch_file_path + ".tmp"};

            if (Poco::File{tmp_file}.exists())
                LOG_ERROR(parent.log, "Temporary file " << backQuote(tmp_file) << " exists. Unclean shutdown?");

            {
                WriteBufferFromFile out{tmp_file, O_WRONLY | O_TRUNC | O_CREAT};
                writeText(out);
            }

            Poco::File{tmp_file}.renameTo(parent.current_batch_file_path);
        }
        auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context.getSettingsRef());
        auto connection = parent.pool->get(timeouts);

        bool batch_broken = false;
        try
        {
            /// Refilled by readHeader() for every file. The stream is created from the first file's header.
            /// NOTE(review): the stream receives a pointer to `insert_settings`, which later files overwrite.
            /// This presumably relies on all files in a batch having equal settings (see BatchHeader), which
            /// a recovered batch is not checked for; confirm.
            Settings insert_settings;
            String insert_query;
            std::unique_ptr<RemoteBlockOutputStream> remote;
            bool first = true;

            for (UInt64 file_idx : file_indices)
            {
                auto file_path = file_index_to_path.find(file_idx);
                if (file_path == file_index_to_path.end())
                {
                    LOG_ERROR(parent.log, "Failed to send batch: file with index " << file_idx << " is absent");
                    batch_broken = true;
                    break;
                }

                ReadBufferFromFile in(file_path->second);
                parent.readHeader(in, insert_settings, insert_query);

                if (first)
                {
                    first = false;
                    remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, &insert_settings);
                    remote->writePrefix();
                }

                remote->writePrepared(in);
            }

            if (remote)
                remote->writeSuffix();
        }
        catch (const Exception & e)
        {
            if (isFileBrokenErrorCode(e.code()))
            {
                tryLogCurrentException(parent.log, "Failed to send batch due to");
                batch_broken = true;
            }
            else
                throw;
        }

        if (!batch_broken)
        {
            LOG_TRACE(parent.log, "Sent a batch of " << file_indices.size() << " files.");

            for (UInt64 file_index : file_indices)
                Poco::File{file_index_to_path.at(file_index)}.remove();
        }
        else
        {
            /// A single bad file marks the whole batch as broken.
            LOG_ERROR(parent.log, "Marking a batch of " << file_indices.size() << " files as broken.");

            for (UInt64 file_idx : file_indices)
            {
                auto file_path = file_index_to_path.find(file_idx);
                if (file_path != file_index_to_path.end())
                    parent.markAsBroken(file_path->second);
            }
        }

        file_indices.clear();
        total_rows = 0;
        total_bytes = 0;
        recovered = false;

        Poco::File{parent.current_batch_file_path}.setSize(0);
    }

    /// Writes the file indices, one per line.
    void writeText(WriteBuffer & out)
    {
        for (UInt64 file_idx : file_indices)
            out << file_idx << '\n';
    }

    /// Appends the indices produced by writeText() until EOF and marks the batch as recovered.
    void readText(ReadBuffer & in)
    {
        while (!in.eof())
        {
            UInt64 idx;
            in >> idx >> "\n";
            file_indices.push_back(idx);
        }
        recovered = true;
    }
};
522
523
524void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
525{
526 std::unordered_set<UInt64> file_indices_to_skip;
527
528 if (Poco::File{current_batch_file_path}.exists())
529 {
530 /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
531 Batch batch(*this, files);
532 ReadBufferFromFile in{current_batch_file_path};
533 batch.readText(in);
534 file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
535 batch.send();
536 }
537
538 std::unordered_map<BatchHeader, Batch, BatchHeader::Hash> header_to_batch;
539
540 for (const auto & file : files)
541 {
542 if (quit)
543 return;
544
545 UInt64 file_idx = file.first;
546 const String & file_path = file.second;
547
548 if (file_indices_to_skip.count(file_idx))
549 continue;
550
551 size_t total_rows = 0;
552 size_t total_bytes = 0;
553 Block sample_block;
554 Settings insert_settings;
555 String insert_query;
556 try
557 {
558 /// Determine metadata of the current file and check if it is not broken.
559 ReadBufferFromFile in{file_path};
560 readHeader(in, insert_settings, insert_query);
561
562 CompressedReadBuffer decompressing_in(in);
563 NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get());
564 block_in.readPrefix();
565
566 while (Block block = block_in.read())
567 {
568 total_rows += block.rows();
569 total_bytes += block.bytes();
570
571 if (!sample_block)
572 sample_block = block.cloneEmpty();
573 }
574 block_in.readSuffix();
575 }
576 catch (const Exception & e)
577 {
578 if (maybeMarkAsBroken(file_path, e))
579 {
580 tryLogCurrentException(log, "File is marked broken due to");
581 continue;
582 }
583 else
584 throw;
585 }
586
587 BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(sample_block));
588 Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
589
590 batch.file_indices.push_back(file_idx);
591 batch.total_rows += total_rows;
592 batch.total_bytes += total_bytes;
593
594 if (batch.isEnoughSize())
595 batch.send();
596 }
597
598 for (auto & kv : header_to_batch)
599 {
600 Batch & batch = kv.second;
601 batch.send();
602 }
603
604 Poco::File{current_batch_file_path}.remove();
605}
606
607bool StorageDistributedDirectoryMonitor::isFileBrokenErrorCode(int code)
608{
609 return code == ErrorCodes::CHECKSUM_DOESNT_MATCH
610 || code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED
611 || code == ErrorCodes::CANNOT_READ_ALL_DATA
612 || code == ErrorCodes::UNKNOWN_CODEC
613 || code == ErrorCodes::CANNOT_DECOMPRESS
614 || code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF;
615}
616
617void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path) const
618{
619 const auto last_path_separator_pos = file_path.rfind('/');
620 const auto & base_path = file_path.substr(0, last_path_separator_pos + 1);
621 const auto & file_name = file_path.substr(last_path_separator_pos + 1);
622 const auto & broken_path = base_path + "broken/";
623 const auto & broken_file_path = broken_path + file_name;
624
625 Poco::File{broken_path}.createDirectory();
626 Poco::File{file_path}.renameTo(broken_file_path);
627
628 LOG_ERROR(log, "Renamed `" << file_path << "` to `" << broken_file_path << '`');
629}
630
631bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e) const
632{
633 /// mark file as broken if necessary
634 if (isFileBrokenErrorCode(e.code()))
635 {
636 markAsBroken(file_path);
637 return true;
638 }
639 else
640 return false;
641}
642
643std::string StorageDistributedDirectoryMonitor::getLoggerName() const
644{
645 return storage.table_name + '.' + storage.getName() + ".DirectoryMonitor";
646}
647
648void StorageDistributedDirectoryMonitor::updatePath()
649{
650 std::lock_guard lock{mutex};
651 path = storage.path + name + '/';
652 current_batch_file_path = path + "current_batch.txt";
653}
654
655}
656