1#include <sys/stat.h>
2#include <sys/types.h>
3#include <errno.h>
4
5#include <map>
6
7#include <Poco/Path.h>
8#include <Poco/Util/XMLConfiguration.h>
9
10#include <Common/escapeForFileName.h>
11
12#include <Common/Exception.h>
13
14#include <IO/ReadBufferFromFile.h>
15#include <IO/WriteBufferFromFile.h>
16#include <Compression/CompressedReadBuffer.h>
17#include <Compression/CompressedWriteBuffer.h>
18#include <IO/ReadHelpers.h>
19#include <IO/WriteHelpers.h>
20
21#include <DataTypes/NestedUtils.h>
22
23#include <DataStreams/IBlockInputStream.h>
24#include <DataStreams/IBlockOutputStream.h>
25
26#include <Columns/ColumnArray.h>
27
28#include <Common/typeid_cast.h>
29#include <Compression/CompressionFactory.h>
30
31#include <Interpreters/Context.h>
32
33#include <Storages/StorageTinyLog.h>
34#include <Storages/StorageFactory.h>
35#include <Storages/CheckResults.h>
36
37#include <Poco/DirectoryIterator.h>
38
39#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
40
41
42namespace DB
43{
44
namespace ErrorCodes
{
    /// Error codes referenced by this storage; the values are defined centrally
    /// elsewhere in the code base.
    extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
    extern const int CANNOT_CREATE_DIRECTORY;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int DUPLICATE_COLUMN;
    extern const int LOGICAL_ERROR;
    extern const int INCORRECT_FILE_NAME;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
55
56
/** Reads the requested columns of a TinyLog table, one compressed file per
  * (sub)stream. Holds a shared (read) lock on the storage for its whole
  * lifetime, so writes are excluded while any such stream exists.
  */
class TinyLogBlockInputStream final : public IBlockInputStream
{
public:
    TinyLogBlockInputStream(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_)
        : block_size(block_size_), columns(columns_),
        storage(storage_), lock(storage_.rwlock),
        max_read_buffer_size(max_read_buffer_size_) {}

    String getName() const override { return "TinyLog"; }

    /// Header: empty columns of the requested types, with Nested columns flattened.
    Block getHeader() const override
    {
        Block res;

        for (const auto & name_type : columns)
            res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });

        return Nested::flatten(res);
    }

protected:
    Block readImpl() override;
private:
    size_t block_size;                          /// Maximum number of rows per returned block.
    NamesAndTypesList columns;                  /// Columns to read.
    StorageTinyLog & storage;
    std::shared_lock<std::shared_mutex> lock;   /// Shared lock on storage.rwlock, held for the stream's lifetime.
    bool finished = false;                      /// Set once data is exhausted; file buffers are then released.
    size_t max_read_buffer_size;

    /// A raw file read buffer plus a decompressing wrapper over it; one per substream file.
    struct Stream
    {
        Stream(const std::string & data_path, size_t max_read_buffer_size_)
            /// Cap the buffer at the file size so small files don't allocate the full buffer.
            : plain(data_path, std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size_), Poco::File(data_path).getSize())),
            compressed(plain)
        {
        }

        ReadBufferFromFile plain;
        CompressedReadBuffer compressed;
    };

    using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
    FileStreams streams;    /// Opened lazily in readData(), keyed by stream (file) name.

    using DeserializeState = IDataType::DeserializeBinaryBulkStatePtr;
    using DeserializeStates = std::map<String, DeserializeState>;
    DeserializeStates deserialize_states;   /// Per-column deserialization state, created on first read of the column.

    void readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit);
};
108
109
/** Appends blocks to a TinyLog table. Takes an exclusive lock on the storage
  * for its whole lifetime, so only one writer can be active and readers are
  * blocked while it exists.
  */
class TinyLogBlockOutputStream final : public IBlockOutputStream
{
public:
    explicit TinyLogBlockOutputStream(StorageTinyLog & storage_)
        : storage(storage_), lock(storage_.rwlock)
    {
    }

