1 | #include "ClickHouseDictionarySource.h" |
2 | #include <memory> |
3 | #include <Client/ConnectionPool.h> |
4 | #include <DataStreams/RemoteBlockInputStream.h> |
5 | #include <IO/ConnectionTimeouts.h> |
6 | #include <Interpreters/executeQuery.h> |
7 | #include <Common/isLocalAddress.h> |
8 | #include <common/logger_useful.h> |
9 | #include "DictionarySourceFactory.h" |
10 | #include "DictionaryStructure.h" |
11 | #include "ExternalQueryBuilder.h" |
12 | #include "readInvalidateQuery.h" |
13 | #include "writeParenthesisedString.h" |
14 | #include "DictionaryFactory.h" |
15 | |
16 | |
17 | namespace DB |
18 | { |
/// Error codes used by this translation unit; only declared here (`extern`), defined elsewhere.
/// NOTE(review): UNSUPPORTED_METHOD is not referenced in the visible part of this file - confirm it is still needed.
namespace ErrorCodes
{
    extern const int UNSUPPORTED_METHOD;
}
23 | |
24 | |
/// Connection limit handed to the single ConnectionPool built by createPool().
static constexpr size_t MAX_CONNECTIONS = 16;
26 | |
27 | static ConnectionPoolWithFailoverPtr createPool( |
28 | const std::string & host, |
29 | UInt16 port, |
30 | bool secure, |
31 | const std::string & db, |
32 | const std::string & user, |
33 | const std::string & password) |
34 | { |
35 | ConnectionPoolPtrs pools; |
36 | pools.emplace_back(std::make_shared<ConnectionPool>( |
37 | MAX_CONNECTIONS, |
38 | host, |
39 | port, |
40 | db, |
41 | user, |
42 | password, |
43 | "ClickHouseDictionarySource" , |
44 | Protocol::Compression::Enable, |
45 | secure ? Protocol::Secure::Enable : Protocol::Secure::Disable)); |
46 | return std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM); |
47 | } |
48 | |
49 | |
/** Builds a dictionary source from its configuration.
  *
  * The following settings are read from `config` under `config_prefix`:
  * host, port, secure, user, password, db, table, where, update_field, invalidate_query.
  * `host`, `port` and `table` are read without a default value; `secure` defaults to false
  * and the remaining string settings default to an empty string.
  *
  * NOTE(review): several initializers read members that appear earlier in this list
  * (query_builder reads dict_struct/db/table/where, is_local reads host/port/context,
  * pool reads is_local, load_all_query reads query_builder). The actual initialization
  * order is the member declaration order in the header, which is not visible here -
  * keep the two in sync.
  */
