1 | #pragma once |
2 | |
3 | #include <IO/ReadBuffer.h> |
4 | #include <IO/WriteBuffer.h> |
5 | #include <IO/ReadBufferFromString.h> |
6 | #include <IO/ReadHelpers.h> |
7 | #include <IO/WriteBufferFromString.h> |
8 | #include <IO/WriteHelpers.h> |
9 | #include <Common/ActionBlocker.h> |
10 | #include <Core/Types.h> |
11 | #include <map> |
12 | #include <atomic> |
13 | #include <utility> |
14 | #include <Poco/Net/HTMLForm.h> |
15 | |
/// Forward declaration: this header only refers to the HTTP response type by reference.
namespace Poco::Net
{
    class HTTPServerResponse;
}
17 | |
18 | namespace DB |
19 | { |
20 | |
/// Error codes thrown by the classes below. They are only declared here (`extern`), not defined in this header.
namespace ErrorCodes
{
    /// Thrown when an endpoint is registered under a name that is already in use.
    extern const int DUPLICATE_INTERSERVER_IO_ENDPOINT;
    /// Thrown when an endpoint is looked up or removed by a name that is not registered.
    extern const int NO_SUCH_INTERSERVER_IO_ENDPOINT;
}
26 | |
27 | /** Location of the service. |
28 | */ |
29 | struct InterserverIOEndpointLocation |
30 | { |
31 | public: |
32 | InterserverIOEndpointLocation(const std::string & name_, const std::string & host_, UInt16 port_) |
33 | : name(name_), host(host_), port(port_) |
34 | { |
35 | } |
36 | |
37 | /// Creates a location based on its serialized representation. |
38 | InterserverIOEndpointLocation(const std::string & serialized_location) |
39 | { |
40 | ReadBufferFromString buf(serialized_location); |
41 | readBinary(name, buf); |
42 | readBinary(host, buf); |
43 | readBinary(port, buf); |
44 | assertEOF(buf); |
45 | } |
46 | |
47 | /// Serializes the location. |
48 | std::string toString() const |
49 | { |
50 | WriteBufferFromOwnString buf; |
51 | writeBinary(name, buf); |
52 | writeBinary(host, buf); |
53 | writeBinary(port, buf); |
54 | return buf.str(); |
55 | } |
56 | |
57 | public: |
58 | std::string name; |
59 | std::string host; |
60 | UInt16 port; |
61 | }; |
62 | |
63 | /** Query processor from other servers. |
64 | */ |
65 | class InterserverIOEndpoint |
66 | { |
67 | public: |
68 | virtual std::string getId(const std::string & path) const = 0; |
69 | virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) = 0; |
70 | virtual ~InterserverIOEndpoint() {} |
71 | |
72 | /// You need to stop the data transfer if blocker is activated. |
73 | ActionBlocker blocker; |
74 | }; |
75 | |
/// Shared-ownership handle to an endpoint.
using InterserverIOEndpointPtr = std::shared_ptr<InterserverIOEndpoint>;
77 | |
78 | |
79 | /** Here you can register a service that processes requests from other servers. |
80 | * Used to transfer chunks in ReplicatedMergeTree. |
81 | */ |
82 | class InterserverIOHandler |
83 | { |
84 | public: |
85 | void addEndpoint(const String & name, InterserverIOEndpointPtr endpoint) |
86 | { |
87 | std::lock_guard lock(mutex); |
88 | bool inserted = endpoint_map.try_emplace(name, std::move(endpoint)).second; |
89 | if (!inserted) |
90 | throw Exception("Duplicate interserver IO endpoint: " + name, ErrorCodes::DUPLICATE_INTERSERVER_IO_ENDPOINT); |
91 | } |
92 | |
93 | void removeEndpoint(const String & name) |
94 | { |
95 | std::lock_guard lock(mutex); |
96 | if (!endpoint_map.erase(name)) |
97 | throw Exception("No interserver IO endpoint named " + name, ErrorCodes::NO_SUCH_INTERSERVER_IO_ENDPOINT); |
98 | } |
99 | |
100 | InterserverIOEndpointPtr getEndpoint(const String & name) |
101 | try |
102 | { |
103 | std::lock_guard lock(mutex); |
104 | return endpoint_map.at(name); |
105 | } |
106 | catch (...) |
107 | { |
108 | throw Exception("No interserver IO endpoint named " + name, ErrorCodes::NO_SUCH_INTERSERVER_IO_ENDPOINT); |
109 | } |
110 | |
111 | private: |
112 | using EndpointMap = std::map<String, InterserverIOEndpointPtr>; |
113 | |
114 | EndpointMap endpoint_map; |
115 | std::mutex mutex; |
116 | }; |
117 | |
118 | /// In the constructor calls `addEndpoint`, in the destructor - `removeEndpoint`. |
119 | class InterserverIOEndpointHolder |
120 | { |
121 | public: |
122 | InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint_, InterserverIOHandler & handler_) |
123 | : name(name_), endpoint(std::move(endpoint_)), handler(handler_) |
124 | { |
125 | handler.addEndpoint(name, endpoint); |
126 | } |
127 | |
128 | InterserverIOEndpointPtr getEndpoint() |
129 | { |
130 | return endpoint; |
131 | } |
132 | |
133 | ~InterserverIOEndpointHolder() |
134 | try |
135 | { |
136 | handler.removeEndpoint(name); |
137 | /// After destroying the object, `endpoint` can still live, since its ownership is acquired during the processing of the request, |
138 | /// see InterserverIOHTTPHandler.cpp |
139 | } |
140 | catch (...) |
141 | { |
142 | tryLogCurrentException("~InterserverIOEndpointHolder" ); |
143 | } |
144 | |
145 | ActionBlocker & getBlocker() { return endpoint->blocker; } |
146 | |
147 | private: |
148 | String name; |
149 | InterserverIOEndpointPtr endpoint; |
150 | InterserverIOHandler & handler; |
151 | }; |
152 | |
/// Shared-ownership handle to an endpoint holder.
using InterserverIOEndpointHolderPtr = std::shared_ptr<InterserverIOEndpointHolder>;
154 | |
155 | } |
156 | |