| 1 | #include "MainHandler.h" |
| 2 | |
| 3 | #include "validateODBCConnectionString.h" |
| 4 | #include <memory> |
| 5 | #include <DataStreams/copyData.h> |
| 6 | #include <DataTypes/DataTypeFactory.h> |
| 7 | #include "ODBCBlockInputStream.h" |
| 8 | #include <Formats/FormatFactory.h> |
| 9 | #include <IO/WriteBufferFromHTTPServerResponse.h> |
| 10 | #include <IO/WriteHelpers.h> |
| 11 | #include <IO/ReadHelpers.h> |
| 12 | #include <Interpreters/Context.h> |
| 13 | #include <Poco/Net/HTTPServerRequest.h> |
| 14 | #include <Poco/Net/HTTPServerResponse.h> |
| 15 | #include <Poco/Net/HTMLForm.h> |
| 16 | #include <common/logger_useful.h> |
| 17 | #include <mutex> |
| 18 | #include <Poco/ThreadPool.h> |
| 19 | |
| 20 | namespace DB |
| 21 | { |
| 22 | namespace |
| 23 | { |
| 24 | std::unique_ptr<Block> parseColumns(std::string && column_string) |
| 25 | { |
| 26 | std::unique_ptr<Block> sample_block = std::make_unique<Block>(); |
| 27 | auto names_and_types = NamesAndTypesList::parse(column_string); |
| 28 | for (const NameAndTypePair & column_data : names_and_types) |
| 29 | sample_block->insert({column_data.type, column_data.name}); |
| 30 | return sample_block; |
| 31 | } |
| 32 | } |
| 33 | |
| 34 | using PocoSessionPoolConstructor = std::function<std::shared_ptr<Poco::Data::SessionPool>()>; |
| 35 | /** Is used to adjust max size of default Poco thread pool. See issue #750 |
| 36 | * Acquire the lock, resize pool and construct new Session. |
| 37 | */ |
| 38 | static std::shared_ptr<Poco::Data::SessionPool> createAndCheckResizePocoSessionPool(PocoSessionPoolConstructor pool_constr) |
| 39 | { |
| 40 | static std::mutex mutex; |
| 41 | |
| 42 | Poco::ThreadPool & pool = Poco::ThreadPool::defaultPool(); |
| 43 | |
| 44 | /// NOTE: The lock don't guarantee that external users of the pool don't change its capacity |
| 45 | std::unique_lock lock(mutex); |
| 46 | |
| 47 | if (pool.available() == 0) |
| 48 | pool.addCapacity(2 * std::max(pool.capacity(), 1)); |
| 49 | |
| 50 | return pool_constr(); |
| 51 | } |
| 52 | |
| 53 | ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str) |
| 54 | { |
| 55 | std::lock_guard lock(mutex); |
| 56 | if (!pool_map->count(connection_str)) |
| 57 | { |
| 58 | pool_map->emplace(connection_str, createAndCheckResizePocoSessionPool([connection_str] |
| 59 | { |
| 60 | return std::make_shared<Poco::Data::SessionPool>("ODBC" , validateODBCConnectionString(connection_str)); |
| 61 | })); |
| 62 | } |
| 63 | return pool_map->at(connection_str); |
| 64 | } |
| 65 | |
| 66 | void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) |
| 67 | { |
| 68 | Poco::Net::HTMLForm params(request, request.stream()); |
| 69 | LOG_TRACE(log, "Request URI: " + request.getURI()); |
| 70 | |
| 71 | auto process_error = [&response, this](const std::string & message) |
| 72 | { |
| 73 | response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); |
| 74 | if (!response.sent()) |
| 75 | response.send() << message << std::endl; |
| 76 | LOG_WARNING(log, message); |
| 77 | }; |
| 78 | |
| 79 | if (!params.has("query" )) |
| 80 | { |
| 81 | process_error("No 'query' in request body" ); |
| 82 | return; |
| 83 | } |
| 84 | |
| 85 | if (!params.has("columns" )) |
| 86 | { |
| 87 | process_error("No 'columns' in request URL" ); |
| 88 | return; |
| 89 | } |
| 90 | |
| 91 | if (!params.has("connection_string" )) |
| 92 | { |
| 93 | process_error("No 'connection_string' in request URL" ); |
| 94 | return; |
| 95 | } |
| 96 | |
| 97 | UInt64 max_block_size = DEFAULT_BLOCK_SIZE; |
| 98 | if (params.has("max_block_size" )) |
| 99 | { |
| 100 | std::string max_block_size_str = params.get("max_block_size" , "" ); |
| 101 | if (max_block_size_str.empty()) |
| 102 | { |
| 103 | process_error("Empty max_block_size specified" ); |
| 104 | return; |
| 105 | } |
| 106 | max_block_size = parse<size_t>(max_block_size_str); |
| 107 | } |
| 108 | |
| 109 | std::string columns = params.get("columns" ); |
| 110 | std::unique_ptr<Block> sample_block; |
| 111 | try |
| 112 | { |
| 113 | sample_block = parseColumns(std::move(columns)); |
| 114 | } |
| 115 | catch (const Exception & ex) |
| 116 | { |
| 117 | process_error("Invalid 'columns' parameter in request body '" + ex.message() + "'" ); |
| 118 | LOG_WARNING(log, ex.getStackTrace().toString()); |
| 119 | return; |
| 120 | } |
| 121 | |
| 122 | std::string format = params.get("format" , "RowBinary" ); |
| 123 | std::string query = params.get("query" ); |
| 124 | LOG_TRACE(log, "Query: " << query); |
| 125 | |
| 126 | std::string connection_string = params.get("connection_string" ); |
| 127 | LOG_TRACE(log, "Connection string: '" << connection_string << "'" ); |
| 128 | |
| 129 | WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); |
| 130 | try |
| 131 | { |
| 132 | BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, *context); |
| 133 | auto pool = getPool(connection_string); |
| 134 | ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size); |
| 135 | copyData(inp, *writer); |
| 136 | } |
| 137 | catch (...) |
| 138 | { |
| 139 | auto message = getCurrentExceptionMessage(true); |
| 140 | response.setStatusAndReason( |
| 141 | Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, bacause of too soon response sending |
| 142 | writeStringBinary(message, out); |
| 143 | tryLogCurrentException(log); |
| 144 | } |
| 145 | } |
| 146 | } |
| 147 | |