1 | #include "StorageXDBC.h" |
2 | #include <Interpreters/Context.h> |
3 | #include <Interpreters/evaluateConstantExpression.h> |
4 | #include <Parsers/ASTLiteral.h> |
5 | #include <Storages/StorageFactory.h> |
6 | #include <Storages/transformQueryForExternalDatabase.h> |
7 | #include <Poco/Util/AbstractConfiguration.h> |
8 | #include <common/logger_useful.h> |
9 | #include <Formats/FormatFactory.h> |
10 | #include <IO/CompressionMethod.h> |
11 | #include <IO/ReadHelpers.h> |
12 | #include <IO/ReadWriteBufferFromHTTP.h> |
13 | #include <Poco/File.h> |
14 | #include <Poco/Net/HTTPRequest.h> |
15 | #include <Poco/Path.h> |
16 | #include <Common/ShellCommand.h> |
17 | #include <ext/range.h> |
18 | |
19 | namespace DB |
20 | { |
21 | namespace ErrorCodes |
22 | { |
23 | extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; |
24 | } |
25 | |
26 | |
27 | StorageXDBC::StorageXDBC( |
28 | const std::string & database_name_, |
29 | const std::string & table_name_, |
30 | const std::string & remote_database_name_, |
31 | const std::string & remote_table_name_, |
32 | const ColumnsDescription & columns_, |
33 | const Context & context_, |
34 | const BridgeHelperPtr bridge_helper_) |
35 | /// Please add support for constraints as soon as StorageODBC or JDBC will support insertion. |
36 | : IStorageURLBase(Poco::URI(), context_, database_name_, table_name_, IXDBCBridgeHelper::DEFAULT_FORMAT, columns_, ConstraintsDescription{}, "" /* CompressionMethod */) |
37 | , bridge_helper(bridge_helper_) |
38 | , remote_database_name(remote_database_name_) |
39 | , remote_table_name(remote_table_name_) |
40 | { |
41 | log = &Poco::Logger::get("Storage" + bridge_helper->getName()); |
42 | uri = bridge_helper->getMainURI(); |
43 | } |
44 | |
45 | std::string StorageXDBC::getReadMethod() const |
46 | { |
47 | return Poco::Net::HTTPRequest::HTTP_POST; |
48 | } |
49 | |
50 | std::vector<std::pair<std::string, std::string>> StorageXDBC::getReadURIParams(const Names & column_names, |
51 | const SelectQueryInfo & /*query_info*/, |
52 | const Context & /*context*/, |
53 | QueryProcessingStage::Enum & /*processed_stage*/, |
54 | size_t max_block_size) const |
55 | { |
56 | NamesAndTypesList cols; |
57 | for (const String & name : column_names) |
58 | { |
59 | auto column_data = getColumn(name); |
60 | cols.emplace_back(column_data.name, column_data.type); |
61 | } |
62 | return bridge_helper->getURLParams(cols.toString(), max_block_size); |
63 | } |
64 | |
65 | std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(const Names & /*column_names*/, |
66 | const SelectQueryInfo & query_info, |
67 | const Context & context, |
68 | QueryProcessingStage::Enum & /*processed_stage*/, |
69 | size_t /*max_block_size*/) const |
70 | { |
71 | String query = transformQueryForExternalDatabase(*query_info.query, |
72 | getColumns().getOrdinary(), |
73 | bridge_helper->getIdentifierQuotingStyle(), |
74 | remote_database_name, |
75 | remote_table_name, |
76 | context); |
77 | |
78 | return [query](std::ostream & os) { os << "query=" << query; }; |
79 | } |
80 | |
81 | BlockInputStreams StorageXDBC::read(const Names & column_names, |
82 | const SelectQueryInfo & query_info, |
83 | const Context & context, |
84 | QueryProcessingStage::Enum processed_stage, |
85 | size_t max_block_size, |
86 | unsigned num_streams) |
87 | { |
88 | check(column_names); |
89 | |
90 | bridge_helper->startBridgeSync(); |
91 | return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams); |
92 | } |
93 | |
94 | |
95 | Block StorageXDBC::(const Names & column_names) const |
96 | { |
97 | return getSampleBlockForColumns(column_names); |
98 | } |
99 | |
100 | std::string StorageXDBC::getName() const |
101 | { |
102 | return bridge_helper->getName(); |
103 | } |
104 | |
105 | namespace |
106 | { |
107 | template <typename BridgeHelperMixin> |
108 | void registerXDBCStorage(StorageFactory & factory, const std::string & name) |
109 | { |
110 | factory.registerStorage(name, [name](const StorageFactory::Arguments & args) |
111 | { |
112 | ASTs & engine_args = args.engine_args; |
113 | |
114 | if (engine_args.size() != 3) |
115 | throw Exception("Storage " + name + " requires exactly 3 parameters: " + name + "('DSN', database or schema, table)" , |
116 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
117 | |
118 | for (size_t i = 0; i < 3; ++i) |
119 | engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context); |
120 | |
121 | BridgeHelperPtr bridge_helper = std::make_shared<XDBCBridgeHelper<BridgeHelperMixin>>(args.context, |
122 | args.context.getSettingsRef().http_receive_timeout.value, |
123 | engine_args[0]->as<ASTLiteral &>().value.safeGet<String>()); |
124 | return std::make_shared<StorageXDBC>(args.database_name, args.table_name, |
125 | engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(), |
126 | engine_args[2]->as<ASTLiteral &>().value.safeGet<String>(), |
127 | args.columns, |
128 | args.context, |
129 | bridge_helper); |
130 | |
131 | }); |
132 | } |
133 | } |
134 | |
135 | void registerStorageJDBC(StorageFactory & factory) |
136 | { |
137 | registerXDBCStorage<JDBCBridgeMixin>(factory, "JDBC" ); |
138 | } |
139 | |
140 | void registerStorageODBC(StorageFactory & factory) |
141 | { |
142 | registerXDBCStorage<ODBCBridgeMixin>(factory, "ODBC" ); |
143 | } |
144 | } |
145 | |