1#pragma once
2
3#include <IO/ReadBuffer.h>
4#include <IO/WriteBuffer.h>
5#include <IO/ReadBufferFromString.h>
6#include <IO/ReadHelpers.h>
7#include <IO/WriteBufferFromString.h>
8#include <IO/WriteHelpers.h>
9#include <Common/ActionBlocker.h>
10#include <Core/Types.h>
11#include <map>
12#include <atomic>
13#include <utility>
14#include <Poco/Net/HTMLForm.h>
15
16namespace Poco { namespace Net { class HTTPServerResponse; } }
17
18namespace DB
19{
20
21namespace ErrorCodes
22{
23 extern const int DUPLICATE_INTERSERVER_IO_ENDPOINT;
24 extern const int NO_SUCH_INTERSERVER_IO_ENDPOINT;
25}
26
27/** Location of the service.
28 */
29struct InterserverIOEndpointLocation
30{
31public:
32 InterserverIOEndpointLocation(const std::string & name_, const std::string & host_, UInt16 port_)
33 : name(name_), host(host_), port(port_)
34 {
35 }
36
37 /// Creates a location based on its serialized representation.
38 InterserverIOEndpointLocation(const std::string & serialized_location)
39 {
40 ReadBufferFromString buf(serialized_location);
41 readBinary(name, buf);
42 readBinary(host, buf);
43 readBinary(port, buf);
44 assertEOF(buf);
45 }
46
47 /// Serializes the location.
48 std::string toString() const
49 {
50 WriteBufferFromOwnString buf;
51 writeBinary(name, buf);
52 writeBinary(host, buf);
53 writeBinary(port, buf);
54 return buf.str();
55 }
56
57public:
58 std::string name;
59 std::string host;
60 UInt16 port;
61};
62
63/** Query processor from other servers.
64 */
65class InterserverIOEndpoint
66{
67public:
68 virtual std::string getId(const std::string & path) const = 0;
69 virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) = 0;
70 virtual ~InterserverIOEndpoint() {}
71
72 /// You need to stop the data transfer if blocker is activated.
73 ActionBlocker blocker;
74};
75
76using InterserverIOEndpointPtr = std::shared_ptr<InterserverIOEndpoint>;
77
78
79/** Here you can register a service that processes requests from other servers.
80 * Used to transfer chunks in ReplicatedMergeTree.
81 */
82class InterserverIOHandler
83{
84public:
85 void addEndpoint(const String & name, InterserverIOEndpointPtr endpoint)
86 {
87 std::lock_guard lock(mutex);
88 bool inserted = endpoint_map.try_emplace(name, std::move(endpoint)).second;
89 if (!inserted)
90 throw Exception("Duplicate interserver IO endpoint: " + name, ErrorCodes::DUPLICATE_INTERSERVER_IO_ENDPOINT);
91 }
92
93 void removeEndpoint(const String & name)
94 {
95 std::lock_guard lock(mutex);
96 if (!endpoint_map.erase(name))
97 throw Exception("No interserver IO endpoint named " + name, ErrorCodes::NO_SUCH_INTERSERVER_IO_ENDPOINT);
98 }
99
100 InterserverIOEndpointPtr getEndpoint(const String & name)
101 try
102 {
103 std::lock_guard lock(mutex);
104 return endpoint_map.at(name);
105 }
106 catch (...)
107 {
108 throw Exception("No interserver IO endpoint named " + name, ErrorCodes::NO_SUCH_INTERSERVER_IO_ENDPOINT);
109 }
110
111private:
112 using EndpointMap = std::map<String, InterserverIOEndpointPtr>;
113
114 EndpointMap endpoint_map;
115 std::mutex mutex;
116};
117
118/// In the constructor calls `addEndpoint`, in the destructor - `removeEndpoint`.
119class InterserverIOEndpointHolder
120{
121public:
122 InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint_, InterserverIOHandler & handler_)
123 : name(name_), endpoint(std::move(endpoint_)), handler(handler_)
124 {
125 handler.addEndpoint(name, endpoint);
126 }
127
128 InterserverIOEndpointPtr getEndpoint()
129 {
130 return endpoint;
131 }
132
133 ~InterserverIOEndpointHolder()
134 try
135 {
136 handler.removeEndpoint(name);
137 /// After destroying the object, `endpoint` can still live, since its ownership is acquired during the processing of the request,
138 /// see InterserverIOHTTPHandler.cpp
139 }
140 catch (...)
141 {
142 tryLogCurrentException("~InterserverIOEndpointHolder");
143 }
144
145 ActionBlocker & getBlocker() { return endpoint->blocker; }
146
147private:
148 String name;
149 InterserverIOEndpointPtr endpoint;
150 InterserverIOHandler & handler;
151};
152
153using InterserverIOEndpointHolderPtr = std::shared_ptr<InterserverIOEndpointHolder>;
154
155}
156