    ~TinyLogBlockOutputStream() override
    {
        /// Best-effort finalization if the user did not call writeSuffix()
        /// explicitly; never let an exception escape a destructor.
        try
        {
            writeSuffix();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    Block getHeader() const override { return storage.getSampleBlock(); }

    void write(const Block & block) override;
    void writeSuffix() override;

private:
    StorageTinyLog & storage;
    std::unique_lock<std::shared_mutex> lock;   /// Exclusive lock on storage.rwlock, held for the stream's lifetime.
    bool done = false;                          /// Whether writeSuffix() has already completed.

    /// A raw file write buffer plus a compressing wrapper; files are opened in
    /// append mode, so each INSERT adds data to the end of the column files.
    struct Stream
    {
        Stream(const std::string & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
            plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
            compressed(plain, std::move(codec), max_compress_block_size)
        {
        }

        WriteBufferFromFile plain;
        CompressedWriteBuffer compressed;

        /// Flush the compressed buffer first, then the file buffer beneath it.
        void finalize()
        {
            compressed.next();
            plain.next();
        }
    };

    using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
    FileStreams streams;    /// Created lazily on first write, keyed by stream (file) name.

    using SerializeState = IDataType::SerializeBinaryBulkStatePtr;
    using SerializeStates = std::map<String, SerializeState>;
    SerializeStates serialize_states;   /// Per-column serialization state, created on first write of the column.

    /// Names of substreams already written within one operation, so shared
    /// substreams (e.g. nested array offsets) are written only once.
    using WrittenStreams = std::set<std::string>;

    IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams);
    void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams);
};
170
171
Block TinyLogBlockInputStream::readImpl()
{
    Block res;

    /// Either a previous call decided we are done, or the first opened stream
    /// has hit EOF (presumably all column files are read in lockstep, so
    /// checking one suffices — confirm against the write path).
    if (finished || (!streams.empty() && streams.begin()->second->compressed.eof()))
    {
        /** Close the files (before destroying the object).
          * When many sources are created, but simultaneously reading only a few of them,
          * buffers don't waste memory.
          */
        finished = true;
        streams.clear();
        return res;
    }

    {
        /// if there are no files in the folder, it means that the table is empty
        if (Poco::DirectoryIterator(storage.fullPath()) == Poco::DirectoryIterator())
            return res;
    }

    /// Read up to block_size rows from every requested column.
    for (const auto & name_type : columns)
    {
        MutableColumnPtr column = name_type.type->createColumn();

        try
        {
            readData(name_type.name, *name_type.type, *column, block_size);
        }
        catch (Exception & e)
        {
            /// Enrich the exception with column name and table path for diagnostics.
            e.addMessage("while reading column " + name_type.name + " at " + storage.fullPath());
            throw;
        }

        if (column->size())
            res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name));
    }

    /// Empty result or EOF on the first stream means nothing is left;
    /// release the file buffers eagerly.
    if (!res || streams.begin()->second->compressed.eof())
    {
        finished = true;
        streams.clear();
    }

    return Nested::flatten(res);
}
219
220
221void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit)
222{
223 IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
224 settings.getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
225 {
226 String stream_name = IDataType::getFileNameForStream(name, path);
227
228 if (!streams.count(stream_name))
229 streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(), max_read_buffer_size);
230
231 return &streams[stream_name]->compressed;
232 };
233
234 if (deserialize_states.count(name) == 0)
235 type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);
236
237 type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]);
238}
239
240
241IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(const String & name,
242 WrittenStreams & written_streams)
243{
244 return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
245 {
246 String stream_name = IDataType::getFileNameForStream(name, path);
247
248 if (!written_streams.insert(stream_name).second)
249 return nullptr;
250
251 const auto & columns = storage.getColumns();
252 if (!streams.count(stream_name))
253 streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(),
254 columns.getCodecOrDefault(name),
255 storage.max_compress_block_size);
256
257 return &streams[stream_name]->compressed;
258 };
259}
260
261
262void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams)
263{
264 IDataType::SerializeBinaryBulkSettings settings;
265 settings.getter = createStreamGetter(name, written_streams);
266
267 if (serialize_states.count(name) == 0)
268 type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]);
269
270 type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
271}
272
273
void TinyLogBlockOutputStream::writeSuffix()
{
    /// Idempotent: the destructor also calls writeSuffix(), so guard against
    /// finalizing twice.
    if (done)
        return;
    done = true;

    /// If nothing was written - leave the table in initial state.
    if (streams.empty())
        return;

    /// Write the serialization suffix for every column that produced a
    /// serialize state during write().
    WrittenStreams written_streams;
    IDataType::SerializeBinaryBulkSettings settings;
    for (const auto & column : getHeader())
    {
        auto it = serialize_states.find(column.name);
        if (it != serialize_states.end())
        {
            settings.getter = createStreamGetter(column.name, written_streams);
            column.type->serializeBinaryBulkStateSuffix(settings, it->second);
        }
    }

    /// Finish write: flush the compressed buffers, then the underlying files.
    for (auto & stream : streams)
        stream.second->finalize();

    /// Record the final file sizes so checkData() can later detect truncation
    /// or corruption via the file checker.
    std::vector<Poco::File> column_files;
    for (auto & pair : streams)
        column_files.push_back(storage.files[pair.first].data_file);

    storage.file_checker.update(column_files.begin(), column_files.end());

    streams.clear();
}
308
309
310void TinyLogBlockOutputStream::write(const Block & block)
311{
312 storage.check(block, true);
313
314 /// The set of written offset columns so that you do not write shared columns for nested structures multiple times
315 WrittenStreams written_streams;
316
317 for (size_t i = 0; i < block.columns(); ++i)
318 {
319 const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
320 writeData(column.name, *column.type, *column.column, written_streams);
321 }
322}
323
324
StorageTinyLog::StorageTinyLog(
    const std::string & relative_path_,
    const std::string & database_name_,
    const std::string & table_name_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    bool attach,
    size_t max_compress_block_size_,
    const Context & context_)
    : base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_),
    max_compress_block_size(max_compress_block_size_),
    file_checker(path + "sizes.json"),
    log(&Logger::get("StorageTinyLog"))
{
    setColumns(columns_);
    setConstraints(constraints_);

    /// Checked after member initialization (path is already concatenated above),
    /// but before any directory is created on disk.
    if (relative_path_.empty())
        throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);

    /// Create the data directory only for a freshly created table; on ATTACH
    /// the data is expected to already exist on disk.
    if (!attach)
        Poco::File(path).createDirectories();

    /// Register one data file per substream of every physical column.
    for (const auto & col : getColumns().getAllPhysical())
        addFiles(col.name, *col.type);
}
351
352
353void StorageTinyLog::addFiles(const String & column_name, const IDataType & type)
354{
355 if (files.end() != files.find(column_name))
356 throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog.",
357 ErrorCodes::DUPLICATE_COLUMN);
358
359 IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
360 {
361 String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
362 if (!files.count(stream_name))
363 {
364 ColumnData column_data;
365 files.insert(std::make_pair(stream_name, column_data));
366 files[stream_name].data_file = Poco::File(
367 path + stream_name + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
368 }
369 };
370
371 IDataType::SubstreamPath substream_path;
372 type.enumerateStreams(stream_callback, substream_path);
373}
374
375
376void StorageTinyLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
377{
378 std::unique_lock<std::shared_mutex> lock(rwlock);
379
380 /// Rename directory with data.
381 String new_path = base_path + new_path_to_table_data;
382 Poco::File(path).renameTo(new_path);
383
384 path = new_path;
385 table_name = new_table_name;
386 database_name = new_database_name;
387 file_checker.setPath(path + "sizes.json");
388
389 for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
390 it->second.data_file = Poco::File(path + Poco::Path(it->second.data_file.path()).getFileName());
391}
392
393
394BlockInputStreams StorageTinyLog::read(
395 const Names & column_names,
396 const SelectQueryInfo & /*query_info*/,
397 const Context & context,
398 QueryProcessingStage::Enum /*processed_stage*/,
399 const size_t max_block_size,
400 const unsigned /*num_streams*/)
401{
402 check(column_names);
403 // When reading, we lock the entire storage, because we only have one file
404 // per column and can't modify it concurrently.
405 return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>(
406 max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
407}
408
409
/// Each INSERT gets its own output stream; the stream takes the storage's
/// exclusive lock for its whole lifetime, serializing writers.
BlockOutputStreamPtr StorageTinyLog::write(
    const ASTPtr & /*query*/, const Context & /*context*/)
{
    return std::make_shared<TinyLogBlockOutputStream>(*this);
}
415
416
/// Verifies the on-disk data files against the sizes recorded in sizes.json.
/// Takes a shared lock: checking may run concurrently with reads, not writes.
CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
{
    std::shared_lock<std::shared_mutex> lock(rwlock);
    return file_checker.check();
}
422
423void StorageTinyLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
424{
425 if (table_name.empty())
426 throw Exception("Logical error: table name is empty", ErrorCodes::LOGICAL_ERROR);
427
428 std::unique_lock<std::shared_mutex> lock(rwlock);
429
430 auto file = Poco::File(path);
431 file.remove(true);
432 file.createDirectories();
433
434 files.clear();
435 file_checker = FileChecker{path + "sizes.json"};
436
437 for (const auto &column : getColumns().getAllPhysical())
438 addFiles(column.name, *column.type);
439}
440
441
442void registerStorageTinyLog(StorageFactory & factory)
443{
444 factory.registerStorage("TinyLog", [](const StorageFactory::Arguments & args)
445 {
446 if (!args.engine_args.empty())
447 throw Exception(
448 "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
449 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
450
451 return StorageTinyLog::create(
452 args.relative_data_path, args.database_name, args.table_name, args.columns, args.constraints,
453 args.attach, args.context.getSettings().max_compress_block_size, args.context);
454 });
455}
456
457}
458