1 | #pragma once |
2 | |
3 | #include <mutex> |
4 | #include <Common/Throttler.h> |
5 | #include <Client/Connection.h> |
6 | #include <Client/ConnectionPoolWithFailover.h> |
7 | #include <IO/ConnectionTimeouts.h> |
8 | |
9 | namespace DB |
10 | { |
11 | |
12 | |
13 | /** To retrieve data directly from multiple replicas (connections) from one shard |
14 | * within a single thread. As a degenerate case, it can also work with one connection. |
15 | * It is assumed that all functions except sendCancel are always executed in one thread. |
16 | * |
17 | * The interface is almost the same as Connection. |
18 | */ |
19 | class MultiplexedConnections final : private boost::noncopyable |
20 | { |
21 | public: |
22 | /// Accepts ready connection. |
23 | MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler_); |
24 | |
25 | /// Accepts a vector of connections to replicas of one shard already taken from pool. |
26 | MultiplexedConnections( |
27 | std::vector<IConnectionPool::Entry> && connections, |
28 | const Settings & settings_, const ThrottlerPtr & throttler_); |
29 | |
30 | /// Send all scalars to replicas. |
31 | void sendScalarsData(Scalars & data); |
32 | /// Send all content of external tables to replicas. |
33 | void sendExternalTablesData(std::vector<ExternalTablesData> & data); |
34 | |
35 | /// Send request to replicas. |
36 | void sendQuery( |
37 | const ConnectionTimeouts & timeouts, |
38 | const String & query, |
39 | const String & query_id = "" , |
40 | UInt64 stage = QueryProcessingStage::Complete, |
41 | const ClientInfo * client_info = nullptr, |
42 | bool with_pending_data = false); |
43 | |
44 | /// Get packet from any replica. |
45 | Packet receivePacket(); |
46 | |
47 | /// Break all active connections. |
48 | void disconnect(); |
49 | |
50 | /// Send a request to the replica to cancel the request |
51 | void sendCancel(); |
52 | |
53 | /** On each replica, read and skip all packets to EndOfStream or Exception. |
54 | * Returns EndOfStream if no exception has been received. Otherwise |
55 | * returns the last received packet of type Exception. |
56 | */ |
57 | Packet drain(); |
58 | |
59 | /// Get the replica addresses as a string. |
60 | std::string dumpAddresses() const; |
61 | |
62 | /// Returns the number of replicas. |
63 | /// Without locking, because sendCancel() does not change this number. |
64 | size_t size() const { return replica_states.size(); } |
65 | |
66 | /// Check if there are any valid replicas. |
67 | /// Without locking, because sendCancel() does not change the state of the replicas. |
68 | bool hasActiveConnections() const { return active_connection_count > 0; } |
69 | |
70 | private: |
71 | /// Internal version of `receivePacket` function without locking. |
72 | Packet receivePacketUnlocked(); |
73 | |
74 | /// Internal version of `dumpAddresses` function without locking. |
75 | std::string dumpAddressesUnlocked() const; |
76 | |
77 | /// Description of a single replica. |
78 | struct ReplicaState |
79 | { |
80 | Connection * connection = nullptr; |
81 | ConnectionPool::Entry pool_entry; |
82 | }; |
83 | |
84 | /// Get a replica where you can read the data. |
85 | ReplicaState & getReplicaForReading(); |
86 | |
87 | /// Mark the replica as invalid. |
88 | void invalidateReplica(ReplicaState & replica_state); |
89 | |
90 | private: |
91 | const Settings & settings; |
92 | |
93 | /// The current number of valid connections to the replicas of this shard. |
94 | size_t active_connection_count = 0; |
95 | |
96 | std::vector<ReplicaState> replica_states; |
97 | std::unordered_map<int, size_t> fd_to_replica_state_idx; |
98 | |
99 | /// Connection that received last block. |
100 | Connection * current_connection = nullptr; |
101 | |
102 | bool sent_query = false; |
103 | bool cancelled = false; |
104 | |
105 | /// A mutex for the sendCancel function to execute safely |
106 | /// in separate thread. |
107 | mutable std::mutex cancel_mutex; |
108 | }; |
109 | |
110 | } |
111 | |