#pragma once

#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>

#include <common/logger_useful.h>

#include <DataStreams/IBlockInputStream.h>
#include <Common/Throttler.h>
#include <Interpreters/Context.h>
#include <Client/ConnectionPool.h>
#include <Client/MultiplexedConnections.h>
#include <Interpreters/Cluster.h>

15 | namespace DB |
16 | { |
17 | |
18 | /** This class allows one to launch queries on remote replicas of one shard and get results |
19 | */ |
20 | class RemoteBlockInputStream : public IBlockInputStream |
21 | { |
22 | public: |
23 | /// Takes already set connection. |
24 | /// If `settings` is nullptr, settings will be taken from context. |
25 | RemoteBlockInputStream( |
26 | Connection & connection, |
27 | const String & query_, const Block & , const Context & context_, const Settings * settings = nullptr, |
28 | const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), |
29 | QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); |
30 | |
31 | /// Accepts several connections already taken from pool. |
32 | /// If `settings` is nullptr, settings will be taken from context. |
33 | RemoteBlockInputStream( |
34 | std::vector<IConnectionPool::Entry> && connections, |
35 | const String & query_, const Block & , const Context & context_, const Settings * settings = nullptr, |
36 | const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), |
37 | QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); |
38 | |
39 | /// Takes a pool and gets one or several connections from it. |
40 | /// If `settings` is nullptr, settings will be taken from context. |
41 | RemoteBlockInputStream( |
42 | const ConnectionPoolWithFailoverPtr & pool, |
43 | const String & query_, const Block & , const Context & context_, const Settings * settings = nullptr, |
44 | const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(), |
45 | QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete); |
46 | |
47 | ~RemoteBlockInputStream() override; |
48 | |
49 | /// Set the query_id. For now, used by performance test to later find the query |
50 | /// in the server query_log. Must be called before sending the query to the |
51 | /// server. |
52 | void setQueryId(const std::string& query_id_) { assert(!sent_query); query_id = query_id_; } |
53 | |
54 | /// Specify how we allocate connections on a shard. |
55 | void setPoolMode(PoolMode pool_mode_) { pool_mode = pool_mode_; } |
56 | |
57 | void setMainTable(QualifiedTableName main_table_) { main_table = std::move(main_table_); } |
58 | |
59 | /// Sends query (initiates calculation) before read() |
60 | void readPrefix() override; |
61 | |
62 | /** Prevent default progress notification because progress' callback is |
63 | called by its own |
64 | */ |
65 | void progress(const Progress & /*value*/) override {} |
66 | |
67 | void cancel(bool kill) override; |
68 | |
69 | String getName() const override { return "Remote" ; } |
70 | |
71 | Block () const override { return header; } |
72 | |
73 | protected: |
74 | /// Send all scalars to remote servers |
75 | void sendScalars(); |
76 | |
77 | /// Send all temporary tables to remote servers |
78 | void sendExternalTables(); |
79 | |
80 | Block readImpl() override; |
81 | |
82 | void readSuffixImpl() override; |
83 | |
84 | /// Returns true if query was sent |
85 | bool isQueryPending() const; |
86 | |
87 | /// Returns true if exception was thrown |
88 | bool hasThrownException() const; |
89 | |
90 | private: |
91 | void sendQuery(); |
92 | |
93 | Block receiveBlock(); |
94 | |
95 | /// If wasn't sent yet, send request to cancell all connections to replicas |
96 | void tryCancel(const char * reason); |
97 | |
98 | private: |
99 | Block ; |
100 | |
101 | std::function<std::unique_ptr<MultiplexedConnections>()> create_multiplexed_connections; |
102 | |
103 | std::unique_ptr<MultiplexedConnections> multiplexed_connections; |
104 | |
105 | const String query; |
106 | String query_id = "" ; |
107 | Context context; |
108 | |
109 | /// Scalars needed to be sent to remote servers |
110 | Scalars scalars; |
111 | /// Temporary tables needed to be sent to remote servers |
112 | Tables external_tables; |
113 | QueryProcessingStage::Enum stage; |
114 | |
115 | /// Streams for reading from temporary tables and following sending of data |
116 | /// to remote servers for GLOBAL-subqueries |
117 | std::vector<ExternalTablesData> external_tables_data; |
118 | std::mutex external_tables_mutex; |
119 | |
120 | /// Connections to replicas are established, but no queries are sent yet |
121 | std::atomic<bool> established { false }; |
122 | |
123 | /// Query is sent (used before getting first block) |
124 | std::atomic<bool> sent_query { false }; |
125 | |
126 | /** All data from all replicas are received, before EndOfStream packet. |
127 | * To prevent desynchronization, if not all data is read before object |
128 | * destruction, it's required to send cancel query request to replicas and |
129 | * read all packets before EndOfStream |
130 | */ |
131 | std::atomic<bool> finished { false }; |
132 | |
133 | /** Cancel query request was sent to all replicas because data is not needed anymore |
134 | * This behaviour may occur when: |
135 | * - data size is already satisfactory (when using LIMIT, for example) |
136 | * - an exception was thrown from client side |
137 | */ |
138 | std::atomic<bool> was_cancelled { false }; |
139 | |
140 | /** An exception from replica was received. No need in receiving more packets or |
141 | * requesting to cancel query execution |
142 | */ |
143 | std::atomic<bool> got_exception_from_replica { false }; |
144 | |
145 | /** Unkown packet was received from replica. No need in receiving more packets or |
146 | * requesting to cancel query execution |
147 | */ |
148 | std::atomic<bool> got_unknown_packet_from_replica { false }; |
149 | |
150 | PoolMode pool_mode = PoolMode::GET_MANY; |
151 | std::optional<QualifiedTableName> main_table; |
152 | |
153 | Logger * log = &Logger::get("RemoteBlockInputStream" ); |
154 | }; |
155 | |
156 | } |
157 | |