1#include "MySQLDictionarySource.h"
2#include <Poco/Util/AbstractConfiguration.h>
3#include "DictionarySourceFactory.h"
4#include "DictionaryStructure.h"
5#include "config_core.h"
6#include "registerDictionaries.h"
7
8namespace DB
9{
10namespace ErrorCodes
11{
12 extern const int SUPPORT_IS_DISABLED;
13}
14
15void registerDictionarySourceMysql(DictionarySourceFactory & factory)
16{
17 auto createTableSource = [=](const DictionaryStructure & dict_struct,
18 const Poco::Util::AbstractConfiguration & config,
19 const std::string & config_prefix,
20 Block & sample_block,
21 const Context & /* context */,
22 bool /* check_config */) -> DictionarySourcePtr {
23#if USE_MYSQL
24 return std::make_unique<MySQLDictionarySource>(dict_struct, config, config_prefix + ".mysql", sample_block);
25#else
26 (void)dict_struct;
27 (void)config;
28 (void)config_prefix;
29 (void)sample_block;
30 throw Exception{"Dictionary source of type `mysql` is disabled because ClickHouse was built without mysql support.",
31 ErrorCodes::SUPPORT_IS_DISABLED};
32#endif
33 };
34 factory.registerSource("mysql", createTableSource);
35}
36
37}
38
39
40#if USE_MYSQL
41# include <Columns/ColumnString.h>
42# include <DataTypes/DataTypeString.h>
43# include <IO/WriteBufferFromString.h>
44# include <IO/WriteHelpers.h>
45# include <common/LocalDateTime.h>
46# include <common/logger_useful.h>
47# include <Formats/MySQLBlockInputStream.h>
48# include "readInvalidateQuery.h"
49
50namespace DB
51{
52static const UInt64 max_block_size = 8192;
53
54
55MySQLDictionarySource::MySQLDictionarySource(
56 const DictionaryStructure & dict_struct_,
57 const Poco::Util::AbstractConfiguration & config,
58 const std::string & config_prefix,
59 const Block & sample_block_)
60 : log(&Logger::get("MySQLDictionarySource"))
61 , update_time{std::chrono::system_clock::from_time_t(0)}
62 , dict_struct{dict_struct_}
63 , db{config.getString(config_prefix + ".db", "")}
64 , table{config.getString(config_prefix + ".table")}
65 , where{config.getString(config_prefix + ".where", "")}
66 , update_field{config.getString(config_prefix + ".update_field", "")}
67 , dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)}
68 , sample_block{sample_block_}
69 , pool{config, config_prefix}
70 , query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
71 , load_all_query{query_builder.composeLoadAllQuery()}
72 , invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
73 , close_connection{config.getBool(config_prefix + ".close_connection", false)}
74{
75}
76
77/// copy-constructor is provided in order to support cloneability
78MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other)
79 : log(&Logger::get("MySQLDictionarySource"))
80 , update_time{other.update_time}
81 , dict_struct{other.dict_struct}
82 , db{other.db}
83 , table{other.table}
84 , where{other.where}
85 , update_field{other.update_field}
86 , dont_check_update_time{other.dont_check_update_time}
87 , sample_block{other.sample_block}
88 , pool{other.pool}
89 , query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
90 , load_all_query{other.load_all_query}
91 , last_modification{other.last_modification}
92 , invalidate_query{other.invalidate_query}
93 , invalidate_query_response{other.invalidate_query_response}
94 , close_connection{other.close_connection}
95{
96}
97
98std::string MySQLDictionarySource::getUpdateFieldAndDate()
99{
100 if (update_time != std::chrono::system_clock::from_time_t(0))
101 {
102 auto tmp_time = update_time;
103 update_time = std::chrono::system_clock::now();
104 time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
105 std::string str_time = std::to_string(LocalDateTime(hr_time));
106 return query_builder.composeUpdateQuery(update_field, str_time);
107 }
108 else
109 {
110 update_time = std::chrono::system_clock::now();
111 return query_builder.composeLoadAllQuery();
112 }
113}
114
115BlockInputStreamPtr MySQLDictionarySource::loadAll()
116{
117 last_modification = getLastModification();
118
119 LOG_TRACE(log, load_all_query);
120 return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size, close_connection);
121}
122
123BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
124{
125 last_modification = getLastModification();
126
127 std::string load_update_query = getUpdateFieldAndDate();
128 LOG_TRACE(log, load_update_query);
129 return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_update_query, sample_block, max_block_size, close_connection);
130}
131
132BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
133{
134 /// We do not log in here and do not update the modification time, as the request can be large, and often called.
135
136 const auto query = query_builder.composeLoadIdsQuery(ids);
137 return std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size, close_connection);
138}
139
140BlockInputStreamPtr MySQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
141{
142 /// We do not log in here and do not update the modification time, as the request can be large, and often called.
143
144 const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
145 return std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size, close_connection);
146}
147
148bool MySQLDictionarySource::isModified() const
149{
150 if (!invalidate_query.empty())
151 {
152 auto response = doInvalidateQuery(invalidate_query);
153 if (response == invalidate_query_response)
154 return false;
155 invalidate_query_response = response;
156 return true;
157 }
158
159 if (dont_check_update_time)
160 return true;
161
162 return getLastModification() > last_modification;
163}
164
165bool MySQLDictionarySource::supportsSelectiveLoad() const
166{
167 return true;
168}
169
170bool MySQLDictionarySource::hasUpdateField() const
171{
172 return !update_field.empty();
173}
174
175DictionarySourcePtr MySQLDictionarySource::clone() const
176{
177 return std::make_unique<MySQLDictionarySource>(*this);
178}
179
180std::string MySQLDictionarySource::toString() const
181{
182 return "MySQL: " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
183}
184
185std::string MySQLDictionarySource::quoteForLike(const std::string s)
186{
187 std::string tmp;
188 tmp.reserve(s.size());
189
190 for (auto c : s)
191 {
192 if (c == '%' || c == '_' || c == '\\')
193 tmp.push_back('\\');
194 tmp.push_back(c);
195 }
196
197 WriteBufferFromOwnString out;
198 writeQuoted(tmp, out);
199 return out.str();
200}
201
202LocalDateTime MySQLDictionarySource::getLastModification() const
203{
204 LocalDateTime modification_time{std::time(nullptr)};
205
206 if (dont_check_update_time)
207 return modification_time;
208
209 try
210 {
211 auto connection = pool.Get();
212 auto query = connection->query("SHOW TABLE STATUS LIKE " + quoteForLike(table));
213
214 LOG_TRACE(log, query.str());
215
216 auto result = query.use();
217
218 size_t fetched_rows = 0;
219 if (auto row = result.fetch())
220 {
221 ++fetched_rows;
222 const auto UPDATE_TIME_IDX = 12;
223 const auto & update_time_value = row[UPDATE_TIME_IDX];
224
225 if (!update_time_value.isNull())
226 {
227 modification_time = update_time_value.getDateTime();
228 LOG_TRACE(log, "Got modification time: " << modification_time);
229 }
230
231 /// fetch remaining rows to avoid "commands out of sync" error
232 while (result.fetch())
233 ++fetched_rows;
234 }
235
236 if (0 == fetched_rows)
237 LOG_ERROR(log, "Cannot find table in SHOW TABLE STATUS result.");
238
239 if (fetched_rows > 1)
240 LOG_ERROR(log, "Found more than one table in SHOW TABLE STATUS result.");
241 }
242 catch (...)
243 {
244 tryLogCurrentException("MySQLDictionarySource");
245 }
246
247 /// we suppose failure to get modification time is not an error, therefore return current time
248 return modification_time;
249}
250
251std::string MySQLDictionarySource::doInvalidateQuery(const std::string & request) const
252{
253 Block invalidate_sample_block;
254 ColumnPtr column(ColumnString::create());
255 invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
256 MySQLBlockInputStream block_input_stream(pool.Get(), request, invalidate_sample_block, 1, close_connection);
257 return readInvalidateQuery(block_input_stream);
258}
259
260}
261
262#endif
263