1 | #include "MainHandler.h" |
2 | |
3 | #include "validateODBCConnectionString.h" |
4 | #include <memory> |
5 | #include <DataStreams/copyData.h> |
6 | #include <DataTypes/DataTypeFactory.h> |
7 | #include "ODBCBlockInputStream.h" |
8 | #include <Formats/FormatFactory.h> |
9 | #include <IO/WriteBufferFromHTTPServerResponse.h> |
10 | #include <IO/WriteHelpers.h> |
11 | #include <IO/ReadHelpers.h> |
12 | #include <Interpreters/Context.h> |
13 | #include <Poco/Net/HTTPServerRequest.h> |
14 | #include <Poco/Net/HTTPServerResponse.h> |
15 | #include <Poco/Net/HTMLForm.h> |
16 | #include <common/logger_useful.h> |
17 | #include <mutex> |
18 | #include <Poco/ThreadPool.h> |
19 | |
20 | namespace DB |
21 | { |
22 | namespace |
23 | { |
24 | std::unique_ptr<Block> parseColumns(std::string && column_string) |
25 | { |
26 | std::unique_ptr<Block> sample_block = std::make_unique<Block>(); |
27 | auto names_and_types = NamesAndTypesList::parse(column_string); |
28 | for (const NameAndTypePair & column_data : names_and_types) |
29 | sample_block->insert({column_data.type, column_data.name}); |
30 | return sample_block; |
31 | } |
32 | } |
33 | |
34 | using PocoSessionPoolConstructor = std::function<std::shared_ptr<Poco::Data::SessionPool>()>; |
35 | /** Is used to adjust max size of default Poco thread pool. See issue #750 |
36 | * Acquire the lock, resize pool and construct new Session. |
37 | */ |
38 | static std::shared_ptr<Poco::Data::SessionPool> createAndCheckResizePocoSessionPool(PocoSessionPoolConstructor pool_constr) |
39 | { |
40 | static std::mutex mutex; |
41 | |
42 | Poco::ThreadPool & pool = Poco::ThreadPool::defaultPool(); |
43 | |
44 | /// NOTE: The lock don't guarantee that external users of the pool don't change its capacity |
45 | std::unique_lock lock(mutex); |
46 | |
47 | if (pool.available() == 0) |
48 | pool.addCapacity(2 * std::max(pool.capacity(), 1)); |
49 | |
50 | return pool_constr(); |
51 | } |
52 | |
53 | ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str) |
54 | { |
55 | std::lock_guard lock(mutex); |
56 | if (!pool_map->count(connection_str)) |
57 | { |
58 | pool_map->emplace(connection_str, createAndCheckResizePocoSessionPool([connection_str] |
59 | { |
60 | return std::make_shared<Poco::Data::SessionPool>("ODBC" , validateODBCConnectionString(connection_str)); |
61 | })); |
62 | } |
63 | return pool_map->at(connection_str); |
64 | } |
65 | |
66 | void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) |
67 | { |
68 | Poco::Net::HTMLForm params(request, request.stream()); |
69 | LOG_TRACE(log, "Request URI: " + request.getURI()); |
70 | |
71 | auto process_error = [&response, this](const std::string & message) |
72 | { |
73 | response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); |
74 | if (!response.sent()) |
75 | response.send() << message << std::endl; |
76 | LOG_WARNING(log, message); |
77 | }; |
78 | |
79 | if (!params.has("query" )) |
80 | { |
81 | process_error("No 'query' in request body" ); |
82 | return; |
83 | } |
84 | |
85 | if (!params.has("columns" )) |
86 | { |
87 | process_error("No 'columns' in request URL" ); |
88 | return; |
89 | } |
90 | |
91 | if (!params.has("connection_string" )) |
92 | { |
93 | process_error("No 'connection_string' in request URL" ); |
94 | return; |
95 | } |
96 | |
97 | UInt64 max_block_size = DEFAULT_BLOCK_SIZE; |
98 | if (params.has("max_block_size" )) |
99 | { |
100 | std::string max_block_size_str = params.get("max_block_size" , "" ); |
101 | if (max_block_size_str.empty()) |
102 | { |
103 | process_error("Empty max_block_size specified" ); |
104 | return; |
105 | } |
106 | max_block_size = parse<size_t>(max_block_size_str); |
107 | } |
108 | |
109 | std::string columns = params.get("columns" ); |
110 | std::unique_ptr<Block> sample_block; |
111 | try |
112 | { |
113 | sample_block = parseColumns(std::move(columns)); |
114 | } |
115 | catch (const Exception & ex) |
116 | { |
117 | process_error("Invalid 'columns' parameter in request body '" + ex.message() + "'" ); |
118 | LOG_WARNING(log, ex.getStackTrace().toString()); |
119 | return; |
120 | } |
121 | |
122 | std::string format = params.get("format" , "RowBinary" ); |
123 | std::string query = params.get("query" ); |
124 | LOG_TRACE(log, "Query: " << query); |
125 | |
126 | std::string connection_string = params.get("connection_string" ); |
127 | LOG_TRACE(log, "Connection string: '" << connection_string << "'" ); |
128 | |
129 | WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); |
130 | try |
131 | { |
132 | BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, *context); |
133 | auto pool = getPool(connection_string); |
134 | ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size); |
135 | copyData(inp, *writer); |
136 | } |
137 | catch (...) |
138 | { |
139 | auto message = getCurrentExceptionMessage(true); |
140 | response.setStatusAndReason( |
141 | Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, bacause of too soon response sending |
142 | writeStringBinary(message, out); |
143 | tryLogCurrentException(log); |
144 | } |
145 | } |
146 | } |
147 | |