1 | #include <Storages/Distributed/DistributedBlockOutputStream.h> |
2 | #include <Storages/Distributed/DirectoryMonitor.h> |
3 | #include <Storages/StorageDistributed.h> |
4 | |
5 | #include <Parsers/formatAST.h> |
6 | #include <Parsers/queryToString.h> |
7 | |
8 | #include <IO/WriteBufferFromFile.h> |
9 | #include <Compression/CompressedWriteBuffer.h> |
10 | #include <IO/Operators.h> |
11 | #include <IO/WriteBufferFromString.h> |
12 | #include <DataStreams/NativeBlockOutputStream.h> |
13 | #include <DataStreams/RemoteBlockOutputStream.h> |
14 | #include <Interpreters/InterpreterInsertQuery.h> |
15 | #include <Interpreters/createBlockSelector.h> |
16 | |
17 | #include <DataTypes/DataTypesNumber.h> |
18 | #include <DataTypes/DataTypeLowCardinality.h> |
19 | #include <Common/setThreadName.h> |
20 | #include <Common/ClickHouseRevision.h> |
21 | #include <Common/CurrentMetrics.h> |
22 | #include <Common/typeid_cast.h> |
23 | #include <Common/Exception.h> |
24 | #include <Common/ProfileEvents.h> |
25 | #include <Common/escapeForFileName.h> |
26 | #include <Common/CurrentThread.h> |
27 | #include <common/logger_useful.h> |
28 | #include <ext/range.h> |
29 | #include <ext/scope_guard.h> |
30 | |
31 | #include <Poco/DirectoryIterator.h> |
32 | |
33 | #include <future> |
34 | #include <condition_variable> |
35 | #include <mutex> |
36 | |
37 | |
38 | |
39 | namespace CurrentMetrics |
40 | { |
41 | extern const Metric DistributedSend; |
42 | } |
43 | |
44 | namespace ProfileEvents |
45 | { |
46 | extern const Event DistributedSyncInsertionTimeoutExceeded; |
47 | } |
48 | |
49 | namespace DB |
50 | { |
51 | |
52 | |
53 | namespace ErrorCodes |
54 | { |
55 | extern const int TIMEOUT_EXCEEDED; |
56 | extern const int TYPE_MISMATCH; |
    extern const int CANNOT_LINK;
    extern const int LOGICAL_ERROR;
58 | } |
59 | |
60 | |
61 | DistributedBlockOutputStream::DistributedBlockOutputStream( |
62 | const Context & context_, StorageDistributed & storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_, |
63 | bool insert_sync_, UInt64 insert_timeout_) |
64 | : context(context_), storage(storage_), query_ast(query_ast_), query_string(queryToString(query_ast_)), |
65 | cluster(cluster_), insert_sync(insert_sync_), |
      insert_timeout(insert_timeout_), log(&Logger::get("DistributedBlockOutputStream"))
67 | { |
68 | } |
69 | |
70 | |
Block DistributedBlockOutputStream::getHeader() const
72 | { |
73 | return storage.getSampleBlock(); |
74 | } |
75 | |
76 | |
77 | void DistributedBlockOutputStream::writePrefix() |
78 | { |
79 | } |
80 | |
81 | |
82 | void DistributedBlockOutputStream::write(const Block & block) |
83 | { |
84 | Block ordinary_block{ block }; |
85 | |
    /* MATERIALIZED columns are removed here: they are added by the AddingDefaultBlockOutputStream,
     * and keeping them would give the destination a different number of columns than it expects */
88 | for (const auto & col : storage.getColumns().getMaterialized()) |
89 | { |
90 | if (ordinary_block.has(col.name)) |
91 | { |
92 | ordinary_block.erase(col.name); |
93 | LOG_DEBUG(log, storage.getTableName() |
94 | << ": column " + col.name + " will be removed, " |
95 | << "because it is MATERIALIZED" ); |
96 | } |
97 | } |
98 | |
99 | |
100 | if (insert_sync) |
101 | writeSync(ordinary_block); |
102 | else |
103 | writeAsync(ordinary_block); |
104 | } |
105 | |
106 | void DistributedBlockOutputStream::writeAsync(const Block & block) |
107 | { |
108 | if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1)) |
109 | return writeSplitAsync(block); |
110 | |
111 | writeAsyncImpl(block); |
112 | ++inserted_blocks; |
113 | } |
114 | |
115 | |
116 | std::string DistributedBlockOutputStream::getCurrentStateDescription() |
117 | { |
118 | std::stringstream buffer; |
119 | const auto & addresses = cluster->getShardsAddresses(); |
120 | |
121 | buffer << "Insertion status:\n" ; |
122 | for (auto & shard_jobs : per_shard_jobs) |
123 | for (JobReplica & job : shard_jobs.replicas_jobs) |
124 | { |
125 | buffer << "Wrote " << job.blocks_written << " blocks and " << job.rows_written << " rows" |
126 | << " on shard " << job.shard_index << " replica " << job.replica_index |
127 | << ", " << addresses[job.shard_index][job.replica_index].readableString(); |
128 | |
129 | /// Performance statistics |
130 | if (job.blocks_started > 0) |
131 | { |
132 | buffer << " (average " << job.elapsed_time_ms / job.blocks_started << " ms per block" ; |
133 | if (job.blocks_started > 1) |
134 | buffer << ", the slowest block " << job.max_elapsed_time_for_block_ms << " ms" ; |
135 | buffer << ")" ; |
136 | } |
137 | |
138 | buffer << "\n" ; |
139 | } |
140 | |
141 | return buffer.str(); |
142 | } |
143 | |
144 | |
145 | void DistributedBlockOutputStream::initWritingJobs(const Block & first_block) |
146 | { |
147 | const auto & addresses_with_failovers = cluster->getShardsAddresses(); |
148 | const auto & shards_info = cluster->getShardsInfo(); |
149 | size_t num_shards = shards_info.size(); |
150 | |
151 | remote_jobs_count = 0; |
152 | local_jobs_count = 0; |
153 | per_shard_jobs.resize(shards_info.size()); |
154 | |
155 | for (size_t shard_index : ext::range(0, shards_info.size())) |
156 | { |
157 | const auto & shard_info = shards_info[shard_index]; |
158 | auto & shard_jobs = per_shard_jobs[shard_index]; |
159 | |
        /// If hasInternalReplication, then prefer the local replica
161 | if (!shard_info.hasInternalReplication() || !shard_info.isLocal()) |
162 | { |
163 | const auto & replicas = addresses_with_failovers[shard_index]; |
164 | |
165 | for (size_t replica_index : ext::range(0, replicas.size())) |
166 | { |
167 | if (!replicas[replica_index].is_local) |
168 | { |
169 | shard_jobs.replicas_jobs.emplace_back(shard_index, replica_index, false, first_block); |
170 | ++remote_jobs_count; |
171 | |
172 | if (shard_info.hasInternalReplication()) |
173 | break; |
174 | } |
175 | } |
176 | } |
177 | |
178 | if (shard_info.isLocal()) |
179 | { |
180 | shard_jobs.replicas_jobs.emplace_back(shard_index, 0, true, first_block); |
181 | ++local_jobs_count; |
182 | } |
183 | |
184 | if (num_shards > 1) |
185 | shard_jobs.shard_current_block_permuation.reserve(first_block.rows()); |
186 | } |
187 | } |
188 | |
189 | |
190 | void DistributedBlockOutputStream::waitForJobs() |
191 | { |
192 | pool->wait(); |
193 | |
194 | if (insert_timeout) |
195 | { |
196 | if (static_cast<UInt64>(watch.elapsedSeconds()) > insert_timeout) |
197 | { |
198 | ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded); |
199 | throw Exception("Synchronous distributed insert timeout exceeded." , ErrorCodes::TIMEOUT_EXCEEDED); |
200 | } |
201 | } |
202 | |
203 | size_t jobs_count = remote_jobs_count + local_jobs_count; |
204 | size_t num_finished_jobs = finished_jobs_count; |
205 | |
206 | if (num_finished_jobs < jobs_count) |
207 | LOG_WARNING(log, "Expected " << jobs_count << " writing jobs, but finished only " << num_finished_jobs); |
208 | } |
209 | |
210 | |
211 | ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block) |
212 | { |
213 | auto thread_group = CurrentThread::getGroup(); |
    return [this, thread_group, &job, &current_block]()
215 | { |
216 | if (thread_group) |
217 | CurrentThread::attachToIfDetached(thread_group); |
        setThreadName("DistrOutStrProc");
219 | |
220 | ++job.blocks_started; |
221 | |
222 | SCOPE_EXIT({ |
223 | ++finished_jobs_count; |
224 | |
225 | UInt64 elapsed_time_for_block_ms = watch_current_block.elapsedMilliseconds(); |
226 | job.elapsed_time_ms += elapsed_time_for_block_ms; |
227 | job.max_elapsed_time_for_block_ms = std::max(job.max_elapsed_time_for_block_ms, elapsed_time_for_block_ms); |
228 | }); |
229 | |
230 | const auto & shard_info = cluster->getShardsInfo()[job.shard_index]; |
231 | size_t num_shards = cluster->getShardsInfo().size(); |
232 | auto & shard_job = per_shard_jobs[job.shard_index]; |
233 | const auto & addresses = cluster->getShardsAddresses(); |
234 | |
235 | /// Generate current shard block |
236 | if (num_shards > 1) |
237 | { |
238 | auto & shard_permutation = shard_job.shard_current_block_permuation; |
239 | size_t num_shard_rows = shard_permutation.size(); |
240 | |
241 | for (size_t j = 0; j < current_block.columns(); ++j) |
242 | { |
243 | auto & src_column = current_block.getByPosition(j).column; |
244 | auto & dst_column = job.current_shard_block.getByPosition(j).column; |
245 | |
246 | /// Zero permutation size has special meaning in IColumn::permute |
247 | if (num_shard_rows) |
248 | dst_column = src_column->permute(shard_permutation, num_shard_rows); |
249 | else |
250 | dst_column = src_column->cloneEmpty(); |
251 | } |
252 | } |
253 | |
254 | const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block; |
255 | |
256 | if (!job.is_local_job) |
257 | { |
258 | if (!job.stream) |
259 | { |
260 | const Settings & settings = context.getSettingsRef(); |
261 | auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); |
262 | if (shard_info.hasInternalReplication()) |
263 | { |
264 | /// Skip replica_index in case of internal replication |
265 | if (shard_job.replicas_jobs.size() != 1) |
266 | throw Exception("There are several writing job for an automatically replicated shard" , ErrorCodes::LOGICAL_ERROR); |
267 | |
                /// TODO: it makes sense to rewrite skip_unavailable_shards and max_parallel_replicas here
269 | auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE); |
270 | if (connections.empty() || connections.front().isNull()) |
271 | throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR); |
272 | |
273 | job.connection_entry = std::move(connections.front()); |
274 | } |
275 | else |
276 | { |
277 | const auto & replica = addresses.at(job.shard_index).at(job.replica_index); |
278 | |
279 | const ConnectionPoolPtr & connection_pool = shard_info.per_replica_pools.at(job.replica_index); |
280 | if (!connection_pool) |
281 | throw Exception("Connection pool for replica " + replica.readableString() + " does not exist" , ErrorCodes::LOGICAL_ERROR); |
282 | |
283 | job.connection_entry = connection_pool->get(timeouts, &settings); |
284 | if (job.connection_entry.isNull()) |
285 | throw Exception("Got empty connection for replica" + replica.readableString(), ErrorCodes::LOGICAL_ERROR); |
286 | } |
287 | |
288 | if (throttler) |
289 | job.connection_entry->setThrottler(throttler); |
290 | |
291 | job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, &settings); |
292 | job.stream->writePrefix(); |
293 | } |
294 | |
295 | CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend}; |
296 | job.stream->write(shard_block); |
297 | } |
298 | else |
299 | { |
300 | if (!job.stream) |
301 | { |
302 | /// Forward user settings |
303 | job.local_context = std::make_unique<Context>(context); |
304 | |
305 | InterpreterInsertQuery interp(query_ast, *job.local_context); |
306 | job.stream = interp.execute().out; |
307 | job.stream->writePrefix(); |
308 | } |
309 | |
310 | size_t num_repetitions = shard_info.getLocalNodeCount(); |
311 | for (size_t i = 0; i < num_repetitions; ++i) |
312 | job.stream->write(shard_block); |
313 | } |
314 | |
315 | job.blocks_written += 1; |
316 | job.rows_written += shard_block.rows(); |
317 | }; |
318 | } |
319 | |
320 | |
321 | void DistributedBlockOutputStream::writeSync(const Block & block) |
322 | { |
323 | const Settings & settings = context.getSettingsRef(); |
324 | const auto & shards_info = cluster->getShardsInfo(); |
325 | size_t num_shards = shards_info.size(); |
326 | |
327 | if (!pool) |
328 | { |
329 | /// Deferred initialization. Only for sync insertion. |
330 | initWritingJobs(block); |
331 | |
332 | pool.emplace(remote_jobs_count + local_jobs_count); |
333 | |
334 | if (!throttler && (settings.max_network_bandwidth || settings.max_network_bytes)) |
335 | { |
336 | throttler = std::make_shared<Throttler>(settings.max_network_bandwidth, settings.max_network_bytes, |
337 | "Network bandwidth limit for a query exceeded." ); |
338 | } |
339 | |
340 | watch.restart(); |
341 | } |
342 | |
343 | watch_current_block.restart(); |
344 | |
345 | if (num_shards > 1) |
346 | { |
347 | auto current_selector = createSelector(block); |
348 | |
349 | /// Prepare row numbers for each shard |
350 | for (size_t shard_index : ext::range(0, num_shards)) |
351 | per_shard_jobs[shard_index].shard_current_block_permuation.resize(0); |
352 | |
353 | for (size_t i = 0; i < block.rows(); ++i) |
354 | per_shard_jobs[current_selector[i]].shard_current_block_permuation.push_back(i); |
355 | } |
356 | |
357 | try |
358 | { |
        /// Run jobs in parallel for each block and wait for them
360 | finished_jobs_count = 0; |
361 | for (size_t shard_index : ext::range(0, shards_info.size())) |
362 | for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs) |
363 | pool->scheduleOrThrowOnError(runWritingJob(job, block)); |
364 | } |
365 | catch (...) |
366 | { |
367 | pool->wait(); |
368 | throw; |
369 | } |
370 | |
371 | try |
372 | { |
373 | waitForJobs(); |
374 | } |
375 | catch (Exception & exception) |
376 | { |
377 | exception.addMessage(getCurrentStateDescription()); |
378 | throw; |
379 | } |
380 | |
381 | inserted_blocks += 1; |
382 | inserted_rows += block.rows(); |
383 | } |
384 | |
385 | |
386 | void DistributedBlockOutputStream::writeSuffix() |
387 | { |
388 | auto log_performance = [this] () |
389 | { |
390 | double elapsed = watch.elapsedSeconds(); |
391 | LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks" |
392 | << ", " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second" |
393 | << ". " << getCurrentStateDescription()); |
394 | }; |
395 | |
396 | if (insert_sync && pool) |
397 | { |
398 | finished_jobs_count = 0; |
399 | try |
400 | { |
401 | for (auto & shard_jobs : per_shard_jobs) |
402 | { |
403 | for (JobReplica & job : shard_jobs.replicas_jobs) |
404 | { |
405 | if (job.stream) |
406 | { |
407 | pool->scheduleOrThrowOnError([&job]() |
408 | { |
409 | job.stream->writeSuffix(); |
410 | }); |
411 | } |
412 | } |
413 | } |
414 | } |
415 | catch (...) |
416 | { |
417 | pool->wait(); |
418 | throw; |
419 | } |
420 | |
421 | try |
422 | { |
423 | pool->wait(); |
424 | log_performance(); |
425 | } |
426 | catch (Exception & exception) |
427 | { |
428 | log_performance(); |
429 | exception.addMessage(getCurrentStateDescription()); |
430 | throw; |
431 | } |
432 | } |
433 | } |
434 | |
435 | |
436 | IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & source_block) |
437 | { |
438 | Block current_block_with_sharding_key_expr = source_block; |
439 | storage.getShardingKeyExpr()->execute(current_block_with_sharding_key_expr); |
440 | |
441 | const auto & key_column = current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName()); |
442 | const auto & slot_to_shard = cluster->getSlotToShard(); |
443 | |
    // If key_column.type is DataTypeLowCardinality, shard according to its dictionary type
