1 | #pragma once |
2 | |
3 | #include <Parsers/formatAST.h> |
4 | #include <DataStreams/IBlockOutputStream.h> |
5 | #include <Core/Block.h> |
6 | #include <Common/PODArray.h> |
7 | #include <Common/Throttler.h> |
8 | #include <Common/ThreadPool.h> |
9 | #include <atomic> |
10 | #include <memory> |
11 | #include <chrono> |
12 | #include <optional> |
13 | #include <Interpreters/Cluster.h> |
14 | #include <Interpreters/Context.h> |
15 | |
16 | |
17 | namespace Poco |
18 | { |
19 | class Logger; |
20 | } |
21 | |
22 | namespace DB |
23 | { |
24 | |
25 | class StorageDistributed; |
26 | |
27 | /** If insert_sync_ is true, the write is synchronous. Uses insert_timeout_ if it is not zero. |
28 | * Otherwise, the write is asynchronous - the data is first written to the local filesystem, and then sent to the remote servers. |
29 | * If the Distributed table uses more than one shard, then in order to support the write, |
30 | * when creating the table, an additional parameter must be specified for ENGINE - the sharding key. |
31 | * Sharding key is an arbitrary expression from the columns. For example, rand() or UserID. |
32 | * When writing, the data block is splitted by the remainder of the division of the sharding key by the total weight of the shards, |
33 | * and the resulting blocks are written in a compressed Native format in separate directories for sending. |
34 | * For each destination address (each directory with data to send), a separate thread is created in StorageDistributed, |
35 | * which monitors the directory and sends data. */ |
36 | class DistributedBlockOutputStream : public IBlockOutputStream |
37 | { |
38 | public: |
39 | DistributedBlockOutputStream(const Context & context_, StorageDistributed & storage_, const ASTPtr & query_ast_, |
40 | const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_); |
41 | |
42 | Block () const override; |
43 | void write(const Block & block) override; |
44 | void writePrefix() override; |
45 | |
46 | void writeSuffix() override; |
47 | |
48 | private: |
49 | |
50 | IColumn::Selector createSelector(const Block & source_block); |
51 | |
52 | |
53 | void writeAsync(const Block & block); |
54 | |
55 | /// Split block between shards. |
56 | Blocks splitBlock(const Block & block); |
57 | |
58 | void writeSplitAsync(const Block & block); |
59 | |
60 | void writeAsyncImpl(const Block & block, const size_t shard_id = 0); |
61 | |
62 | /// Increments finished_writings_count after each repeat. |
63 | void writeToLocal(const Block & block, const size_t repeats); |
64 | |
65 | void writeToShard(const Block & block, const std::vector<std::string> & dir_names); |
66 | |
67 | |
68 | /// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws. |
69 | void writeSync(const Block & block); |
70 | |
71 | void initWritingJobs(const Block & first_block); |
72 | |
73 | struct JobReplica; |
74 | ThreadPool::Job runWritingJob(JobReplica & job, const Block & current_block); |
75 | |
76 | void waitForJobs(); |
77 | |
78 | /// Returns the number of blocks was written for each cluster node. Uses during exception handling. |
79 | std::string getCurrentStateDescription(); |
80 | |
81 | private: |
82 | const Context & context; |
83 | StorageDistributed & storage; |
84 | ASTPtr query_ast; |
85 | String query_string; |
86 | ClusterPtr cluster; |
87 | size_t inserted_blocks = 0; |
88 | size_t inserted_rows = 0; |
89 | |
90 | bool insert_sync; |
91 | |
92 | /// Sync-related stuff |
93 | UInt64 insert_timeout; // in seconds |
94 | Stopwatch watch; |
95 | Stopwatch watch_current_block; |
96 | std::optional<ThreadPool> pool; |
97 | ThrottlerPtr throttler; |
98 | |
99 | struct JobReplica |
100 | { |
101 | JobReplica() = default; |
102 | JobReplica(size_t shard_index_, size_t replica_index_, bool is_local_job_, const Block & sample_block) |
103 | : shard_index(shard_index_), replica_index(replica_index_), is_local_job(is_local_job_), current_shard_block(sample_block.cloneEmpty()) {} |
104 | |
105 | size_t shard_index = 0; |
106 | size_t replica_index = 0; |
107 | bool is_local_job = false; |
108 | |
109 | Block current_shard_block; |
110 | |
111 | ConnectionPool::Entry connection_entry; |
112 | std::unique_ptr<Context> local_context; |
113 | BlockOutputStreamPtr stream; |
114 | |
115 | UInt64 blocks_written = 0; |
116 | UInt64 rows_written = 0; |
117 | |
118 | UInt64 blocks_started = 0; |
119 | UInt64 elapsed_time_ms = 0; |
120 | UInt64 max_elapsed_time_for_block_ms = 0; |
121 | }; |
122 | |
123 | struct JobShard |
124 | { |
125 | std::list<JobReplica> replicas_jobs; |
126 | IColumn::Permutation shard_current_block_permuation; |
127 | }; |
128 | |
129 | std::vector<JobShard> per_shard_jobs; |
130 | |
131 | size_t remote_jobs_count = 0; |
132 | size_t local_jobs_count = 0; |
133 | |
134 | std::atomic<unsigned> finished_jobs_count{0}; |
135 | |
136 | Poco::Logger * log; |
137 | }; |
138 | |
139 | } |
140 | |