1 | #include <boost/program_options.hpp> |
2 | #include <boost/algorithm/string.hpp> |
3 | #include <DataStreams/AsynchronousBlockInputStream.h> |
4 | #include <DataTypes/DataTypeFactory.h> |
5 | #include <Interpreters/Context.h> |
6 | #include <IO/copyData.h> |
7 | #include <IO/ReadBufferFromIStream.h> |
8 | #include <IO/ReadBufferFromFile.h> |
9 | #include <IO/LimitReadBuffer.h> |
10 | #include <Storages/StorageMemory.h> |
11 | #include <Poco/Net/MessageHeader.h> |
12 | |
13 | #include <Core/ExternalTable.h> |
14 | |
15 | |
16 | namespace DB |
17 | { |
18 | |
19 | namespace ErrorCodes |
20 | { |
21 | extern const int BAD_ARGUMENTS; |
22 | } |
23 | |
24 | |
25 | ExternalTableData BaseExternalTable::getData(const Context & context) |
26 | { |
27 | initReadBuffer(); |
28 | initSampleBlock(); |
29 | auto input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE); |
30 | return std::make_pair(std::make_shared<AsynchronousBlockInputStream>(input), name); |
31 | } |
32 | |
33 | void BaseExternalTable::clean() |
34 | { |
35 | name = "" ; |
36 | file = "" ; |
37 | format = "" ; |
38 | structure.clear(); |
39 | sample_block = Block(); |
40 | read_buffer.reset(); |
41 | } |
42 | |
43 | /// Function for debugging information output |
44 | void BaseExternalTable::write() |
45 | { |
46 | std::cerr << "file " << file << std::endl; |
47 | std::cerr << "name " << name << std::endl; |
48 | std::cerr << "format " << format << std::endl; |
49 | std::cerr << "structure: \n" ; |
50 | for (size_t i = 0; i < structure.size(); ++i) |
51 | std::cerr << "\t" << structure[i].first << " " << structure[i].second << std::endl; |
52 | } |
53 | |
54 | std::vector<std::string> BaseExternalTable::split(const std::string & s, const std::string & d) |
55 | { |
56 | std::vector<std::string> res; |
57 | boost::split(res, s, boost::algorithm::is_any_of(d), boost::algorithm::token_compress_on); |
58 | return res; |
59 | } |
60 | |
61 | void BaseExternalTable::parseStructureFromStructureField(const std::string & argument) |
62 | { |
63 | std::vector<std::string> vals = split(argument, " ," ); |
64 | |
65 | if (vals.size() & 1) |
66 | throw Exception("Odd number of attributes in section structure" , ErrorCodes::BAD_ARGUMENTS); |
67 | |
68 | for (size_t i = 0; i < vals.size(); i += 2) |
69 | structure.emplace_back(vals[i], vals[i + 1]); |
70 | } |
71 | |
72 | void BaseExternalTable::parseStructureFromTypesField(const std::string & argument) |
73 | { |
74 | std::vector<std::string> vals = split(argument, " ," ); |
75 | |
76 | for (size_t i = 0; i < vals.size(); ++i) |
77 | structure.emplace_back("_" + toString(i + 1), vals[i]); |
78 | } |
79 | |
80 | void BaseExternalTable::initSampleBlock() |
81 | { |
82 | const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); |
83 | |
84 | for (size_t i = 0; i < structure.size(); ++i) |
85 | { |
86 | ColumnWithTypeAndName column; |
87 | column.name = structure[i].first; |
88 | column.type = data_type_factory.get(structure[i].second); |
89 | column.column = column.type->createColumn(); |
90 | sample_block.insert(std::move(column)); |
91 | } |
92 | } |
93 | |
94 | |
95 | void ExternalTable::initReadBuffer() |
96 | { |
97 | if (file == "-" ) |
98 | read_buffer = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO); |
99 | else |
100 | read_buffer = std::make_unique<ReadBufferFromFile>(file); |
101 | } |
102 | |
103 | ExternalTable::ExternalTable(const boost::program_options::variables_map & external_options) |
104 | { |
105 | if (external_options.count("file" )) |
106 | file = external_options["file" ].as<std::string>(); |
107 | else |
108 | throw Exception("--file field have not been provided for external table" , ErrorCodes::BAD_ARGUMENTS); |
109 | |
110 | if (external_options.count("name" )) |
111 | name = external_options["name" ].as<std::string>(); |
112 | else |
113 | throw Exception("--name field have not been provided for external table" , ErrorCodes::BAD_ARGUMENTS); |
114 | |
115 | if (external_options.count("format" )) |
116 | format = external_options["format" ].as<std::string>(); |
117 | else |
118 | throw Exception("--format field have not been provided for external table" , ErrorCodes::BAD_ARGUMENTS); |
119 | |
120 | if (external_options.count("structure" )) |
121 | parseStructureFromStructureField(external_options["structure" ].as<std::string>()); |
122 | else if (external_options.count("types" )) |
123 | parseStructureFromTypesField(external_options["types" ].as<std::string>()); |
124 | else |
125 | throw Exception("Neither --structure nor --types have not been provided for external table" , ErrorCodes::BAD_ARGUMENTS); |
126 | } |
127 | |
128 | |
129 | void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & , std::istream & stream) |
130 | { |
131 | const Settings & settings = context.getSettingsRef(); |
132 | |
133 | /// The buffer is initialized here, not in the virtual function initReadBuffer |
134 | read_buffer_impl = std::make_unique<ReadBufferFromIStream>(stream); |
135 | |
136 | if (settings.http_max_multipart_form_data_size) |
137 | read_buffer = std::make_unique<LimitReadBuffer>( |
138 | *read_buffer_impl, settings.http_max_multipart_form_data_size, |
139 | true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting" ); |
140 | else |
141 | read_buffer = std::move(read_buffer_impl); |
142 | |
143 | /// Retrieve a collection of parameters from MessageHeader |
144 | Poco::Net::NameValueCollection content; |
145 | std::string label; |
146 | Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition" ), label, content); |
147 | |
148 | /// Get parameters |
149 | name = content.get("name" , "_data" ); |
150 | format = params.get(name + "_format" , "TabSeparated" ); |
151 | |
152 | if (params.has(name + "_structure" )) |
153 | parseStructureFromStructureField(params.get(name + "_structure" )); |
154 | else if (params.has(name + "_types" )) |
155 | parseStructureFromTypesField(params.get(name + "_types" )); |
156 | else |
157 | throw Exception("Neither structure nor types have not been provided for external table " + name + ". Use fields " + name + "_structure or " + name + "_types to do so." , ErrorCodes::BAD_ARGUMENTS); |
158 | |
159 | ExternalTableData data = getData(context); |
160 | |
161 | /// Create table |
162 | NamesAndTypesList columns = sample_block.getNamesAndTypesList(); |
163 | StoragePtr storage = StorageMemory::create("_external" , data.second, ColumnsDescription{columns}, ConstraintsDescription{}); |
164 | storage->startup(); |
165 | context.addExternalTable(data.second, storage); |
166 | BlockOutputStreamPtr output = storage->write(ASTPtr(), context); |
167 | |
168 | /// Write data |
169 | data.first->readPrefix(); |
170 | output->writePrefix(); |
171 | while (Block block = data.first->read()) |
172 | output->write(block); |
173 | data.first->readSuffix(); |
174 | output->writeSuffix(); |
175 | |
176 | /// We are ready to receive the next file, for this we clear all the information received |
177 | clean(); |
178 | } |
179 | |
180 | } |
181 | |