1#include "StorageXDBC.h"
2#include <Interpreters/Context.h>
3#include <Interpreters/evaluateConstantExpression.h>
4#include <Parsers/ASTLiteral.h>
5#include <Storages/StorageFactory.h>
6#include <Storages/transformQueryForExternalDatabase.h>
7#include <Poco/Util/AbstractConfiguration.h>
8#include <common/logger_useful.h>
9#include <Formats/FormatFactory.h>
10#include <IO/CompressionMethod.h>
11#include <IO/ReadHelpers.h>
12#include <IO/ReadWriteBufferFromHTTP.h>
13#include <Poco/File.h>
14#include <Poco/Net/HTTPRequest.h>
15#include <Poco/Path.h>
16#include <Common/ShellCommand.h>
17#include <ext/range.h>
18
19namespace DB
20{
21namespace ErrorCodes
22{
23 extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
24}
25
26
27StorageXDBC::StorageXDBC(
28 const std::string & database_name_,
29 const std::string & table_name_,
30 const std::string & remote_database_name_,
31 const std::string & remote_table_name_,
32 const ColumnsDescription & columns_,
33 const Context & context_,
34 const BridgeHelperPtr bridge_helper_)
35 /// Please add support for constraints as soon as StorageODBC or JDBC will support insertion.
36 : IStorageURLBase(Poco::URI(), context_, database_name_, table_name_, IXDBCBridgeHelper::DEFAULT_FORMAT, columns_, ConstraintsDescription{}, "" /* CompressionMethod */)
37 , bridge_helper(bridge_helper_)
38 , remote_database_name(remote_database_name_)
39 , remote_table_name(remote_table_name_)
40{
41 log = &Poco::Logger::get("Storage" + bridge_helper->getName());
42 uri = bridge_helper->getMainURI();
43}
44
45std::string StorageXDBC::getReadMethod() const
46{
47 return Poco::Net::HTTPRequest::HTTP_POST;
48}
49
50std::vector<std::pair<std::string, std::string>> StorageXDBC::getReadURIParams(const Names & column_names,
51 const SelectQueryInfo & /*query_info*/,
52 const Context & /*context*/,
53 QueryProcessingStage::Enum & /*processed_stage*/,
54 size_t max_block_size) const
55{
56 NamesAndTypesList cols;
57 for (const String & name : column_names)
58 {
59 auto column_data = getColumn(name);
60 cols.emplace_back(column_data.name, column_data.type);
61 }
62 return bridge_helper->getURLParams(cols.toString(), max_block_size);
63}
64
65std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(const Names & /*column_names*/,
66 const SelectQueryInfo & query_info,
67 const Context & context,
68 QueryProcessingStage::Enum & /*processed_stage*/,
69 size_t /*max_block_size*/) const
70{
71 String query = transformQueryForExternalDatabase(*query_info.query,
72 getColumns().getOrdinary(),
73 bridge_helper->getIdentifierQuotingStyle(),
74 remote_database_name,
75 remote_table_name,
76 context);
77
78 return [query](std::ostream & os) { os << "query=" << query; };
79}
80
81BlockInputStreams StorageXDBC::read(const Names & column_names,
82 const SelectQueryInfo & query_info,
83 const Context & context,
84 QueryProcessingStage::Enum processed_stage,
85 size_t max_block_size,
86 unsigned num_streams)
87{
88 check(column_names);
89
90 bridge_helper->startBridgeSync();
91 return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
92}
93
94
95Block StorageXDBC::getHeaderBlock(const Names & column_names) const
96{
97 return getSampleBlockForColumns(column_names);
98}
99
100std::string StorageXDBC::getName() const
101{
102 return bridge_helper->getName();
103}
104
105namespace
106{
107 template <typename BridgeHelperMixin>
108 void registerXDBCStorage(StorageFactory & factory, const std::string & name)
109 {
110 factory.registerStorage(name, [name](const StorageFactory::Arguments & args)
111 {
112 ASTs & engine_args = args.engine_args;
113
114 if (engine_args.size() != 3)
115 throw Exception("Storage " + name + " requires exactly 3 parameters: " + name + "('DSN', database or schema, table)",
116 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
117
118 for (size_t i = 0; i < 3; ++i)
119 engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);
120
121 BridgeHelperPtr bridge_helper = std::make_shared<XDBCBridgeHelper<BridgeHelperMixin>>(args.context,
122 args.context.getSettingsRef().http_receive_timeout.value,
123 engine_args[0]->as<ASTLiteral &>().value.safeGet<String>());
124 return std::make_shared<StorageXDBC>(args.database_name, args.table_name,
125 engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(),
126 engine_args[2]->as<ASTLiteral &>().value.safeGet<String>(),
127 args.columns,
128 args.context,
129 bridge_helper);
130
131 });
132 }
133}
134
135void registerStorageJDBC(StorageFactory & factory)
136{
137 registerXDBCStorage<JDBCBridgeMixin>(factory, "JDBC");
138}
139
140void registerStorageODBC(StorageFactory & factory)
141{
142 registerXDBCStorage<ODBCBridgeMixin>(factory, "ODBC");
143}
144}
145