ClickHouseDictionarySource::ClickHouseDictionarySource(
    const DictionaryStructure & dict_struct_,
    const Poco::Util::AbstractConfiguration & config,
    const std::string & config_prefix,
    const Block & sample_block_,
    const Context & context_)
    /// Epoch value: getUpdateFieldAndDate() uses it to detect that no load has happened yet.
    : update_time{std::chrono::system_clock::from_time_t(0)}
    , dict_struct{dict_struct_}
    , host{config.getString(config_prefix + ".host")}
    , port(config.getInt(config_prefix + ".port"))
    , secure(config.getBool(config_prefix + ".secure", false))
    , user{config.getString(config_prefix + ".user", "")}
    , password{config.getString(config_prefix + ".password", "")}
    , db{config.getString(config_prefix + ".db", "")}
    , table{config.getString(config_prefix + ".table")}
    , where{config.getString(config_prefix + ".where", "")}
    , update_field{config.getString(config_prefix + ".update_field", "")}
    , invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
    , query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
    , sample_block{sample_block_}
    /// Initialized from the caller's context; the constructor body adjusts this member, not `context_`.
    , context(context_)
    /// Result of isLocalAddress() for {host, port} and this context's TCP port.
    , is_local{isLocalAddress({host, port}, context.getTCPPort())}
    /// No connection pool is created when the source is local.
    , pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
    /// The full-load query is composed once here and reused by loadAll().
    , load_all_query{query_builder.composeLoadAllQuery()}
{
    /// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
    context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0), {});
    /// Processors are not supported here yet.
    context.getSettingsRef().experimental_use_processors = false;
    /// Query context is needed because some code in the executeQuery function may assume it exists.
    /// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock.
    context.makeQueryContext();
}
83 | |
84 | |
/** Copy constructor.
  *
  * Plain settings, the cached invalidate-query response, the sample block, the context,
  * the locality flag and the precomposed load-all query are copied from `other`.
  * Two members are rebuilt instead of copied:
  *  - query_builder is constructed from this object's own dict_struct/db/table/where;
  *  - pool is created anew via createPool() (or left null when the source is local).
  *
  * NOTE(review): as in the main constructor, the initializers depend on members listed
  * earlier; the real order is the declaration order in the header, which is not visible here.
  */
ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionarySource & other)
    : update_time{other.update_time}
    , dict_struct{other.dict_struct}
    , host{other.host}
    , port{other.port}
    , secure{other.secure}
    , user{other.user}
    , password{other.password}
    , db{other.db}
    , table{other.table}
    , where{other.where}
    , update_field{other.update_field}
    , invalidate_query{other.invalidate_query}
    , invalidate_query_response{other.invalidate_query_response}
    , query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
    , sample_block{other.sample_block}
    , context(other.context)
    , is_local{other.is_local}
    , pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)}
    , load_all_query{other.load_all_query}
{
    /// Same call as at the end of the main constructor, applied to the copied context.
    context.makeQueryContext();
}
108 | |
109 | std::string ClickHouseDictionarySource::getUpdateFieldAndDate() |
110 | { |
111 | if (update_time != std::chrono::system_clock::from_time_t(0)) |
112 | { |
113 | auto tmp_time = update_time; |
114 | update_time = std::chrono::system_clock::now(); |
115 | time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1; |
116 | std::string str_time = std::to_string(LocalDateTime(hr_time)); |
117 | return query_builder.composeUpdateQuery(update_field, str_time); |
118 | } |
119 | else |
120 | { |
121 | update_time = std::chrono::system_clock::now(); |
122 | return query_builder.composeLoadAllQuery(); |
123 | } |
124 | } |
125 | |
126 | BlockInputStreamPtr ClickHouseDictionarySource::loadAll() |
127 | { |
128 | /** Query to local ClickHouse is marked internal in order to avoid |
129 | * the necessity of holding process_list_element shared pointer. |
130 | */ |
131 | if (is_local) |
132 | { |
133 | BlockIO res = executeQuery(load_all_query, context, true); |
134 | /// FIXME res.in may implicitly use some objects owned be res, but them will be destructed after return |
135 | return res.in; |
136 | } |
137 | return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context); |
138 | } |
139 | |
140 | BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll() |
141 | { |
142 | std::string load_update_query = getUpdateFieldAndDate(); |
143 | if (is_local) |
144 | return executeQuery(load_update_query, context, true).in; |
145 | return std::make_shared<RemoteBlockInputStream>(pool, load_update_query, sample_block, context); |
146 | } |
147 | |
148 | BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector<UInt64> & ids) |
149 | { |
150 | return createStreamForSelectiveLoad(query_builder.composeLoadIdsQuery(ids)); |
151 | } |
152 | |
153 | |
154 | BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) |
155 | { |
156 | return createStreamForSelectiveLoad( |
157 | query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES)); |
158 | } |
159 | |
160 | bool ClickHouseDictionarySource::isModified() const |
161 | { |
162 | if (!invalidate_query.empty()) |
163 | { |
164 | auto response = doInvalidateQuery(invalidate_query); |
165 | LOG_TRACE(log, "Invalidate query has returned: " << response << ", previous value: " << invalidate_query_response); |
166 | if (invalidate_query_response == response) |
167 | return false; |
168 | invalidate_query_response = response; |
169 | } |
170 | return true; |
171 | } |
172 | |
173 | bool ClickHouseDictionarySource::hasUpdateField() const |
174 | { |
175 | return !update_field.empty(); |
176 | } |
177 | |
178 | std::string ClickHouseDictionarySource::toString() const |
179 | { |
180 | return "ClickHouse: " + db + '.' + table + (where.empty() ? "" : ", where: " + where); |
181 | } |
182 | |
183 | |
184 | BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(const std::string & query) |
185 | { |
186 | if (is_local) |
187 | return executeQuery(query, context, true).in; |
188 | |
189 | return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context); |
190 | } |
191 | |
192 | std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const |
193 | { |
194 | LOG_TRACE(log, "Performing invalidate query" ); |
195 | if (is_local) |
196 | { |
197 | Context query_context = context; |
198 | auto input_block = executeQuery(request, query_context, true).in; |
199 | return readInvalidateQuery(*input_block); |
200 | } |
201 | else |
202 | { |
203 | /// We pass empty block to RemoteBlockInputStream, because we don't know the structure of the result. |
204 | Block invalidate_sample_block; |
205 | RemoteBlockInputStream invalidate_stream(pool, request, invalidate_sample_block, context); |
206 | return readInvalidateQuery(invalidate_stream); |
207 | } |
208 | } |
209 | |
210 | |
211 | void registerDictionarySourceClickHouse(DictionarySourceFactory & factory) |
212 | { |
213 | auto createTableSource = [=](const DictionaryStructure & dict_struct, |
214 | const Poco::Util::AbstractConfiguration & config, |
215 | const std::string & config_prefix, |
216 | Block & sample_block, |
217 | const Context & context, |
218 | bool /* check_config */) -> DictionarySourcePtr |
219 | { |
220 | return std::make_unique<ClickHouseDictionarySource>(dict_struct, config, config_prefix + ".clickhouse" , sample_block, context); |
221 | }; |
222 | factory.registerSource("clickhouse" , createTableSource); |
223 | } |
224 | |
225 | } |
226 | |