1#include <boost/program_options.hpp>
2#include <boost/algorithm/string.hpp>
3#include <DataStreams/AsynchronousBlockInputStream.h>
4#include <DataTypes/DataTypeFactory.h>
5#include <Interpreters/Context.h>
6#include <IO/copyData.h>
7#include <IO/ReadBufferFromIStream.h>
8#include <IO/ReadBufferFromFile.h>
9#include <IO/LimitReadBuffer.h>
10#include <Storages/StorageMemory.h>
11#include <Poco/Net/MessageHeader.h>
12
13#include <Core/ExternalTable.h>
14
15
16namespace DB
17{
18
19namespace ErrorCodes
20{
21 extern const int BAD_ARGUMENTS;
22}
23
24
25ExternalTableData BaseExternalTable::getData(const Context & context)
26{
27 initReadBuffer();
28 initSampleBlock();
29 auto input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
30 return std::make_pair(std::make_shared<AsynchronousBlockInputStream>(input), name);
31}
32
33void BaseExternalTable::clean()
34{
35 name = "";
36 file = "";
37 format = "";
38 structure.clear();
39 sample_block = Block();
40 read_buffer.reset();
41}
42
43/// Function for debugging information output
44void BaseExternalTable::write()
45{
46 std::cerr << "file " << file << std::endl;
47 std::cerr << "name " << name << std::endl;
48 std::cerr << "format " << format << std::endl;
49 std::cerr << "structure: \n";
50 for (size_t i = 0; i < structure.size(); ++i)
51 std::cerr << "\t" << structure[i].first << " " << structure[i].second << std::endl;
52}
53
54std::vector<std::string> BaseExternalTable::split(const std::string & s, const std::string & d)
55{
56 std::vector<std::string> res;
57 boost::split(res, s, boost::algorithm::is_any_of(d), boost::algorithm::token_compress_on);
58 return res;
59}
60
61void BaseExternalTable::parseStructureFromStructureField(const std::string & argument)
62{
63 std::vector<std::string> vals = split(argument, " ,");
64
65 if (vals.size() & 1)
66 throw Exception("Odd number of attributes in section structure", ErrorCodes::BAD_ARGUMENTS);
67
68 for (size_t i = 0; i < vals.size(); i += 2)
69 structure.emplace_back(vals[i], vals[i + 1]);
70}
71
72void BaseExternalTable::parseStructureFromTypesField(const std::string & argument)
73{
74 std::vector<std::string> vals = split(argument, " ,");
75
76 for (size_t i = 0; i < vals.size(); ++i)
77 structure.emplace_back("_" + toString(i + 1), vals[i]);
78}
79
80void BaseExternalTable::initSampleBlock()
81{
82 const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
83
84 for (size_t i = 0; i < structure.size(); ++i)
85 {
86 ColumnWithTypeAndName column;
87 column.name = structure[i].first;
88 column.type = data_type_factory.get(structure[i].second);
89 column.column = column.type->createColumn();
90 sample_block.insert(std::move(column));
91 }
92}
93
94
95void ExternalTable::initReadBuffer()
96{
97 if (file == "-")
98 read_buffer = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
99 else
100 read_buffer = std::make_unique<ReadBufferFromFile>(file);
101}
102
103ExternalTable::ExternalTable(const boost::program_options::variables_map & external_options)
104{
105 if (external_options.count("file"))
106 file = external_options["file"].as<std::string>();
107 else
108 throw Exception("--file field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
109
110 if (external_options.count("name"))
111 name = external_options["name"].as<std::string>();
112 else
113 throw Exception("--name field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
114
115 if (external_options.count("format"))
116 format = external_options["format"].as<std::string>();
117 else
118 throw Exception("--format field have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
119
120 if (external_options.count("structure"))
121 parseStructureFromStructureField(external_options["structure"].as<std::string>());
122 else if (external_options.count("types"))
123 parseStructureFromTypesField(external_options["types"].as<std::string>());
124 else
125 throw Exception("Neither --structure nor --types have not been provided for external table", ErrorCodes::BAD_ARGUMENTS);
126}
127
128
129void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, std::istream & stream)
130{
131 const Settings & settings = context.getSettingsRef();
132
133 /// The buffer is initialized here, not in the virtual function initReadBuffer
134 read_buffer_impl = std::make_unique<ReadBufferFromIStream>(stream);
135
136 if (settings.http_max_multipart_form_data_size)
137 read_buffer = std::make_unique<LimitReadBuffer>(
138 *read_buffer_impl, settings.http_max_multipart_form_data_size,
139 true, "the maximum size of multipart/form-data. This limit can be tuned by 'http_max_multipart_form_data_size' setting");
140 else
141 read_buffer = std::move(read_buffer_impl);
142
143 /// Retrieve a collection of parameters from MessageHeader
144 Poco::Net::NameValueCollection content;
145 std::string label;
146 Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition"), label, content);
147
148 /// Get parameters
149 name = content.get("name", "_data");
150 format = params.get(name + "_format", "TabSeparated");
151
152 if (params.has(name + "_structure"))
153 parseStructureFromStructureField(params.get(name + "_structure"));
154 else if (params.has(name + "_types"))
155 parseStructureFromTypesField(params.get(name + "_types"));
156 else
157 throw Exception("Neither structure nor types have not been provided for external table " + name + ". Use fields " + name + "_structure or " + name + "_types to do so.", ErrorCodes::BAD_ARGUMENTS);
158
159 ExternalTableData data = getData(context);
160
161 /// Create table
162 NamesAndTypesList columns = sample_block.getNamesAndTypesList();
163 StoragePtr storage = StorageMemory::create("_external", data.second, ColumnsDescription{columns}, ConstraintsDescription{});
164 storage->startup();
165 context.addExternalTable(data.second, storage);
166 BlockOutputStreamPtr output = storage->write(ASTPtr(), context);
167
168 /// Write data
169 data.first->readPrefix();
170 output->writePrefix();
171 while (Block block = data.first->read())
172 output->write(block);
173 data.first->readSuffix();
174 output->writeSuffix();
175
176 /// We are ready to receive the next file, for this we clear all the information received
177 clean();
178}
179
180}
181