1#include "XDBCDictionarySource.h"
2
3#include <Columns/ColumnString.h>
4#include <DataStreams/IBlockInputStream.h>
5#include <DataTypes/DataTypeString.h>
6#include <Formats/FormatFactory.h>
7#include <IO/ReadWriteBufferFromHTTP.h>
8#include <IO/WriteHelpers.h>
9#include <Interpreters/Context.h>
10#include <Poco/Net/HTTPRequest.h>
11#include <Poco/Util/AbstractConfiguration.h>
12#include <Common/XDBCBridgeHelper.h>
13#include <common/LocalDateTime.h>
14#include <common/logger_useful.h>
15#include "DictionarySourceFactory.h"
16#include "DictionaryStructure.h"
17#include "readInvalidateQuery.h"
18
19#include <Common/config.h>
20#include "registerDictionaries.h"
21
22#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
23# include <Poco/Data/ODBC/Connector.h>
24#endif
25
26namespace DB
27{
28namespace ErrorCodes
29{
30 extern const int SUPPORT_IS_DISABLED;
31}
32
33namespace
34{
35 class XDBCBridgeBlockInputStream : public IBlockInputStream
36 {
37 public:
38 XDBCBridgeBlockInputStream(
39 const Poco::URI & uri,
40 std::function<void(std::ostream &)> callback,
41 const Block & sample_block,
42 const Context & context,
43 UInt64 max_block_size,
44 const ConnectionTimeouts & timeouts,
45 const String name_)
46 : name(name_)
47 {
48 read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, callback, timeouts);
49 reader
50 = FormatFactory::instance().getInput(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size);
51 }
52
53 Block getHeader() const override { return reader->getHeader(); }
54
55 String getName() const override { return name; }
56
57 private:
58 Block readImpl() override { return reader->read(); }
59
60 String name;
61 std::unique_ptr<ReadWriteBufferFromHTTP> read_buf;
62 BlockInputStreamPtr reader;
63 };
64}
65
66static const UInt64 max_block_size = 8192;
67
68
69XDBCDictionarySource::XDBCDictionarySource(
70 const DictionaryStructure & dict_struct_,
71 const Poco::Util::AbstractConfiguration & config_,
72 const std::string & config_prefix_,
73 const Block & sample_block_,
74 const Context & context_,
75 const BridgeHelperPtr bridge_)
76 : log(&Logger::get(bridge_->getName() + "DictionarySource"))
77 , update_time{std::chrono::system_clock::from_time_t(0)}
78 , dict_struct{dict_struct_}
79 , db{config_.getString(config_prefix_ + ".db", "")}
80 , table{config_.getString(config_prefix_ + ".table")}
81 , where{config_.getString(config_prefix_ + ".where", "")}
82 , update_field{config_.getString(config_prefix_ + ".update_field", "")}
83 , sample_block{sample_block_}
84 , query_builder{dict_struct, db, table, where, bridge_->getIdentifierQuotingStyle()}
85 , load_all_query{query_builder.composeLoadAllQuery()}
86 , invalidate_query{config_.getString(config_prefix_ + ".invalidate_query", "")}
87 , bridge_helper{bridge_}
88 , timeouts{ConnectionTimeouts::getHTTPTimeouts(context_)}
89 , global_context(context_)
90{
91 bridge_url = bridge_helper->getMainURI();
92
93 auto url_params = bridge_helper->getURLParams(sample_block_.getNamesAndTypesList().toString(), max_block_size);
94 for (const auto & [name, value] : url_params)
95 bridge_url.addQueryParameter(name, value);
96}
97
98/// copy-constructor is provided in order to support cloneability
99XDBCDictionarySource::XDBCDictionarySource(const XDBCDictionarySource & other)
100 : log(&Logger::get(other.bridge_helper->getName() + "DictionarySource"))
101 , update_time{other.update_time}
102 , dict_struct{other.dict_struct}
103 , db{other.db}
104 , table{other.table}
105 , where{other.where}
106 , update_field{other.update_field}
107 , sample_block{other.sample_block}
108 , query_builder{dict_struct, db, table, where, other.bridge_helper->getIdentifierQuotingStyle()}
109 , load_all_query{other.load_all_query}
110 , invalidate_query{other.invalidate_query}
111 , invalidate_query_response{other.invalidate_query_response}
112 , bridge_helper{other.bridge_helper}
113 , bridge_url{other.bridge_url}
114 , timeouts{other.timeouts}
115 , global_context{other.global_context}
116{
117}
118
119std::string XDBCDictionarySource::getUpdateFieldAndDate()
120{
121 if (update_time != std::chrono::system_clock::from_time_t(0))
122 {
123 auto tmp_time = update_time;
124 update_time = std::chrono::system_clock::now();
125 time_t hr_time = std::chrono::system_clock::to_time_t(tmp_time) - 1;
126 std::string str_time = std::to_string(LocalDateTime(hr_time));
127 return query_builder.composeUpdateQuery(update_field, str_time);
128 }
129 else
130 {
131 update_time = std::chrono::system_clock::now();
132 return query_builder.composeLoadAllQuery();
133 }
134}
135
136BlockInputStreamPtr XDBCDictionarySource::loadAll()
137{
138 LOG_TRACE(log, load_all_query);
139 return loadBase(load_all_query);
140}
141
142BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll()
143{
144 std::string load_query_update = getUpdateFieldAndDate();
145
146 LOG_TRACE(log, load_query_update);
147 return loadBase(load_query_update);
148}
149
150BlockInputStreamPtr XDBCDictionarySource::loadIds(const std::vector<UInt64> & ids)
151{
152 const auto query = query_builder.composeLoadIdsQuery(ids);
153 return loadBase(query);
154}
155
156BlockInputStreamPtr XDBCDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
157{
158 const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
159 return loadBase(query);
160}
161
162bool XDBCDictionarySource::supportsSelectiveLoad() const
163{
164 return true;
165}
166
167bool XDBCDictionarySource::hasUpdateField() const
168{
169 return !update_field.empty();
170}
171
172DictionarySourcePtr XDBCDictionarySource::clone() const
173{
174 return std::make_unique<XDBCDictionarySource>(*this);
175}
176
177std::string XDBCDictionarySource::toString() const
178{
179 return bridge_helper->getName() + ": " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
180}
181
182bool XDBCDictionarySource::isModified() const
183{
184 if (!invalidate_query.empty())
185 {
186 auto response = doInvalidateQuery(invalidate_query);
187 if (invalidate_query_response == response)
188 return false;
189 invalidate_query_response = response;
190 }
191 return true;
192}
193
194
195std::string XDBCDictionarySource::doInvalidateQuery(const std::string & request) const
196{
197 Block invalidate_sample_block;
198 ColumnPtr column(ColumnString::create());
199 invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
200
201 bridge_helper->startBridgeSync();
202
203 auto invalidate_url = bridge_helper->getMainURI();
204 auto url_params = bridge_helper->getURLParams(invalidate_sample_block.getNamesAndTypesList().toString(), max_block_size);
205 for (const auto & [name, value] : url_params)
206 invalidate_url.addQueryParameter(name, value);
207
208 XDBCBridgeBlockInputStream stream(
209 invalidate_url,
210 [request](std::ostream & os) { os << "query=" << request; },
211 invalidate_sample_block,
212 global_context,
213 max_block_size,
214 timeouts,
215 bridge_helper->getName() + "BlockInputStream");
216
217 return readInvalidateQuery(stream);
218}
219
220BlockInputStreamPtr XDBCDictionarySource::loadBase(const std::string & query) const
221{
222 bridge_helper->startBridgeSync();
223 return std::make_shared<XDBCBridgeBlockInputStream>(
224 bridge_url,
225 [query](std::ostream & os) { os << "query=" << query; },
226 sample_block,
227 global_context,
228 max_block_size,
229 timeouts,
230 bridge_helper->getName() + "BlockInputStream");
231}
232
233void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
234{
235#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
236 Poco::Data::ODBC::Connector::registerConnector();
237#endif
238
239 auto createTableSource = [=](const DictionaryStructure & dict_struct,
240 const Poco::Util::AbstractConfiguration & config,
241 const std::string & config_prefix,
242 Block & sample_block,
243 const Context & context,
244 bool /* check_config */) -> DictionarySourcePtr {
245#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
246 BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<ODBCBridgeMixin>>(
247 context, context.getSettings().http_receive_timeout, config.getString(config_prefix + ".odbc.connection_string"));
248 return std::make_unique<XDBCDictionarySource>(dict_struct, config, config_prefix + ".odbc", sample_block, context, bridge);
249#else
250 (void)dict_struct;
251 (void)config;
252 (void)config_prefix;
253 (void)sample_block;
254 (void)context;
255 throw Exception{"Dictionary source of type `odbc` is disabled because poco library was built without ODBC support.",
256 ErrorCodes::SUPPORT_IS_DISABLED};
257#endif
258 };
259 factory.registerSource("odbc", createTableSource);
260}
261
262void registerDictionarySourceJDBC(DictionarySourceFactory & factory)
263{
264 auto createTableSource = [=](const DictionaryStructure & /* dict_struct */,
265 const Poco::Util::AbstractConfiguration & /* config */,
266 const std::string & /* config_prefix */,
267 Block & /* sample_block */,
268 const Context & /* context */,
269 bool /* check_config */) -> DictionarySourcePtr {
270 throw Exception{"Dictionary source of type `jdbc` is disabled until consistent support for nullable fields.",
271 ErrorCodes::SUPPORT_IS_DISABLED};
272 // BridgeHelperPtr bridge = std::make_shared<XDBCBridgeHelper<JDBCBridgeMixin>>(config, context.getSettings().http_receive_timeout, config.getString(config_prefix + ".connection_string"));
273 // return std::make_unique<XDBCDictionarySource>(dict_struct, config, config_prefix + ".jdbc", sample_block, context, bridge);
274 };
275 factory.registerSource("jdbc", createTableSource);
276}
277
278
279}
280