#include <Client/MultiplexedConnections.h>

#include <random>

#include <IO/ConnectionTimeouts.h>
3
4namespace DB
5{
6
/// Error codes thrown by the functions in this file. Only declared here (`extern`);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    /// Thrown on API misuse (wrong call order) and on broken internal invariants.
    extern const int LOGICAL_ERROR;
    /// Thrown by sendExternalTablesData() when the number of data sources differs from the number of active connections.
    extern const int MISMATCH_REPLICAS_DATA_SOURCES;
    /// Thrown by receivePacketUnlocked() when the replica chosen for reading has no connection.
    extern const int NO_AVAILABLE_REPLICA;
    /// Thrown by getReplicaForReading() when select() reports no ready socket.
    extern const int TIMEOUT_EXCEEDED;
}
14
15
16MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler)
17 : settings(settings_)
18{
19 connection.setThrottler(throttler);
20
21 ReplicaState replica_state;
22 replica_state.connection = &connection;
23 replica_states.push_back(replica_state);
24
25 active_connection_count = 1;
26}
27
28MultiplexedConnections::MultiplexedConnections(
29 std::vector<IConnectionPool::Entry> && connections,
30 const Settings & settings_, const ThrottlerPtr & throttler)
31 : settings(settings_)
32{
33 /// If we didn't get any connections from pool and getMany() did not throw exceptions, this means that
34 /// `skip_unavailable_shards` was set. Then just return.
35 if (connections.empty())
36 return;
37
38 replica_states.reserve(connections.size());
39 for (size_t i = 0; i < connections.size(); ++i)
40 {
41 Connection * connection = &(*connections[i]);
42 connection->setThrottler(throttler);
43
44 ReplicaState replica_state;
45 replica_state.pool_entry = std::move(connections[i]);
46 replica_state.connection = connection;
47
48 replica_states.push_back(std::move(replica_state));
49 }
50
51 active_connection_count = connections.size();
52}
53
54void MultiplexedConnections::sendScalarsData(Scalars & data)
55{
56 std::lock_guard lock(cancel_mutex);
57
58 if (!sent_query)
59 throw Exception("Cannot send scalars data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
60
61 for (ReplicaState & state : replica_states)
62 {
63 Connection * connection = state.connection;
64 if (connection != nullptr)
65 connection->sendScalarsData(data);
66 }
67}
68
69void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
70{
71 std::lock_guard lock(cancel_mutex);
72
73 if (!sent_query)
74 throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
75
76 if (data.size() != active_connection_count)
77 throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
78
79 auto it = data.begin();
80 for (ReplicaState & state : replica_states)
81 {
82 Connection * connection = state.connection;
83 if (connection != nullptr)
84 {
85 connection->sendExternalTablesData(*it);
86 ++it;
87 }
88 }
89}
90
91void MultiplexedConnections::sendQuery(
92 const ConnectionTimeouts & timeouts,
93 const String & query,
94 const String & query_id,
95 UInt64 stage,
96 const ClientInfo * client_info,
97 bool with_pending_data)
98{
99 std::lock_guard lock(cancel_mutex);
100
101 if (sent_query)
102 throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
103
104 Settings modified_settings = settings;
105
106 for (auto & replica : replica_states)
107 {
108 if (!replica.connection)
109 throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
110
111 if (replica.connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
112 {
113 /// Disable two-level aggregation due to version incompatibility.
114 modified_settings.group_by_two_level_threshold = 0;
115 modified_settings.group_by_two_level_threshold_bytes = 0;
116 }
117 }
118
119 size_t num_replicas = replica_states.size();
120 if (num_replicas > 1)
121 {
122 /// Use multiple replicas for parallel query processing.
123 modified_settings.parallel_replicas_count = num_replicas;
124 for (size_t i = 0; i < num_replicas; ++i)
125 {
126 modified_settings.parallel_replica_offset = i;
127 replica_states[i].connection->sendQuery(timeouts, query, query_id,
128 stage, &modified_settings, client_info, with_pending_data);
129 }
130 }
131 else
132 {
133 /// Use single replica.
134 replica_states[0].connection->sendQuery(timeouts, query, query_id, stage,
135 &modified_settings, client_info, with_pending_data);
136 }
137
138 sent_query = true;
139}
140
141Packet MultiplexedConnections::receivePacket()
142{
143 std::lock_guard lock(cancel_mutex);
144 Packet packet = receivePacketUnlocked();
145 return packet;
146}
147
148void MultiplexedConnections::disconnect()
149{
150 std::lock_guard lock(cancel_mutex);
151
152 for (ReplicaState & state : replica_states)
153 {
154 Connection * connection = state.connection;
155 if (connection != nullptr)
156 {
157 connection->disconnect();
158 invalidateReplica(state);
159 }
160 }
161}
162
163void MultiplexedConnections::sendCancel()
164{
165 std::lock_guard lock(cancel_mutex);
166
167 if (!sent_query || cancelled)
168 throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
169
170 for (ReplicaState & state : replica_states)
171 {
172 Connection * connection = state.connection;
173 if (connection != nullptr)
174 connection->sendCancel();
175 }
176
177 cancelled = true;
178}
179
180Packet MultiplexedConnections::drain()
181{
182 std::lock_guard lock(cancel_mutex);
183
184 if (!cancelled)
185 throw Exception("Cannot drain connections: cancel first.", ErrorCodes::LOGICAL_ERROR);
186
187 Packet res;
188 res.type = Protocol::Server::EndOfStream;
189
190 while (hasActiveConnections())
191 {
192 Packet packet = receivePacketUnlocked();
193
194 switch (packet.type)
195 {
196 case Protocol::Server::Data:
197 case Protocol::Server::Progress:
198 case Protocol::Server::ProfileInfo:
199 case Protocol::Server::Totals:
200 case Protocol::Server::Extremes:
201 case Protocol::Server::EndOfStream:
202 break;
203
204 case Protocol::Server::Exception:
205 default:
206 /// If we receive an exception or an unknown packet, we save it.
207 res = std::move(packet);
208 break;
209 }
210 }
211
212 return res;
213}
214
215std::string MultiplexedConnections::dumpAddresses() const
216{
217 std::lock_guard lock(cancel_mutex);
218 return dumpAddressesUnlocked();
219}
220
221std::string MultiplexedConnections::dumpAddressesUnlocked() const
222{
223 bool is_first = true;
224 std::ostringstream os;
225 for (const ReplicaState & state : replica_states)
226 {
227 const Connection * connection = state.connection;
228 if (connection != nullptr)
229 {
230 os << (is_first ? "" : "; ") << connection->getDescription();
231 is_first = false;
232 }
233 }
234
235 return os.str();
236}
237
238Packet MultiplexedConnections::receivePacketUnlocked()
239{
240 if (!sent_query)
241 throw Exception("Cannot receive packets: no query sent.", ErrorCodes::LOGICAL_ERROR);
242 if (!hasActiveConnections())
243 throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
244
245 ReplicaState & state = getReplicaForReading();
246 current_connection = state.connection;
247 if (current_connection == nullptr)
248 throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
249
250 Packet packet = current_connection->receivePacket();
251
252 switch (packet.type)
253 {
254 case Protocol::Server::Data:
255 case Protocol::Server::Progress:
256 case Protocol::Server::ProfileInfo:
257 case Protocol::Server::Totals:
258 case Protocol::Server::Extremes:
259 case Protocol::Server::Log:
260 break;
261
262 case Protocol::Server::EndOfStream:
263 invalidateReplica(state);
264 break;
265
266 case Protocol::Server::Exception:
267 default:
268 current_connection->disconnect();
269 invalidateReplica(state);
270 break;
271 }
272
273 return packet;
274}
275
276MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading()
277{
278 if (replica_states.size() == 1)
279 return replica_states[0];
280
281 Poco::Net::Socket::SocketList read_list;
282 read_list.reserve(active_connection_count);
283
284 /// First, we check if there are data already in the buffer
285 /// of at least one connection.
286 for (const ReplicaState & state : replica_states)
287 {
288 Connection * connection = state.connection;
289 if ((connection != nullptr) && connection->hasReadPendingData())
290 read_list.push_back(*connection->socket);
291 }
292
293 /// If no data was found, then we check if there are any connections ready for reading.
294 if (read_list.empty())
295 {
296 Poco::Net::Socket::SocketList write_list;
297 Poco::Net::Socket::SocketList except_list;
298
299 for (const ReplicaState & state : replica_states)
300 {
301 Connection * connection = state.connection;
302 if (connection != nullptr)
303 read_list.push_back(*connection->socket);
304 }
305
306 int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.receive_timeout);
307
308 if (n == 0)
309 throw Exception("Timeout exceeded while reading from " + dumpAddressesUnlocked(), ErrorCodes::TIMEOUT_EXCEEDED);
310 }
311
312 /// TODO Absolutely wrong code: read_list could be empty; rand() is not thread safe and has low quality; motivation of rand is unclear.
313 /// This code path is disabled by default.
314
315 auto & socket = read_list[rand() % read_list.size()];
316 if (fd_to_replica_state_idx.empty())
317 {
318 fd_to_replica_state_idx.reserve(replica_states.size());
319 size_t replica_state_number = 0;
320 for (const auto & replica_state : replica_states)
321 {
322 fd_to_replica_state_idx.emplace(replica_state.connection->socket->impl()->sockfd(), replica_state_number);
323 ++replica_state_number;
324 }
325 }
326 return replica_states[fd_to_replica_state_idx.at(socket.impl()->sockfd())];
327}
328
329void MultiplexedConnections::invalidateReplica(ReplicaState & state)
330{
331 state.connection = nullptr;
332 state.pool_entry = IConnectionPool::Entry();
333 --active_connection_count;
334}
335
336}
337