#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageDistributed.h>

#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>

#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/RemoteBlockOutputStream.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/createBlockSelector.h>

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Common/setThreadName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/escapeForFileName.h>
#include <Common/CurrentThread.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#include <ext/scope_guard.h>

#include <Poco/DirectoryIterator.h>

#include <future>
#include <condition_variable>
#include <mutex>


namespace CurrentMetrics
{
    extern const Metric DistributedSend;
}

namespace ProfileEvents
{
    extern const Event DistributedSyncInsertionTimeoutExceeded;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int TIMEOUT_EXCEEDED;
    extern const int TYPE_MISMATCH;
    extern const int CANNOT_LINK;
    extern const int LOGICAL_ERROR;
}

DistributedBlockOutputStream::DistributedBlockOutputStream(
    const Context & context_, StorageDistributed & storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_,
    bool insert_sync_, UInt64 insert_timeout_)
    : context(context_), storage(storage_), query_ast(query_ast_), query_string(queryToString(query_ast_)),
    cluster(cluster_), insert_sync(insert_sync_),
    insert_timeout(insert_timeout_), log(&Logger::get("DistributedBlockOutputStream"))
{
}


Block DistributedBlockOutputStream::getHeader() const
{
    return storage.getSampleBlock();
}


void DistributedBlockOutputStream::writePrefix()
{
}

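/// Called once per incoming block: strips MATERIALIZED columns and forwards the block
/// either synchronously (writeSync) or asynchronously (writeAsync), depending on insert_sync.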
void DistributedBlockOutputStream::write(const Block & block)
{
    Block ordinary_block{ block };

    /* Materialized columns are added by AddingDefaultBlockOutputStream, so we would
     * otherwise end up with a different number of columns eventually */
    for (const auto & col : storage.getColumns().getMaterialized())
    {
        if (ordinary_block.has(col.name))
        {
            ordinary_block.erase(col.name);
            LOG_DEBUG(log, storage.getTableName()
                << ": column " + col.name + " will be removed, "
                << "because it is MATERIALIZED");
        }
    }


    if (insert_sync)
        writeSync(ordinary_block);
    else
        writeAsync(ordinary_block);
}

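/// Asynchronous path: if there is a sharding key and more than one shard, the block is
/// split per shard first; otherwise it is queued for the whole cluster as-is.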
void DistributedBlockOutputStream::writeAsync(const Block & block)
{
    if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
        return writeSplitAsync(block);

    writeAsyncImpl(block);
    ++inserted_blocks;
}


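/// Builds a human-readable summary of per-replica progress (blocks/rows written and timings);
/// it is appended to exception messages and to the final performance log line.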
std::string DistributedBlockOutputStream::getCurrentStateDescription()
{
    std::stringstream buffer;
    const auto & addresses = cluster->getShardsAddresses();

    buffer << "Insertion status:\n";
    for (auto & shard_jobs : per_shard_jobs)
        for (JobReplica & job : shard_jobs.replicas_jobs)
        {
            buffer << "Wrote " << job.blocks_written << " blocks and " << job.rows_written << " rows"
                   << " on shard " << job.shard_index << " replica " << job.replica_index
                   << ", " << addresses[job.shard_index][job.replica_index].readableString();

            /// Performance statistics
            if (job.blocks_started > 0)
            {
                buffer << " (average " << job.elapsed_time_ms / job.blocks_started << " ms per block";
                if (job.blocks_started > 1)
                    buffer << ", the slowest block " << job.max_elapsed_time_for_block_ms << " ms";
                buffer << ")";
            }

            buffer << "\n";
        }

    return buffer.str();
}


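/// Creates one JobReplica per destination (remote replicas and/or the local node) for the
/// synchronous path and counts remote/local jobs; called lazily from writeSync on the first block.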
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block)
{
    const auto & addresses_with_failovers = cluster->getShardsAddresses();
    const auto & shards_info = cluster->getShardsInfo();
    size_t num_shards = shards_info.size();

    remote_jobs_count = 0;
    local_jobs_count = 0;
    per_shard_jobs.resize(shards_info.size());

    for (size_t shard_index : ext::range(0, shards_info.size()))
    {
        const auto & shard_info = shards_info[shard_index];
        auto & shard_jobs = per_shard_jobs[shard_index];

        /// If hasInternalReplication, then prefer the local replica
        if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
        {
            const auto & replicas = addresses_with_failovers[shard_index];

            for (size_t replica_index : ext::range(0, replicas.size()))
            {
                if (!replicas[replica_index].is_local)
                {
                    shard_jobs.replicas_jobs.emplace_back(shard_index, replica_index, false, first_block);
                    ++remote_jobs_count;

                    if (shard_info.hasInternalReplication())
                        break;
                }
            }
        }

        if (shard_info.isLocal())
        {
            shard_jobs.replicas_jobs.emplace_back(shard_index, 0, true, first_block);
            ++local_jobs_count;
        }

        if (num_shards > 1)
            shard_jobs.shard_current_block_permuation.reserve(first_block.rows());
    }
}


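/// Waits for all writing jobs scheduled for the current block, enforces insert_timeout
/// and warns if fewer jobs finished than were started.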
void DistributedBlockOutputStream::waitForJobs()
{
    pool->wait();

    if (insert_timeout)
    {
        if (static_cast<UInt64>(watch.elapsedSeconds()) > insert_timeout)
        {
            ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
            throw Exception("Synchronous distributed insert timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
        }
    }

    size_t jobs_count = remote_jobs_count + local_jobs_count;
    size_t num_finished_jobs = finished_jobs_count;

    if (num_finished_jobs < jobs_count)
        LOG_WARNING(log, "Expected " << jobs_count << " writing jobs, but finished only " << num_finished_jobs);
}


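/// Returns a thread-pool task that writes the current block (or its per-shard subset) to a
/// single replica: over a connection from the pool for remote replicas, or through
/// InterpreterInsertQuery for the local one. Per-job timing counters are updated on exit.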
ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block)
{
    auto thread_group = CurrentThread::getGroup();
    return [this, thread_group, &job, &current_block]()
    {
        if (thread_group)
            CurrentThread::attachToIfDetached(thread_group);
        setThreadName("DistrOutStrProc");

        ++job.blocks_started;

        SCOPE_EXIT({
            ++finished_jobs_count;

            UInt64 elapsed_time_for_block_ms = watch_current_block.elapsedMilliseconds();
            job.elapsed_time_ms += elapsed_time_for_block_ms;
            job.max_elapsed_time_for_block_ms = std::max(job.max_elapsed_time_for_block_ms, elapsed_time_for_block_ms);
        });

        const auto & shard_info = cluster->getShardsInfo()[job.shard_index];
        size_t num_shards = cluster->getShardsInfo().size();
        auto & shard_job = per_shard_jobs[job.shard_index];
        const auto & addresses = cluster->getShardsAddresses();

        /// Generate current shard block
        if (num_shards > 1)
        {
            auto & shard_permutation = shard_job.shard_current_block_permuation;
            size_t num_shard_rows = shard_permutation.size();

            for (size_t j = 0; j < current_block.columns(); ++j)
            {
                auto & src_column = current_block.getByPosition(j).column;
                auto & dst_column = job.current_shard_block.getByPosition(j).column;

                /// Zero permutation size has special meaning in IColumn::permute
                if (num_shard_rows)
                    dst_column = src_column->permute(shard_permutation, num_shard_rows);
                else
                    dst_column = src_column->cloneEmpty();
            }
        }

        const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;

        if (!job.is_local_job)
        {
            if (!job.stream)
            {
                const Settings & settings = context.getSettingsRef();
                auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
                if (shard_info.hasInternalReplication())
                {
                    /// Skip replica_index in case of internal replication
                    if (shard_job.replicas_jobs.size() != 1)
                        throw Exception("There are several writing jobs for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR);

                    /// TODO: it makes sense to rewrite skip_unavailable_shards and max_parallel_replicas here
                    auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE);
                    if (connections.empty() || connections.front().isNull())
                        throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR);

                    job.connection_entry = std::move(connections.front());
                }
                else
                {
                    const auto & replica = addresses.at(job.shard_index).at(job.replica_index);

                    const ConnectionPoolPtr & connection_pool = shard_info.per_replica_pools.at(job.replica_index);
                    if (!connection_pool)
                        throw Exception("Connection pool for replica " + replica.readableString() + " does not exist", ErrorCodes::LOGICAL_ERROR);

                    job.connection_entry = connection_pool->get(timeouts, &settings);
                    if (job.connection_entry.isNull())
                        throw Exception("Got empty connection for replica " + replica.readableString(), ErrorCodes::LOGICAL_ERROR);
                }

                if (throttler)
                    job.connection_entry->setThrottler(throttler);

                job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, &settings);
                job.stream->writePrefix();
            }

            CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
            job.stream->write(shard_block);
        }
        else
        {
            if (!job.stream)
            {
                /// Forward user settings
                job.local_context = std::make_unique<Context>(context);

                InterpreterInsertQuery interp(query_ast, *job.local_context);
                job.stream = interp.execute().out;
                job.stream->writePrefix();
            }

            size_t num_repetitions = shard_info.getLocalNodeCount();
            for (size_t i = 0; i < num_repetitions; ++i)
                job.stream->write(shard_block);
        }

        job.blocks_written += 1;
        job.rows_written += shard_block.rows();
    };
}


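/// Synchronous path: lazily initializes the per-replica jobs and the thread pool, computes the
/// per-shard row permutation for the block, runs one writing job per replica in parallel and
/// waits for them, attaching the insertion status to any exception.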
void DistributedBlockOutputStream::writeSync(const Block & block)
{
    const Settings & settings = context.getSettingsRef();
    const auto & shards_info = cluster->getShardsInfo();
    size_t num_shards = shards_info.size();

    if (!pool)
    {
        /// Deferred initialization. Only for sync insertion.
        initWritingJobs(block);

        pool.emplace(remote_jobs_count + local_jobs_count);

        if (!throttler && (settings.max_network_bandwidth || settings.max_network_bytes))
        {
            throttler = std::make_shared<Throttler>(settings.max_network_bandwidth, settings.max_network_bytes,
                                                    "Network bandwidth limit for a query exceeded.");
        }

        watch.restart();
    }

    watch_current_block.restart();

    if (num_shards > 1)
    {
        auto current_selector = createSelector(block);

        /// Prepare row numbers for each shard
        for (size_t shard_index : ext::range(0, num_shards))
            per_shard_jobs[shard_index].shard_current_block_permuation.resize(0);

        for (size_t i = 0; i < block.rows(); ++i)
            per_shard_jobs[current_selector[i]].shard_current_block_permuation.push_back(i);
    }

    try
    {
        /// Run jobs in parallel for each block and wait for them
        finished_jobs_count = 0;
        for (size_t shard_index : ext::range(0, shards_info.size()))
            for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
                pool->scheduleOrThrowOnError(runWritingJob(job, block));
    }
    catch (...)
    {
        pool->wait();
        throw;
    }

    try
    {
        waitForJobs();
    }
    catch (Exception & exception)
    {
        exception.addMessage(getCurrentStateDescription());
        throw;
    }

    inserted_blocks += 1;
    inserted_rows += block.rows();
}


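/// For the synchronous path, finalizes every opened job stream (writeSuffix) in parallel and
/// logs insertion throughput together with the per-replica status; nothing to flush otherwise.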
void DistributedBlockOutputStream::writeSuffix()
{
    auto log_performance = [this] ()
    {
        double elapsed = watch.elapsedSeconds();
        LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks"
            << ", " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second"
            << ". " << getCurrentStateDescription());
    };

    if (insert_sync && pool)
    {
        finished_jobs_count = 0;
        try
        {
            for (auto & shard_jobs : per_shard_jobs)
            {
                for (JobReplica & job : shard_jobs.replicas_jobs)
                {
                    if (job.stream)
                    {
                        pool->scheduleOrThrowOnError([&job]()
                        {
                            job.stream->writeSuffix();
                        });
                    }
                }
            }
        }
        catch (...)
        {
            pool->wait();
            throw;
        }

        try
        {
            pool->wait();
            log_performance();
        }
        catch (Exception & exception)
        {
            log_performance();
            exception.addMessage(getCurrentStateDescription());
            throw;
        }
    }
}


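/// Evaluates the sharding key expression on the block and maps every row to a shard via the
/// cluster's slot-to-shard table; only integer (and LowCardinality of integer) key types are supported.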
IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & source_block)
{
    Block current_block_with_sharding_key_expr = source_block;
    storage.getShardingKeyExpr()->execute(current_block_with_sharding_key_expr);

    const auto & key_column = current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName());
    const auto & slot_to_shard = cluster->getSlotToShard();

// If key_column.type is DataTypeLowCardinality, do shard according to its dictionaryType
#define CREATE_FOR_TYPE(TYPE) \
    if (typeid_cast<const DataType ## TYPE *>(key_column.type.get())) \
        return createBlockSelector<TYPE>(*key_column.column, slot_to_shard); \
    else if (auto * type_low_cardinality = typeid_cast<const DataTypeLowCardinality *>(key_column.type.get())) \
        if (typeid_cast<const DataType ## TYPE *>(type_low_cardinality->getDictionaryType().get())) \
            return createBlockSelector<TYPE>(*key_column.column->convertToFullColumnIfLowCardinality(), slot_to_shard);

    CREATE_FOR_TYPE(UInt8)
    CREATE_FOR_TYPE(UInt16)
    CREATE_FOR_TYPE(UInt32)
    CREATE_FOR_TYPE(UInt64)
    CREATE_FOR_TYPE(Int8)
    CREATE_FOR_TYPE(Int16)
    CREATE_FOR_TYPE(Int32)
    CREATE_FOR_TYPE(Int64)

#undef CREATE_FOR_TYPE

    throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
}


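/// Scatters the block into one sub-block per shard according to createSelector.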
Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
{
    auto selector = createSelector(block);

    /// Split the block into num_shards smaller blocks, using 'selector'.

    const size_t num_shards = cluster->getShardsInfo().size();
    Blocks splitted_blocks(num_shards);

    for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
        splitted_blocks[shard_idx] = block.cloneEmpty();

    size_t columns_in_block = block.columns();
    for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
    {
        MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_shards, selector);
        for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
            splitted_blocks[shard_idx].getByPosition(col_idx_in_block).column = std::move(splitted_columns[shard_idx]);
    }

    return splitted_blocks;
}


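/// Splits the block by sharding key and queues each non-empty part for its shard.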
void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
{
    Blocks splitted_blocks = splitBlock(block);
    const size_t num_shards = splitted_blocks.size();

    for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
        if (splitted_blocks[shard_idx].rows())
            writeAsyncImpl(splitted_blocks[shard_idx], shard_idx);

    ++inserted_blocks;
}


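/// Queues the block for a single shard: it is written directly into local replicas when possible,
/// and/or placed into the on-disk directory queue(s) picked up by the directory monitor.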
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id)
{
    const auto & shard_info = cluster->getShardsInfo()[shard_id];

    if (shard_info.hasInternalReplication())
    {
        if (shard_info.getLocalNodeCount() > 0)
        {
            /// Prefer to insert into the current instance directly
            writeToLocal(block, shard_info.getLocalNodeCount());
        }
        else
        {
            if (shard_info.dir_name_for_internal_replication.empty())
                throw Exception("Directory name for async inserts is empty, table " + storage.getTableName(), ErrorCodes::LOGICAL_ERROR);

            writeToShard(block, {shard_info.dir_name_for_internal_replication});
        }
    }
    else
    {
        if (shard_info.getLocalNodeCount() > 0)
            writeToLocal(block, shard_info.getLocalNodeCount());

        std::vector<std::string> dir_names;
        for (const auto & address : cluster->getShardsAddresses()[shard_id])
            if (!address.is_local)
                dir_names.push_back(address.toFullString());

        if (!dir_names.empty())
            writeToShard(block, dir_names);
    }
}


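/// Inserts the block into the local table directly via InterpreterInsertQuery,
/// once per local replica of the shard.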
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
{
    /// Async insert does not support settings forwarding yet, whereas the sync one does
    InterpreterInsertQuery interp(query_ast, context);

    auto block_io = interp.execute();
    block_io.out->writePrefix();

    for (size_t i = 0; i < repeats; ++i)
        block_io.out->write(block);

    block_io.out->writeSuffix();
}


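/// Persists the block for asynchronous delivery: it is written once into a tmp/ file together
/// with a header (revision, query, settings) and then hardlinked into every destination
/// directory, so the directory monitors can send it later.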
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
    /** tmp directory is used to ensure atomicity of transactions
      * and keep the monitor thread from reading incomplete data
      */
    std::string first_file_tmp_path{};

    auto first = true;

    /// write first file, hardlink the others
    for (const auto & dir_name : dir_names)
    {
        const auto & path = storage.getPath() + dir_name + '/';

        /// ensure shard subdirectory creation and notify storage
        if (Poco::File(path).createDirectory())
            storage.requireDirectoryMonitor(dir_name);

        const auto & file_name = toString(storage.file_names_increment.get()) + ".bin";
        const auto & block_file_path = path + file_name;

        /** on first iteration write block to a temporary directory for subsequent hardlinking to ensure
          * the inode is not freed until we're done */
        if (first)
        {
            first = false;

            const auto & tmp_path = path + "tmp/";
            Poco::File(tmp_path).createDirectory();
            const auto & block_file_tmp_path = tmp_path + file_name;

            first_file_tmp_path = block_file_tmp_path;

            WriteBufferFromFile out{block_file_tmp_path};
            CompressedWriteBuffer compress{out};
            NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};

            /// Prepare the header.
            /// We wrap the header into a string for compatibility with older versions:
            /// a shard will be able to read the header partly and ignore other parts based on its version.
            WriteBufferFromOwnString header_buf;
            writeVarUInt(ClickHouseRevision::get(), header_buf);
            writeStringBinary(query_string, header_buf);
            context.getSettingsRef().serialize(header_buf);

            /// Add new fields here, for example:
            /// writeVarUInt(my_new_data, header_buf);

            /// Write the header.
            const StringRef header = header_buf.stringRef();
            writeVarUInt(DBMS_DISTRIBUTED_SIGNATURE_HEADER, out);
            writeStringBinary(header, out);
            writePODBinary(CityHash_v1_0_2::CityHash128(header.data, header.size), out);

            stream.writePrefix();
            stream.write(block);
            stream.writeSuffix();
        }

        if (link(first_file_tmp_path.data(), block_file_path.data()))
            throwFromErrnoWithPath("Could not link " + block_file_path + " to " + first_file_tmp_path, block_file_path,
                                   ErrorCodes::CANNOT_LINK);
    }

    /** remove the temporary file, enabling the OS to reclaim inode after all threads
      * have removed their corresponding files */
    Poco::File(first_file_tmp_path).remove();
}


}