#pragma once

#include <Parsers/formatAST.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Core/Block.h>
#include <Common/PODArray.h>
#include <Common/Throttler.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <atomic>
#include <chrono>
#include <list>
#include <memory>
#include <optional>
#include <string>
#include <vector>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>


namespace Poco
{
    class Logger;
}

namespace DB
{

class StorageDistributed;

/** If insert_sync_ is true, the write is synchronous. Uses insert_timeout_ if it is not zero.
  * Otherwise, the write is asynchronous - the data is first written to the local filesystem, and then sent to the remote servers.
  * If the Distributed table uses more than one shard, then in order to support the write,
  * an additional parameter must be specified for ENGINE when creating the table - the sharding key.
  * The sharding key is an arbitrary expression over the columns. For example, rand() or UserID.
  * When writing, the data block is split according to the remainder of dividing the sharding key by the total weight of the shards,
  * and the resulting blocks are written in a compressed Native format in separate directories for sending.
  * For each destination address (each directory with data to send), a separate thread is created in StorageDistributed,
  * which monitors the directory and sends data. */
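
/** A minimal sketch of the weight-based routing described above (an illustration, not
  * the actual implementation): with two shards of weights 2 and 1 (total weight 3),
  * a row whose sharding key evaluates to `key` is routed as
  *
  *     UInt64 value = key % 3;              /// remainder by the total weight of the shards
  *     size_t shard = (value < 2) ? 0 : 1;  /// weight-proportional ranges [0, 2) and [2, 3)
  *
  * createSelector() builds such a per-row mapping for a whole block at once.
  */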
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
    DistributedBlockOutputStream(const Context & context_, StorageDistributed & storage_, const ASTPtr & query_ast_,
                                 const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_);

    Block getHeader() const override;
    void write(const Block & block) override;
    void writePrefix() override;
    void writeSuffix() override;

private:
    /// Builds a per-row mapping from block rows to shard numbers, based on the sharding key.
    IColumn::Selector createSelector(const Block & source_block);

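    /// The asynchronous path, roughly: the block is split per shard (splitBlock), and each
    /// piece is either inserted into the local table (writeToLocal) or serialized into a
    /// per-destination directory (writeToShard), from which StorageDistributed sends it later.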
    void writeAsync(const Block & block);

    /// Split the block between shards.
    Blocks splitBlock(const Block & block);

    void writeSplitAsync(const Block & block);

    void writeAsyncImpl(const Block & block, const size_t shard_id = 0);

    /// Inserts the block into the local table `repeats` times.
    void writeToLocal(const Block & block, const size_t repeats);

    void writeToShard(const Block & block, const std::vector<std::string> & dir_names);

    /// Performs a synchronous insert to the remote nodes. Throws if the insert timeout was exceeded.
    void writeSync(const Block & block);
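
    /// The synchronous path, roughly: initWritingJobs() creates one JobReplica per
    /// destination, runWritingJob() returns a ThreadPool job that inserts the current
    /// block into that replica, and waitForJobs() waits for completion, tracking
    /// insert_timeout. (A sketch of the control flow; see the definitions in the .cpp.)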

    void initWritingJobs(const Block & first_block);

    struct JobReplica;
    ThreadPool::Job runWritingJob(JobReplica & job, const Block & current_block);

    void waitForJobs();

    /// Returns the number of blocks written to each cluster node. Used during exception handling.
    std::string getCurrentStateDescription();

private:
    const Context & context;
    StorageDistributed & storage;
    ASTPtr query_ast;
    String query_string;
    ClusterPtr cluster;
    size_t inserted_blocks = 0;
    size_t inserted_rows = 0;

    bool insert_sync;

    /// Sync-related stuff
    UInt64 insert_timeout; // in seconds
    Stopwatch watch;
    Stopwatch watch_current_block;
    std::optional<ThreadPool> pool;
    ThrottlerPtr throttler;

    struct JobReplica
    {
        JobReplica() = default;
        JobReplica(size_t shard_index_, size_t replica_index_, bool is_local_job_, const Block & sample_block)
            : shard_index(shard_index_), replica_index(replica_index_), is_local_job(is_local_job_), current_shard_block(sample_block.cloneEmpty()) {}

        size_t shard_index = 0;
        size_t replica_index = 0;
        bool is_local_job = false;

        Block current_shard_block;

        ConnectionPool::Entry connection_entry;
        std::unique_ptr<Context> local_context;
        BlockOutputStreamPtr stream;

        UInt64 blocks_written = 0;
        UInt64 rows_written = 0;

        UInt64 blocks_started = 0;
        UInt64 elapsed_time_ms = 0;
        UInt64 max_elapsed_time_for_block_ms = 0;
    };

    struct JobShard
    {
        std::list<JobReplica> replicas_jobs;
        /// Permutation that groups the rows of the current block by destination shard.
        IColumn::Permutation shard_current_block_permutation;
    };

    std::vector<JobShard> per_shard_jobs;

    size_t remote_jobs_count = 0;
    size_t local_jobs_count = 0;

    std::atomic<unsigned> finished_jobs_count{0};

    Poco::Logger * log;
};
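
/** A minimal usage sketch (a hypothetical call site; in practice the stream is created
  * by StorageDistributed::write and driven by the query pipeline):
  *
  *     /// synchronous insert with a 60 second timeout
  *     DistributedBlockOutputStream out(context, storage, query_ast, cluster, true, 60);
  *     out.writePrefix();
  *     out.write(block);
  *     out.writeSuffix();
  */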

}