| 1 | #include <boost/program_options.hpp> |
| 2 | #include <boost/algorithm/string.hpp> |
| 3 | #include <DataStreams/AsynchronousBlockInputStream.h> |
| 4 | #include <DataTypes/DataTypeFactory.h> |
| 5 | #include <Interpreters/Context.h> |
| 6 | #include <IO/copyData.h> |
| 7 | #include <IO/ReadBufferFromIStream.h> |
| 8 | #include <IO/ReadBufferFromFile.h> |
| 9 | #include <IO/LimitReadBuffer.h> |
| 10 | #include <Storages/StorageMemory.h> |
| 11 | #include <Poco/Net/MessageHeader.h> |
| 12 | |
| 13 | #include <Core/ExternalTable.h> |
| 14 | |
| 15 | |
| 16 | namespace DB |
| 17 | { |
| 18 | |
/// Error codes thrown from this file. Only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
}
| 23 | |
| 24 | |
| 25 | ExternalTableData BaseExternalTable::getData(const Context & context) |
| 26 | { |
| 27 | initReadBuffer(); |
| 28 | initSampleBlock(); |
| 29 | auto input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE); |
| 30 | return std::make_pair(std::make_shared<AsynchronousBlockInputStream>(input), name); |
| 31 | } |
| 32 | |
| 33 | void BaseExternalTable::clean() |
| 34 | { |
| 35 | name = "" ; |
| 36 | file = "" ; |
| 37 | format = "" ; |
| 38 | structure.clear(); |
| 39 | sample_block = Block(); |
| 40 | read_buffer.reset(); |
| 41 | } |
| 42 | |
| 43 | /// Function for debugging information output |
| 44 | void BaseExternalTable::write() |
| 45 | { |
| 46 | std::cerr << "file " << file << std::endl; |
| 47 | std::cerr << "name " << name << std::endl; |
| 48 | std::cerr << "format " << format << std::endl; |
| 49 | std::cerr << "structure: \n" ; |
| 50 | for (size_t i = 0; i < structure.size(); ++i) |
| 51 | std::cerr << "\t" << structure[i].first << " " << structure[i].second << std::endl; |
| 52 | } |
| 53 | |
| 54 | std::vector<std::string> BaseExternalTable::split(const std::string & s, const std::string & d) |
| 55 | { |
| 56 | std::vector<std::string> res; |
| 57 | boost::split(res, s, boost::algorithm::is_any_of(d), boost::algorithm::token_compress_on); |
| 58 | return res; |
| 59 | } |
| 60 | |
| 61 | void BaseExternalTable::parseStructureFromStructureField(const std::string & argument) |
| 62 | { |
| 63 | std::vector<std::string> vals = split(argument, " ," ); |
| 64 | |
| 65 | if (vals.size() & 1) |
| 66 | throw Exception("Odd number of attributes in section structure" , ErrorCodes::BAD_ARGUMENTS); |
| 67 | |
| 68 | for (size_t i = 0; i < vals.size(); i += 2) |
| 69 | structure.emplace_back(vals[i], vals[i + 1]); |
| 70 | } |
| 71 | |
| 72 | void BaseExternalTable::parseStructureFromTypesField(const std::string & argument) |
| 73 | { |
| 74 | std::vector<std::string> vals = split(argument, " ," ); |
| 75 | |
| 76 | for (size_t i = 0; i < vals.size(); ++i) |
| 77 | structure.emplace_back("_" + toString(i + 1), vals[i]); |
| 78 | } |
| 79 | |
| 80 | void BaseExternalTable::initSampleBlock() |
| 81 | { |
| 82 | const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); |
| 83 | |
| 84 | for (size_t i = 0; i < structure.size(); ++i) |
| 85 | { |
| 86 | ColumnWithTypeAndName column; |
| 87 | column.name = structure[i].first; |
| 88 | column.type = data_type_factory.get(structure[i].second); |
| 89 | column.column = column.type->createColumn(); |
| 90 | sample_block.insert(std::move(column)); |
| 91 | } |
| 92 | } |
| 93 | |
| 94 | |
| 95 | void ExternalTable::initReadBuffer() |
| 96 | { |
| 97 | if (file == "-" ) |
| 98 | read_buffer = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO); |
| 99 | else |
| 100 | read_buffer = std::make_unique<ReadBufferFromFile>(file); |
| 101 | } |
| 102 | |
| 103 | ExternalTable::ExternalTable(const boost::program_options::variables_map & external_options) |
| 104 | { |
| 105 | if (external_options.count("file" )) |
| 106 | file = external_options["file" ].as<std::string>(); |
| 107 | else |
| 108 | throw Exception("--file field have not been provided for external table" , ErrorCodes::BAD_ARGUMENTS); |
| 109 | |
| 110 | if (external_options.count("name" )) |
| 111 | name = external_options["name" ].as<std::string>(); |
| 112 | else |
| 113 | throw Exception("--name field have not been provided for external table" , ErrorCodes::BAD_ARGUMENTS); |
| 114 | |
| 115 | if (external_options.count("format" )) |
| 116 | format = external_options["format" ].as<std::string>(); |
| 117 | else |
| 118 | throw Exception("--format field have not been provided for external table" , ErrorCodes::BAD_ARGUMENTS); |
| 119 | |
| 120 | if (external_options.count("structure" )) |
| 121 | parseStructureFromStructureField(external_options["structure" ].as<std::string>()); |
| 122 | else if (external_options.count("types" )) |
| 123 | parseStructureFromTypesField(external_options["types" ].as<std::string>()); |
| 124 | else |
| 125 | throw Exception("Neither --structure nor --types have not been provided for external table" , ErrorCodes::BAD_ARGUMENTS); |
| 126 | } |
| 127 | |
| 128 | |
| 129 | void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & , std::istream & stream) |
| 130 | { |
| 131 | const Settings & settings = context.getSettingsRef(); |
| 132 | |
| 133 | /// The buffer is initialized here, not in the virtual function initReadBuffer |
| 134 | read_buffer_impl = std::make_unique<ReadBufferFromIStream>(stream); |
| 135 | |
| 136 | if (settings.http_max_multipart_form_data_size) |
| 137 | read_buffer = std::make_unique<LimitReadBuffer>( |
| 138 | *read_buffer_impl, settings.http_max_multipart_form_data_size, |
| 139 | true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting" ); |
| 140 | else |
| 141 | read_buffer = std::move(read_buffer_impl); |
| 142 | |
| 143 | /// Retrieve a collection of parameters from MessageHeader |
| 144 | Poco::Net::NameValueCollection content; |
| 145 | std::string label; |
| 146 | Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition" ), label, content); |
| 147 | |
| 148 | /// Get parameters |
| 149 | name = content.get("name" , "_data" ); |
| 150 | format = params.get(name + "_format" , "TabSeparated" ); |
| 151 | |
| 152 | if (params.has(name + "_structure" )) |
| 153 | parseStructureFromStructureField(params.get(name + "_structure" )); |
| 154 | else if (params.has(name + "_types" )) |
| 155 | parseStructureFromTypesField(params.get(name + "_types" )); |
| 156 | else |
| 157 | throw Exception("Neither structure nor types have not been provided for external table " + name + ". Use fields " + name + "_structure or " + name + "_types to do so." , ErrorCodes::BAD_ARGUMENTS); |
| 158 | |
| 159 | ExternalTableData data = getData(context); |
| 160 | |
| 161 | /// Create table |
| 162 | NamesAndTypesList columns = sample_block.getNamesAndTypesList(); |
| 163 | StoragePtr storage = StorageMemory::create("_external" , data.second, ColumnsDescription{columns}, ConstraintsDescription{}); |
| 164 | storage->startup(); |
| 165 | context.addExternalTable(data.second, storage); |
| 166 | BlockOutputStreamPtr output = storage->write(ASTPtr(), context); |
| 167 | |
| 168 | /// Write data |
| 169 | data.first->readPrefix(); |
| 170 | output->writePrefix(); |
| 171 | while (Block block = data.first->read()) |
| 172 | output->write(block); |
| 173 | data.first->readSuffix(); |
| 174 | output->writeSuffix(); |
| 175 | |
| 176 | /// We are ready to receive the next file, for this we clear all the information received |
| 177 | clean(); |
| 178 | } |
| 179 | |
| 180 | } |
| 181 | |