#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageDistributed.h>

#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>

#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/RemoteBlockOutputStream.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/createBlockSelector.h>

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Common/setThreadName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
#include <Common/CurrentThread.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#include <ext/scope_guard.h>

#include <Poco/DirectoryIterator.h>

#include <future>
#include <condition_variable>
#include <mutex>


namespace CurrentMetrics
{
    extern const Metric DistributedSend;
}

namespace ProfileEvents
{
    extern const Event DistributedSyncInsertionTimeoutExceeded;
}

namespace DB
{


namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int TIMEOUT_EXCEEDED;
    extern const int TYPE_MISMATCH;
    extern const int CANNOT_LINK;
}


DistributedBlockOutputStream::DistributedBlockOutputStream(
    const Context & context_, StorageDistributed & storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_,
    bool insert_sync_, UInt64 insert_timeout_)
    : context(context_), storage(storage_), query_ast(query_ast_), query_string(queryToString(query_ast_)),
    cluster(cluster_), insert_sync(insert_sync_),
    insert_timeout(insert_timeout_), log(&Logger::get("DistributedBlockOutputStream"))
{
}


Block DistributedBlockOutputStream::getHeader() const
{
    return storage.getSampleBlock();
}


void DistributedBlockOutputStream::writePrefix()
{
}


void DistributedBlockOutputStream::write(const Block & block)
{
    Block ordinary_block{ block };

    /* MATERIALIZED columns are added by the AddingDefaultBlockOutputStream; remove them here,
     * otherwise the receiving server would get a different number of columns than it expects. */
    for (const auto & col : storage.getColumns().getMaterialized())
    {
        if (ordinary_block.has(col.name))
        {
            ordinary_block.erase(col.name);
            LOG_DEBUG(log, storage.getTableName()
                << ": column " + col.name + " will be removed, "
                << "because it is MATERIALIZED");
        }
    }

    if (insert_sync)
        writeSync(ordinary_block);
    else
        writeAsync(ordinary_block);
}

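/// Asynchronous path: if the table has a sharding key and there is more than one shard,
/// the block is split per shard first; otherwise it is written to the single shard as-is.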
void DistributedBlockOutputStream::writeAsync(const Block & block)
{
    if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
        return writeSplitAsync(block);

    writeAsyncImpl(block);
    ++inserted_blocks;
}


std::string DistributedBlockOutputStream::getCurrentStateDescription()
{
    std::stringstream buffer;
    const auto & addresses = cluster->getShardsAddresses();

    buffer << "Insertion status:\n";
    for (auto & shard_jobs : per_shard_jobs)
        for (JobReplica & job : shard_jobs.replicas_jobs)
        {
            buffer << "Wrote " << job.blocks_written << " blocks and " << job.rows_written << " rows"
                << " on shard " << job.shard_index << " replica " << job.replica_index
                << ", " << addresses[job.shard_index][job.replica_index].readableString();

            /// Performance statistics
            if (job.blocks_started > 0)
            {
                buffer << " (average " << job.elapsed_time_ms / job.blocks_started << " ms per block";
                if (job.blocks_started > 1)
                    buffer << ", the slowest block " << job.max_elapsed_time_for_block_ms << " ms";
                buffer << ")";
            }

            buffer << "\n";
        }

    return buffer.str();
}

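/// Create the per-replica writing jobs for synchronous insertion.
/// For a shard with internal replication a single job is enough: the local replica if there is one,
/// otherwise a single remote replica. Without internal replication, a job is created for every
/// remote replica and for the local one, if present.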
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block)
{
    const auto & addresses_with_failovers = cluster->getShardsAddresses();
    const auto & shards_info = cluster->getShardsInfo();
    size_t num_shards = shards_info.size();

    remote_jobs_count = 0;
    local_jobs_count = 0;
    per_shard_jobs.resize(shards_info.size());

    for (size_t shard_index : ext::range(0, shards_info.size()))
    {
        const auto & shard_info = shards_info[shard_index];
        auto & shard_jobs = per_shard_jobs[shard_index];

        /// If hasInternalReplication, then prefer the local replica
        if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
        {
            const auto & replicas = addresses_with_failovers[shard_index];

            for (size_t replica_index : ext::range(0, replicas.size()))
            {
                if (!replicas[replica_index].is_local)
                {
                    shard_jobs.replicas_jobs.emplace_back(shard_index, replica_index, false, first_block);
                    ++remote_jobs_count;

                    if (shard_info.hasInternalReplication())
                        break;
                }
            }
        }

        if (shard_info.isLocal())
        {
            shard_jobs.replicas_jobs.emplace_back(shard_index, 0, true, first_block);
            ++local_jobs_count;
        }

        if (num_shards > 1)
            shard_jobs.shard_current_block_permuation.reserve(first_block.rows());
    }
}

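/// Wait for all scheduled jobs to finish and check the synchronous insertion timeout.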
void DistributedBlockOutputStream::waitForJobs()
{
    pool->wait();

    if (insert_timeout)
    {
        if (static_cast<UInt64>(watch.elapsedSeconds()) > insert_timeout)
        {
            ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
            throw Exception("Synchronous distributed insert timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
        }
    }

    size_t jobs_count = remote_jobs_count + local_jobs_count;
    size_t num_finished_jobs = finished_jobs_count;

    if (num_finished_jobs < jobs_count)
        LOG_WARNING(log, "Expected " << jobs_count << " writing jobs, but finished only " << num_finished_jobs);
}

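/// Build the ThreadPool job that writes one block to one replica.
/// For a remote replica the job lazily opens a connection and a RemoteBlockOutputStream;
/// for a local replica it executes the INSERT through InterpreterInsertQuery,
/// repeating the write once per local node of the shard.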
ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block)
{
    auto thread_group = CurrentThread::getGroup();
    return [this, thread_group, &job, &current_block]()
    {
        if (thread_group)
            CurrentThread::attachToIfDetached(thread_group);
        setThreadName("DistrOutStrProc");

        ++job.blocks_started;

        SCOPE_EXIT({
            ++finished_jobs_count;

            UInt64 elapsed_time_for_block_ms = watch_current_block.elapsedMilliseconds();
            job.elapsed_time_ms += elapsed_time_for_block_ms;
            job.max_elapsed_time_for_block_ms = std::max(job.max_elapsed_time_for_block_ms, elapsed_time_for_block_ms);
        });

        const auto & shard_info = cluster->getShardsInfo()[job.shard_index];
        size_t num_shards = cluster->getShardsInfo().size();
        auto & shard_job = per_shard_jobs[job.shard_index];
        const auto & addresses = cluster->getShardsAddresses();

        /// Generate the current shard's block by selecting its rows from the full block
        if (num_shards > 1)
        {
            auto & shard_permutation = shard_job.shard_current_block_permuation;
            size_t num_shard_rows = shard_permutation.size();

            for (size_t j = 0; j < current_block.columns(); ++j)
            {
                auto & src_column = current_block.getByPosition(j).column;
                auto & dst_column = job.current_shard_block.getByPosition(j).column;

                /// Zero permutation size has a special meaning in IColumn::permute
                if (num_shard_rows)
                    dst_column = src_column->permute(shard_permutation, num_shard_rows);
                else
                    dst_column = src_column->cloneEmpty();
            }
        }

        const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;

        if (!job.is_local_job)
        {
            if (!job.stream)
            {
                const Settings & settings = context.getSettingsRef();
                auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
                if (shard_info.hasInternalReplication())
                {
                    /// Skip replica_index in case of internal replication
                    if (shard_job.replicas_jobs.size() != 1)
                        throw Exception("There are several writing jobs for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR);

                    /// TODO: it makes sense to rewrite skip_unavailable_shards and max_parallel_replicas here
                    auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE);
                    if (connections.empty() || connections.front().isNull())
                        throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR);

                    job.connection_entry = std::move(connections.front());
                }
                else
                {
                    const auto & replica = addresses.at(job.shard_index).at(job.replica_index);

                    const ConnectionPoolPtr & connection_pool = shard_info.per_replica_pools.at(job.replica_index);
                    if (!connection_pool)
                        throw Exception("Connection pool for replica " + replica.readableString() + " does not exist", ErrorCodes::LOGICAL_ERROR);

                    job.connection_entry = connection_pool->get(timeouts, &settings);
                    if (job.connection_entry.isNull())
                        throw Exception("Got empty connection for replica " + replica.readableString(), ErrorCodes::LOGICAL_ERROR);
                }

                if (throttler)
                    job.connection_entry->setThrottler(throttler);

                job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, &settings);
                job.stream->writePrefix();
            }

            CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
            job.stream->write(shard_block);
        }
        else
        {
            if (!job.stream)
            {
                /// Forward user settings
                job.local_context = std::make_unique<Context>(context);

                InterpreterInsertQuery interp(query_ast, *job.local_context);
                job.stream = interp.execute().out;
                job.stream->writePrefix();
            }

            size_t num_repetitions = shard_info.getLocalNodeCount();
            for (size_t i = 0; i < num_repetitions; ++i)
                job.stream->write(shard_block);
        }

        job.blocks_written += 1;
        job.rows_written += shard_block.rows();
    };
}

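/// Synchronous path: on the first block, create the writing jobs and the thread pool,
/// then for every block compute the per-shard row permutation and run all jobs in parallel.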
void DistributedBlockOutputStream::writeSync(const Block & block)
{
    const Settings & settings = context.getSettingsRef();
    const auto & shards_info = cluster->getShardsInfo();
    size_t num_shards = shards_info.size();

    if (!pool)
    {
        /// Deferred initialization. Only for sync insertion.
        initWritingJobs(block);

        pool.emplace(remote_jobs_count + local_jobs_count);

        if (!throttler && (settings.max_network_bandwidth || settings.max_network_bytes))
        {
            throttler = std::make_shared<Throttler>(settings.max_network_bandwidth, settings.max_network_bytes,
                "Network bandwidth limit for a query exceeded.");
        }

        watch.restart();
    }

    watch_current_block.restart();

    if (num_shards > 1)
    {
        auto current_selector = createSelector(block);

        /// Prepare row numbers for each shard
        for (size_t shard_index : ext::range(0, num_shards))
            per_shard_jobs[shard_index].shard_current_block_permuation.resize(0);

        for (size_t i = 0; i < block.rows(); ++i)
            per_shard_jobs[current_selector[i]].shard_current_block_permuation.push_back(i);
    }

    try
    {
        /// Run jobs in parallel for each block and wait for them to finish
        finished_jobs_count = 0;
        for (size_t shard_index : ext::range(0, shards_info.size()))
            for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
                pool->scheduleOrThrowOnError(runWritingJob(job, block));
    }
    catch (...)
    {
        pool->wait();
        throw;
    }

    try
    {
        waitForJobs();
    }
    catch (Exception & exception)
    {
        exception.addMessage(getCurrentStateDescription());
        throw;
    }

    inserted_blocks += 1;
    inserted_rows += block.rows();
}

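/// For synchronous insertion, finish all opened streams in parallel and log the overall throughput.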
void DistributedBlockOutputStream::writeSuffix()
{
    auto log_performance = [this] ()
    {
        double elapsed = watch.elapsedSeconds();
        LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks"
            << ", " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second"
            << ". " << getCurrentStateDescription());
    };

    if (insert_sync && pool)
    {
        finished_jobs_count = 0;
        try
        {
            for (auto & shard_jobs : per_shard_jobs)
            {
                for (JobReplica & job : shard_jobs.replicas_jobs)
                {
                    if (job.stream)
                    {
                        pool->scheduleOrThrowOnError([&job]()
                        {
                            job.stream->writeSuffix();
                        });
                    }
                }
            }
        }
        catch (...)
        {
            pool->wait();
            throw;
        }

        try
        {
            pool->wait();
            log_performance();
        }
        catch (Exception & exception)
        {
            log_performance();
            exception.addMessage(getCurrentStateDescription());
            throw;
        }
    }
}

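/// Evaluate the sharding key expression on the block and map every row to a shard number
/// through the cluster's slot-to-shard mapping.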
IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & source_block)
{
    Block current_block_with_sharding_key_expr = source_block;
    storage.getShardingKeyExpr()->execute(current_block_with_sharding_key_expr);

    const auto & key_column = current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName());
    const auto & slot_to_shard = cluster->getSlotToShard();

/// If key_column.type is DataTypeLowCardinality, shard according to its dictionary type
#define CREATE_FOR_TYPE(TYPE) \
    if (typeid_cast<const DataType ## TYPE *>(key_column.type.get())) \
        return createBlockSelector<TYPE>(*key_column.column, slot_to_shard); \
    else if (auto * type_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(key_column.type.get())) \
        if (typeid_cast<const DataType ## TYPE *>(type_low_cardinality->getDictionaryType().get())) \
            return createBlockSelector<TYPE>(*key_column.column->convertToFullColumnIfLowCardinality(), slot_to_shard);

    CREATE_FOR_TYPE(UInt8)
    CREATE_FOR_TYPE(UInt16)
    CREATE_FOR_TYPE(UInt32)
    CREATE_FOR_TYPE(UInt64)
    CREATE_FOR_TYPE(Int8)
    CREATE_FOR_TYPE(Int16)
    CREATE_FOR_TYPE(Int32)
    CREATE_FOR_TYPE(Int64)

#undef CREATE_FOR_TYPE

    throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
}

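/// Scatter the block into one smaller block per shard according to the selector.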
Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
{
    auto selector = createSelector(block);

    /// Split the block into num_shards smaller blocks, using 'selector'.

    const size_t num_shards = cluster->getShardsInfo().size();
    Blocks splitted_blocks(num_shards);

    for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
        splitted_blocks[shard_idx] = block.cloneEmpty();

    size_t columns_in_block = block.columns();
    for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
    {
        MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_shards, selector);
        for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
            splitted_blocks[shard_idx].getByPosition(col_idx_in_block).column = std::move(splitted_columns[shard_idx]);
    }

    return splitted_blocks;
}


void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
{
    Blocks splitted_blocks = splitBlock(block);
    const size_t num_shards = splitted_blocks.size();

    for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
        if (splitted_blocks[shard_idx].rows())
            writeAsyncImpl(splitted_blocks[shard_idx], shard_idx);

    ++inserted_blocks;
}

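/// Route one shard's block for asynchronous insertion: write directly if this server holds
/// a replica of the shard, and/or queue the data on disk for the directory monitor to ship
/// to the remote replicas (a single directory when the shard uses internal replication).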
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id)
{
    const auto & shard_info = cluster->getShardsInfo()[shard_id];

    if (shard_info.hasInternalReplication())
    {
        if (shard_info.getLocalNodeCount() > 0)
        {
            /// Prefer insert into current instance directly
            writeToLocal(block, shard_info.getLocalNodeCount());
        }
        else
        {
            if (shard_info.dir_name_for_internal_replication.empty())
                throw Exception("Directory name for async inserts is empty, table " + storage.getTableName(), ErrorCodes::LOGICAL_ERROR);

            writeToShard(block, {shard_info.dir_name_for_internal_replication});
        }
    }
    else
    {
        if (shard_info.getLocalNodeCount() > 0)
            writeToLocal(block, shard_info.getLocalNodeCount());

        std::vector<std::string> dir_names;
        for (const auto & address : cluster->getShardsAddresses()[shard_id])
            if (!address.is_local)
                dir_names.push_back(address.toFullString());

        if (!dir_names.empty())
            writeToShard(block, dir_names);
    }
}

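/// Insert the block into the local table directly, once per local replica of the shard.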
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
{
    /// Async insert does not support settings forwarding yet, whereas the sync one does
    InterpreterInsertQuery interp(query_ast, context);

    auto block_io = interp.execute();
    block_io.out->writePrefix();

    for (size_t i = 0; i < repeats; ++i)
        block_io.out->write(block);

    block_io.out->writeSuffix();
}

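/// Queue the block on disk for asynchronous sending: the data is written once into a tmp/
/// subdirectory together with a header (revision, query, settings) and then hardlinked into
/// the per-destination directories watched by the directory monitor threads.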
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
    /** The tmp directory is used to ensure atomicity of transactions
      * and to keep the monitor thread from reading incomplete data.
      */
    std::string first_file_tmp_path{};

    auto first = true;

    /// Write the first file, hardlink the others
    for (const auto & dir_name : dir_names)
    {
        const auto & path = storage.getPath() + dir_name + '/';

        /// Ensure shard subdirectory creation and notify the storage
        if (Poco::File(path).createDirectory())
            storage.requireDirectoryMonitor(dir_name);

        const auto & file_name = toString(storage.file_names_increment.get()) + ".bin";
        const auto & block_file_path = path + file_name;

        /** On the first iteration, write the block to a temporary directory for subsequent hardlinking,
          * to ensure the inode is not freed until we're done.
          */
        if (first)
        {
            first = false;

            const auto & tmp_path = path + "tmp/";
            Poco::File(tmp_path).createDirectory();
            const auto & block_file_tmp_path = tmp_path + file_name;

            first_file_tmp_path = block_file_tmp_path;

            WriteBufferFromFile out{block_file_tmp_path};
            CompressedWriteBuffer compress{out};
            NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};

            /// Prepare the header.
            /// We wrap the header into a string for compatibility with older versions:
            /// a shard will be able to read the header partly and ignore other parts based on its version.
            WriteBufferFromOwnString header_buf;
            writeVarUInt(ClickHouseRevision::get(), header_buf);
            writeStringBinary(query_string, header_buf);
            context.getSettingsRef().serialize(header_buf);

            /// Add new fields here, for example:
            /// writeVarUInt(my_new_data, header_buf);

            /// Write the header.
            const StringRef header = header_buf.stringRef();
            writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
            writeStringBinary(header, out);
            writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out);

            stream.writePrefix();
            stream.write(block);
            stream.writeSuffix();
        }

        if (link(first_file_tmp_path.data(), block_file_path.data()))
            throwFromErrnoWithPath("Could not link " + block_file_path + " to " + first_file_tmp_path, block_file_path,
                ErrorCodes::CANNOT_LINK);
    }

    /** Remove the temporary file, enabling the OS to reclaim the inode after all threads
      * have removed their corresponding files.
      */
    Poco::File(first_file_tmp_path).remove();
}


}