#pragma once

#include <cstddef>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include <Common/Throttler.h>
#include <Client/Connection.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <IO/ConnectionTimeouts.h>

9namespace DB
10{
11
12
13/** To retrieve data directly from multiple replicas (connections) from one shard
14 * within a single thread. As a degenerate case, it can also work with one connection.
15 * It is assumed that all functions except sendCancel are always executed in one thread.
16 *
17 * The interface is almost the same as Connection.
18 */
19class MultiplexedConnections final : private boost::noncopyable
20{
21public:
22 /// Accepts ready connection.
23 MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler_);
24
25 /// Accepts a vector of connections to replicas of one shard already taken from pool.
26 MultiplexedConnections(
27 std::vector<IConnectionPool::Entry> && connections,
28 const Settings & settings_, const ThrottlerPtr & throttler_);
29
30 /// Send all scalars to replicas.
31 void sendScalarsData(Scalars & data);
32 /// Send all content of external tables to replicas.
33 void sendExternalTablesData(std::vector<ExternalTablesData> & data);
34
35 /// Send request to replicas.
36 void sendQuery(
37 const ConnectionTimeouts & timeouts,
38 const String & query,
39 const String & query_id = "",
40 UInt64 stage = QueryProcessingStage::Complete,
41 const ClientInfo * client_info = nullptr,
42 bool with_pending_data = false);
43
44 /// Get packet from any replica.
45 Packet receivePacket();
46
47 /// Break all active connections.
48 void disconnect();
49
50 /// Send a request to the replica to cancel the request
51 void sendCancel();
52
53 /** On each replica, read and skip all packets to EndOfStream or Exception.
54 * Returns EndOfStream if no exception has been received. Otherwise
55 * returns the last received packet of type Exception.
56 */
57 Packet drain();
58
59 /// Get the replica addresses as a string.
60 std::string dumpAddresses() const;
61
62 /// Returns the number of replicas.
63 /// Without locking, because sendCancel() does not change this number.
64 size_t size() const { return replica_states.size(); }
65
66 /// Check if there are any valid replicas.
67 /// Without locking, because sendCancel() does not change the state of the replicas.
68 bool hasActiveConnections() const { return active_connection_count > 0; }
69
70private:
71 /// Internal version of `receivePacket` function without locking.
72 Packet receivePacketUnlocked();
73
74 /// Internal version of `dumpAddresses` function without locking.
75 std::string dumpAddressesUnlocked() const;
76
77 /// Description of a single replica.
78 struct ReplicaState
79 {
80 Connection * connection = nullptr;
81 ConnectionPool::Entry pool_entry;
82 };
83
84 /// Get a replica where you can read the data.
85 ReplicaState & getReplicaForReading();
86
87 /// Mark the replica as invalid.
88 void invalidateReplica(ReplicaState & replica_state);
89
90private:
91 const Settings & settings;
92
93 /// The current number of valid connections to the replicas of this shard.
94 size_t active_connection_count = 0;
95
96 std::vector<ReplicaState> replica_states;
97 std::unordered_map<int, size_t> fd_to_replica_state_idx;
98
99 /// Connection that received last block.
100 Connection * current_connection = nullptr;
101
102 bool sent_query = false;
103 bool cancelled = false;
104
105 /// A mutex for the sendCancel function to execute safely
106 /// in separate thread.
107 mutable std::mutex cancel_mutex;
108};
109
110}
111