445 | #define CREATE_FOR_TYPE(TYPE) \ |
446 | if (typeid_cast<const DataType ## TYPE *>(key_column.type.get())) \ |
447 | return createBlockSelector<TYPE>(*key_column.column, slot_to_shard); \ |
448 | else if (auto * type_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(key_column.type.get())) \ |
449 | if (typeid_cast<const DataType ## TYPE *>(type_low_cardinality->getDictionaryType().get())) \ |
450 | return createBlockSelector<TYPE>(*key_column.column->convertToFullColumnIfLowCardinality(), slot_to_shard); |
451 | |
452 | CREATE_FOR_TYPE(UInt8) |
453 | CREATE_FOR_TYPE(UInt16) |
454 | CREATE_FOR_TYPE(UInt32) |
455 | CREATE_FOR_TYPE(UInt64) |
456 | CREATE_FOR_TYPE(Int8) |
457 | CREATE_FOR_TYPE(Int16) |
458 | CREATE_FOR_TYPE(Int32) |
459 | CREATE_FOR_TYPE(Int64) |
460 | |
461 | #undef CREATE_FOR_TYPE |
462 | |
463 | throw Exception{"Sharding key expression does not evaluate to an integer type" , ErrorCodes::TYPE_MISMATCH}; |
464 | } |
465 | |
466 | |
467 | Blocks DistributedBlockOutputStream::splitBlock(const Block & block) |
468 | { |
469 | auto selector = createSelector(block); |
470 | |
    /// Split the block into num_shards smaller blocks, using 'selector'.
472 | |
473 | const size_t num_shards = cluster->getShardsInfo().size(); |
474 | Blocks splitted_blocks(num_shards); |
475 | |
476 | for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx) |
477 | splitted_blocks[shard_idx] = block.cloneEmpty(); |
478 | |
479 | size_t columns_in_block = block.columns(); |
480 | for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block) |
481 | { |
482 | MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_shards, selector); |
483 | for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx) |
484 | splitted_blocks[shard_idx].getByPosition(col_idx_in_block).column = std::move(splitted_columns[shard_idx]); |
485 | } |
486 | |
487 | return splitted_blocks; |
488 | } |
489 | |
490 | |
491 | void DistributedBlockOutputStream::writeSplitAsync(const Block & block) |
492 | { |
493 | Blocks splitted_blocks = splitBlock(block); |
494 | const size_t num_shards = splitted_blocks.size(); |
495 | |
496 | for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx) |
497 | if (splitted_blocks[shard_idx].rows()) |
498 | writeAsyncImpl(splitted_blocks[shard_idx], shard_idx); |
499 | |
500 | ++inserted_blocks; |
501 | } |
502 | |
503 | |
504 | void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id) |
505 | { |
506 | const auto & shard_info = cluster->getShardsInfo()[shard_id]; |
507 | |
508 | if (shard_info.hasInternalReplication()) |
509 | { |
510 | if (shard_info.getLocalNodeCount() > 0) |
511 | { |
            /// Prefer inserting into the current instance directly
513 | writeToLocal(block, shard_info.getLocalNodeCount()); |
514 | } |
515 | else |
516 | { |
517 | if (shard_info.dir_name_for_internal_replication.empty()) |
518 | throw Exception("Directory name for async inserts is empty, table " + storage.getTableName(), ErrorCodes::LOGICAL_ERROR); |
519 | |
520 | writeToShard(block, {shard_info.dir_name_for_internal_replication}); |
521 | } |
522 | } |
523 | else |
524 | { |
525 | if (shard_info.getLocalNodeCount() > 0) |
526 | writeToLocal(block, shard_info.getLocalNodeCount()); |
527 | |
528 | std::vector<std::string> dir_names; |
529 | for (const auto & address : cluster->getShardsAddresses()[shard_id]) |
530 | if (!address.is_local) |
531 | dir_names.push_back(address.toFullString()); |
532 | |
533 | if (!dir_names.empty()) |
534 | writeToShard(block, dir_names); |
535 | } |
536 | } |
537 | |
538 | |
539 | void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats) |
540 | { |
    /// Async insert does not support settings forwarding yet, whereas the sync one does
542 | InterpreterInsertQuery interp(query_ast, context); |
543 | |
544 | auto block_io = interp.execute(); |
545 | block_io.out->writePrefix(); |
546 | |
547 | for (size_t i = 0; i < repeats; ++i) |
548 | block_io.out->write(block); |
549 | |
550 | block_io.out->writeSuffix(); |
551 | } |
552 | |
553 | |
554 | void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names) |
555 | { |
    /** The tmp directory is used to ensure atomicity of writes
      * and to keep the monitor thread from reading incomplete data
      */
559 | std::string first_file_tmp_path{}; |
560 | |
561 | auto first = true; |
562 | |
563 | /// write first file, hardlink the others |
564 | for (const auto & dir_name : dir_names) |
565 | { |
566 | const auto & path = storage.getPath() + dir_name + '/'; |
567 | |
568 | /// ensure shard subdirectory creation and notify storage |
569 | if (Poco::File(path).createDirectory()) |
570 | storage.requireDirectoryMonitor(dir_name); |
571 | |
        const auto & file_name = toString(storage.file_names_increment.get()) + ".bin";
573 | const auto & block_file_path = path + file_name; |
574 | |
        /** On the first iteration, write the block to a temporary directory for subsequent
          * hardlinking, to ensure the inode is not freed until we're done */
577 | if (first) |
578 | { |
579 | first = false; |
580 | |
            const auto & tmp_path = path + "tmp/";
582 | Poco::File(tmp_path).createDirectory(); |
583 | const auto & block_file_tmp_path = tmp_path + file_name; |
584 | |
585 | first_file_tmp_path = block_file_tmp_path; |
586 | |
587 | WriteBufferFromFile out{block_file_tmp_path}; |
588 | CompressedWriteBuffer compress{out}; |
589 | NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()}; |
590 | |
591 | /// Prepare the header. |
592 | /// We wrap the header into a string for compatibility with older versions: |
            /// a shard will be able to read the header partly and ignore other parts based on its version.
            WriteBufferFromOwnString header_buf;
595 | writeVarUInt(ClickHouseRevision::get(), header_buf); |
596 | writeStringBinary(query_string, header_buf); |
597 | context.getSettingsRef().serialize(header_buf); |
598 | |
599 | /// Add new fields here, for example: |
600 | /// writeVarUInt(my_new_data, header_buf); |
601 | |
602 | /// Write the header. |
            const StringRef header = header_buf.stringRef();
604 | writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out); |
605 | writeStringBinary(header, out); |
606 | writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out); |
607 | |
608 | stream.writePrefix(); |
609 | stream.write(block); |
610 | stream.writeSuffix(); |
611 | } |
612 | |
613 | if (link(first_file_tmp_path.data(), block_file_path.data())) |
614 | throwFromErrnoWithPath("Could not link " + block_file_path + " to " + first_file_tmp_path, block_file_path, |
615 | ErrorCodes::CANNOT_LINK); |
616 | } |
617 | |
    /** Remove the temporary file, enabling the OS to reclaim the inode after all threads
      * have removed their corresponding files */
620 | Poco::File(first_file_tmp_path).remove(); |
621 | } |
622 | |
623 | |
624 | } |
625 | |