| 1 | #include "ClickHouseDictionarySource.h" |
| 2 | #include <memory> |
| 3 | #include <Client/ConnectionPool.h> |
| 4 | #include <DataStreams/RemoteBlockInputStream.h> |
| 5 | #include <IO/ConnectionTimeouts.h> |
| 6 | #include <Interpreters/executeQuery.h> |
| 7 | #include <Common/isLocalAddress.h> |
| 8 | #include <common/logger_useful.h> |
| 9 | #include "DictionarySourceFactory.h" |
| 10 | #include "DictionaryStructure.h" |
| 11 | #include "ExternalQueryBuilder.h" |
| 12 | #include "readInvalidateQuery.h" |
| 13 | #include "writeParenthesisedString.h" |
| 14 | #include "DictionaryFactory.h" |
| 15 | |
| 16 | |
| 17 | namespace DB |
| 18 | { |
namespace ErrorCodes
{
    /// Only declared here (extern); the definition lives in another translation unit.
    extern const int UNSUPPORTED_METHOD;
}


/// First argument handed to the ConnectionPool that createPool() builds.
static constexpr size_t MAX_CONNECTIONS = 16;
| 26 | |
| 27 | static ConnectionPoolWithFailoverPtr createPool( |
| 28 | const std::string & host, |
| 29 | UInt16 port, |
| 30 | bool secure, |
| 31 | const std::string & db, |
| 32 | const std::string & user, |
| 33 | const std::string & password) |
| 34 | { |
| 35 | ConnectionPoolPtrs pools; |
| 36 | pools.emplace_back(std::make_shared<ConnectionPool>( |
| 37 | MAX_CONNECTIONS, |
| 38 | host, |
| 39 | port, |
| 40 | db, |
| 41 | user, |
| 42 | password, |
| 43 | "ClickHouseDictionarySource" , |
| 44 | Protocol::Compression::Enable, |
| 45 | secure ? Protocol::Secure::Enable : Protocol::Secure::Disable)); |
| 46 | return std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM); |
| 47 | } |
| 48 | |
| 49 | |
| 50 | ClickHouseDictionarySource::ClickHouseDictionarySource( |
| 51 | const DictionaryStructure & dict_struct_, |
| 52 | const Poco::Util::AbstractConfiguration & config, |
| 53 | const std::string & config_prefix, |
| 54 | const Block & sample_block_, |
| 55 | const Context & context_) |
| 56 | : update_time{std::chrono::system_clock::from_time_t(0)} |
| 57 | , dict_struct{dict_struct_} |
| 58 | , host{config.getString(config_prefix + ".host" )} |
| 59 | , port(config.getInt(config_prefix + ".port" )) |
| 60 | , secure(config.getBool(config_prefix + ".secure" , false)) |
| 61 | , user{config.getString(config_prefix + ".user" , "" )} |
| 62 | , password{config.getString(config_prefix + ".password" , "" )} |
| 63 | , db{config.getString(config_prefix + ".db" , "" )} |
| 64 | , table{config.getString(config_prefix + ".table" )} |
| 65 | , where{config.getString(config_prefix + ".where" , "" )} |
| 66 | , update_field{config.getString(config_prefix + ".update_field" , "" )} |
| 67 | , invalidate_query{config.getString(config_prefix + ".invalidate_query" , "" )} |
| 68 | , query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks} |
| 69 | , sample_block{sample_block_} |
| 70 | , context(context_) |
| 71 | , is_local{isLocalAddress({host, port}, context.getTCPPort())} |
| 72 | , pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)} |
| 73 | , load_all_query{query_builder.composeLoadAllQuery()} |
| 74 | { |
| 75 | /// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication). |
| 76 | context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1" , 0), {}); |
| 77 | /// Processors are not supported here yet. |
| 78 | context.getSettingsRef().experimental_use_processors = false; |
| 79 | /// Query context is needed because some code in executeQuery function may assume it exists. |
| 80 | /// Current example is Context::getSampleBlockCache from InterpreterSelectWithUnionQuery::getSampleBlock. |
| 81 | context.makeQueryContext(); |
| 82 | } |
| 83 | |
| 84 | |
| 85 | ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionarySource & other) |
| 86 | : update_time{other.update_time} |
| 87 | , dict_struct{other.dict_struct} |
| 88 | , host{other.host} |
| 89 | , port{other.port} |
| 90 | , secure{other.secure} |
| 91 | , user{other.user} |
| 92 | , password{other.password} |
| 93 | , db{other.db} |
| 94 | , table{other.table} |
| 95 | , where{other.where} |
| 96 | , update_field{other.update_field} |
| 97 | , invalidate_query{other.invalidate_query} |
| 98 | , invalidate_query_response{other.invalidate_query_response} |
| 99 | , query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks} |
| 100 | , sample_block{other.sample_block} |
| 101 | , context(other.context) |
| 102 | , is_local{other.is_local} |
| 103 | , pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)} |
| 104 | , load_all_query{other.load_all_query} |
| 105 | { |
| 106 | context.makeQueryContext(); |
| 107 | } |
| 108 | |
| 109 | std::string ClickHouseDictionarySource::getUpdateFieldAndDate() |
| 110 | { |
| 111 | if (update_time != std::chrono::system_clock::from_time_t(0)) |
| 112 | { |
| 113 | auto tmp_time = update_time; |
| 114 | update_time = std::chrono::system_clock::now(); |
| 115 | time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1; |
| 116 | std::string str_time = std::to_string(LocalDateTime(hr_time)); |
| 117 | return query_builder.composeUpdateQuery(update_field, str_time); |
| 118 | } |
| 119 | else |
| 120 | { |
| 121 | update_time = std::chrono::system_clock::now(); |
| 122 | return query_builder.composeLoadAllQuery(); |
| 123 | } |
| 124 | } |
| 125 | |
| 126 | BlockInputStreamPtr ClickHouseDictionarySource::loadAll() |
| 127 | { |
| 128 | /** Query to local ClickHouse is marked internal in order to avoid |
| 129 | * the necessity of holding process_list_element shared pointer. |
| 130 | */ |
| 131 | if (is_local) |
| 132 | { |
| 133 | BlockIO res = executeQuery(load_all_query, context, true); |
| 134 | /// FIXME res.in may implicitly use some objects owned be res, but them will be destructed after return |
| 135 | return res.in; |
| 136 | } |
| 137 | return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context); |
| 138 | } |
| 139 | |
| 140 | BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll() |
| 141 | { |
| 142 | std::string load_update_query = getUpdateFieldAndDate(); |
| 143 | if (is_local) |
| 144 | return executeQuery(load_update_query, context, true).in; |
| 145 | return std::make_shared<RemoteBlockInputStream>(pool, load_update_query, sample_block, context); |
| 146 | } |
| 147 | |
| 148 | BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector<UInt64> & ids) |
| 149 | { |
| 150 | return createStreamForSelectiveLoad(query_builder.composeLoadIdsQuery(ids)); |
| 151 | } |
| 152 | |
| 153 | |
| 154 | BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) |
| 155 | { |
| 156 | return createStreamForSelectiveLoad( |
| 157 | query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES)); |
| 158 | } |
| 159 | |
| 160 | bool ClickHouseDictionarySource::isModified() const |
| 161 | { |
| 162 | if (!invalidate_query.empty()) |
| 163 | { |
| 164 | auto response = doInvalidateQuery(invalidate_query); |
| 165 | LOG_TRACE(log, "Invalidate query has returned: " << response << ", previous value: " << invalidate_query_response); |
| 166 | if (invalidate_query_response == response) |
| 167 | return false; |
| 168 | invalidate_query_response = response; |
| 169 | } |
| 170 | return true; |
| 171 | } |
| 172 | |
| 173 | bool ClickHouseDictionarySource::hasUpdateField() const |
| 174 | { |
| 175 | return !update_field.empty(); |
| 176 | } |
| 177 | |
| 178 | std::string ClickHouseDictionarySource::toString() const |
| 179 | { |
| 180 | return "ClickHouse: " + db + '.' + table + (where.empty() ? "" : ", where: " + where); |
| 181 | } |
| 182 | |
| 183 | |
| 184 | BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(const std::string & query) |
| 185 | { |
| 186 | if (is_local) |
| 187 | return executeQuery(query, context, true).in; |
| 188 | |
| 189 | return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context); |
| 190 | } |
| 191 | |
| 192 | std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const |
| 193 | { |
| 194 | LOG_TRACE(log, "Performing invalidate query" ); |
| 195 | if (is_local) |
| 196 | { |
| 197 | Context query_context = context; |
| 198 | auto input_block = executeQuery(request, query_context, true).in; |
| 199 | return readInvalidateQuery(*input_block); |
| 200 | } |
| 201 | else |
| 202 | { |
| 203 | /// We pass empty block to RemoteBlockInputStream, because we don't know the structure of the result. |
| 204 | Block invalidate_sample_block; |
| 205 | RemoteBlockInputStream invalidate_stream(pool, request, invalidate_sample_block, context); |
| 206 | return readInvalidateQuery(invalidate_stream); |
| 207 | } |
| 208 | } |
| 209 | |
| 210 | |
| 211 | void registerDictionarySourceClickHouse(DictionarySourceFactory & factory) |
| 212 | { |
| 213 | auto createTableSource = [=](const DictionaryStructure & dict_struct, |
| 214 | const Poco::Util::AbstractConfiguration & config, |
| 215 | const std::string & config_prefix, |
| 216 | Block & sample_block, |
| 217 | const Context & context, |
| 218 | bool /* check_config */) -> DictionarySourcePtr |
| 219 | { |
| 220 | return std::make_unique<ClickHouseDictionarySource>(dict_struct, config, config_prefix + ".clickhouse" , sample_block, context); |
| 221 | }; |
| 222 | factory.registerSource("clickhouse" , createTableSource); |
| 223 | } |
| 224 | |
| 225 | } |
| 226 | |