#include <sys/stat.h>
#include <sys/types.h>
#include <errno.h>

#include <map>
#include <set>

#include <Poco/Path.h>
#include <Poco/Util/XMLConfiguration.h>

#include <Common/escapeForFileName.h>

#include <Common/Exception.h>

#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>

#include <DataTypes/NestedUtils.h>

#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>

#include <Columns/ColumnArray.h>

#include <Common/typeid_cast.h>
#include <Compression/CompressionFactory.h>

#include <Interpreters/Context.h>

#include <Storages/StorageTinyLog.h>
#include <Storages/StorageFactory.h>
#include <Storages/CheckResults.h>

#include <Poco/DirectoryIterator.h>

#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"


namespace DB
{

namespace ErrorCodes
{
    extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
    extern const int CANNOT_CREATE_DIRECTORY;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int DUPLICATE_COLUMN;
    extern const int LOGICAL_ERROR;
    extern const int INCORRECT_FILE_NAME;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}

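/** Reads all requested columns of a TinyLog table sequentially, in a single stream.
  * Holds a shared (read) lock on the storage for its whole lifetime,
  * so data cannot be appended while the stream is alive.
  */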
class TinyLogBlockInputStream final : public IBlockInputStream
{
public:
    TinyLogBlockInputStream(size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, size_t max_read_buffer_size_)
        : block_size(block_size_), columns(columns_),
        storage(storage_), lock(storage_.rwlock),
        max_read_buffer_size(max_read_buffer_size_) {}

    String getName() const override { return "TinyLog"; }

    Block getHeader() const override
    {
        Block res;

        for (const auto & name_type : columns)
            res.insert({ name_type.type->createColumn(), name_type.type, name_type.name });

        return Nested::flatten(res);
    }

protected:
    Block readImpl() override;

private:
    size_t block_size;
    NamesAndTypesList columns;
    StorageTinyLog & storage;
    std::shared_lock<std::shared_mutex> lock;
    bool finished = false;
    size_t max_read_buffer_size;

    struct Stream
    {
        Stream(const std::string & data_path, size_t max_read_buffer_size_)
            : plain(data_path, std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size_), Poco::File(data_path).getSize())),
            compressed(plain)
        {
        }

        ReadBufferFromFile plain;
        CompressedReadBuffer compressed;
    };

    using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
    FileStreams streams;

    using DeserializeState = IDataType::DeserializeBinaryBulkStatePtr;
    using DeserializeStates = std::map<String, DeserializeState>;
    DeserializeStates deserialize_states;

    void readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit);
};

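/** Appends blocks to the ends of the per-column files of a TinyLog table.
  * Holds an exclusive (write) lock on the storage for its whole lifetime,
  * so there is at most one writer at a time.
  */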
class TinyLogBlockOutputStream final : public IBlockOutputStream
{
public:
    explicit TinyLogBlockOutputStream(StorageTinyLog & storage_)
        : storage(storage_), lock(storage_.rwlock)
    {
    }

    ~TinyLogBlockOutputStream() override
    {
        try
        {
            writeSuffix();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    Block getHeader() const override { return storage.getSampleBlock(); }

    void write(const Block & block) override;
    void writeSuffix() override;

private:
    StorageTinyLog & storage;
    std::unique_lock<std::shared_mutex> lock;
    bool done = false;

    struct Stream
    {
        Stream(const std::string & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
            plain(data_path, max_compress_block_size, O_APPEND | O_CREAT | O_WRONLY),
            compressed(plain, std::move(codec), max_compress_block_size)
        {
        }

        WriteBufferFromFile plain;
        CompressedWriteBuffer compressed;

        void finalize()
        {
            compressed.next();
            plain.next();
        }
    };

    using FileStreams = std::map<std::string, std::unique_ptr<Stream>>;
    FileStreams streams;

    using SerializeState = IDataType::SerializeBinaryBulkStatePtr;
    using SerializeStates = std::map<String, SerializeState>;
    SerializeStates serialize_states;

    using WrittenStreams = std::set<std::string>;

    IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams);
    void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams);
};

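/// Read the next block of up to block_size rows; an empty block signals the end of data.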
Block TinyLogBlockInputStream::readImpl()
{
    Block res;

    if (finished || (!streams.empty() && streams.begin()->second->compressed.eof()))
    {
        /** Close the files (before destroying the object).
          * When many sources are created, but simultaneously reading only a few of them,
          * buffers don't waste memory.
          */
        finished = true;
        streams.clear();
        return res;
    }

    {
        /// If there are no files in the folder, the table is empty.
        if (Poco::DirectoryIterator(storage.fullPath()) == Poco::DirectoryIterator())
            return res;
    }

    for (const auto & name_type : columns)
    {
        MutableColumnPtr column = name_type.type->createColumn();

        try
        {
            readData(name_type.name, *name_type.type, *column, block_size);
        }
        catch (Exception & e)
        {
            e.addMessage("while reading column " + name_type.name + " at " + storage.fullPath());
            throw;
        }

        if (column->size())
            res.insert(ColumnWithTypeAndName(std::move(column), name_type.type, name_type.name));
    }

    if (!res || streams.begin()->second->compressed.eof())
    {
        finished = true;
        streams.clear();
    }

    return Nested::flatten(res);
}

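/// Deserialize up to `limit` rows of one column, lazily opening a read stream
/// for each substream (e.g. an Array column keeps its offsets in a separate file).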
void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, UInt64 limit)
{
    IDataType::DeserializeBinaryBulkSettings settings; /// TODO Use avg_value_size_hint.
    settings.getter = [&] (const IDataType::SubstreamPath & path) -> ReadBuffer *
    {
        String stream_name = IDataType::getFileNameForStream(name, path);

        if (!streams.count(stream_name))
            streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(), max_read_buffer_size);

        return &streams[stream_name]->compressed;
    };

    if (deserialize_states.count(name) == 0)
        type.deserializeBinaryBulkStatePrefix(settings, deserialize_states[name]);

    type.deserializeBinaryBulkWithMultipleStreams(column, limit, settings, deserialize_states[name]);
}

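/// Returns a getter that maps each substream of a column to its output buffer,
/// creating the file stream on first use. Substreams already written for this block
/// (e.g. offsets shared between columns of a nested structure) are skipped.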
IDataType::OutputStreamGetter TinyLogBlockOutputStream::createStreamGetter(const String & name,
    WrittenStreams & written_streams)
{
    return [&] (const IDataType::SubstreamPath & path) -> WriteBuffer *
    {
        String stream_name = IDataType::getFileNameForStream(name, path);

        if (!written_streams.insert(stream_name).second)
            return nullptr;

        const auto & columns = storage.getColumns();
        if (!streams.count(stream_name))
            streams[stream_name] = std::make_unique<Stream>(storage.files[stream_name].data_file.path(),
                columns.getCodecOrDefault(name),
                storage.max_compress_block_size);

        return &streams[stream_name]->compressed;
    };
}

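/// Serialize a whole column; the serialization state prefix is written
/// the first time the column is seen by this output stream.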
void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, WrittenStreams & written_streams)
{
    IDataType::SerializeBinaryBulkSettings settings;
    settings.getter = createStreamGetter(name, written_streams);

    if (serialize_states.count(name) == 0)
        type.serializeBinaryBulkStatePrefix(settings, serialize_states[name]);

    type.serializeBinaryBulkWithMultipleStreams(column, 0, 0, settings, serialize_states[name]);
}

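/// Finish the write: flush the serialization suffixes and buffers to disk,
/// then record the resulting file sizes in sizes.json so checkData() can verify them later.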
void TinyLogBlockOutputStream::writeSuffix()
{
    if (done)
        return;
    done = true;

    /// If nothing was written - leave the table in its initial state.
    if (streams.empty())
        return;

    WrittenStreams written_streams;
    IDataType::SerializeBinaryBulkSettings settings;
    for (const auto & column : getHeader())
    {
        auto it = serialize_states.find(column.name);
        if (it != serialize_states.end())
        {
            settings.getter = createStreamGetter(column.name, written_streams);
            column.type->serializeBinaryBulkStateSuffix(settings, it->second);
        }
    }

    /// Finish write.
    for (auto & stream : streams)
        stream.second->finalize();

    std::vector<Poco::File> column_files;
    for (auto & pair : streams)
        column_files.push_back(storage.files[pair.first].data_file);

    storage.file_checker.update(column_files.begin(), column_files.end());

    streams.clear();
}

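/// Append one block: each column of the block is serialized to its own file(s).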
void TinyLogBlockOutputStream::write(const Block & block)
{
    storage.check(block, true);

    /// The set of written offset columns, so that shared columns of nested structures are not written multiple times.
    WrittenStreams written_streams;

    for (size_t i = 0; i < block.columns(); ++i)
    {
        const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
        writeData(column.name, *column.type, *column.column, written_streams);
    }
}

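/// `attach` is true when an existing table is loaded at server startup;
/// in that case the data directory must already exist and is not created here.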
StorageTinyLog::StorageTinyLog(
    const std::string & relative_path_,
    const std::string & database_name_,
    const std::string & table_name_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    bool attach,
    size_t max_compress_block_size_,
    const Context & context_)
    : base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_),
    max_compress_block_size(max_compress_block_size_),
    file_checker(path + "sizes.json"),
    log(&Logger::get("StorageTinyLog"))
{
    setColumns(columns_);
    setConstraints(constraints_);

    if (relative_path_.empty())
        throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);

    if (!attach)
        Poco::File(path).createDirectories();

    for (const auto & col : getColumns().getAllPhysical())
        addFiles(col.name, *col.type);
}

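/// Register the data file for each substream of the column
/// (e.g. an Array column gets separate files for sizes and elements).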
void StorageTinyLog::addFiles(const String & column_name, const IDataType & type)
{
    if (files.end() != files.find(column_name))
        throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog.",
            ErrorCodes::DUPLICATE_COLUMN);

    IDataType::StreamCallback stream_callback = [&] (const IDataType::SubstreamPath & substream_path)
    {
        String stream_name = IDataType::getFileNameForStream(column_name, substream_path);
        if (!files.count(stream_name))
        {
            ColumnData column_data;
            files.insert(std::make_pair(stream_name, column_data));
            files[stream_name].data_file = Poco::File(
                path + stream_name + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
        }
    };

    IDataType::SubstreamPath substream_path;
    type.enumerateStreams(stream_callback, substream_path);
}


void StorageTinyLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
    std::unique_lock<std::shared_mutex> lock(rwlock);

    /// Rename directory with data.
    String new_path = base_path + new_path_to_table_data;
    Poco::File(path).renameTo(new_path);

    path = new_path;
    table_name = new_table_name;
    database_name = new_database_name;
    file_checker.setPath(path + "sizes.json");

    for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
        it->second.data_file = Poco::File(path + Poco::Path(it->second.data_file.path()).getFileName());
}


BlockInputStreams StorageTinyLog::read(
    const Names & column_names,
    const SelectQueryInfo & /*query_info*/,
    const Context & context,
    QueryProcessingStage::Enum /*processed_stage*/,
    const size_t max_block_size,
    const unsigned /*num_streams*/)
{
    check(column_names);
    /// When reading, we lock the entire storage, because we only have one file
    /// per column and can't modify it concurrently.
    return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>(
        max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
}


BlockOutputStreamPtr StorageTinyLog::write(
    const ASTPtr & /*query*/, const Context & /*context*/)
{
    return std::make_shared<TinyLogBlockOutputStream>(*this);
}

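/// Compare the on-disk file sizes with the ones recorded in sizes.json.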
CheckResults StorageTinyLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
{
    std::shared_lock<std::shared_mutex> lock(rwlock);
    return file_checker.check();
}


void StorageTinyLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
    if (table_name.empty())
        throw Exception("Logical error: table name is empty", ErrorCodes::LOGICAL_ERROR);

    std::unique_lock<std::shared_mutex> lock(rwlock);

    auto file = Poco::File(path);
    file.remove(true);
    file.createDirectories();

    files.clear();
    file_checker = FileChecker{path + "sizes.json"};

    for (const auto & column : getColumns().getAllPhysical())
        addFiles(column.name, *column.type);
}

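/// TinyLog accepts no engine arguments, e.g.: CREATE TABLE t (x UInt64) ENGINE = TinyLog;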
void registerStorageTinyLog(StorageFactory & factory)
{
    factory.registerStorage("TinyLog", [](const StorageFactory::Arguments & args)
    {
        if (!args.engine_args.empty())
            throw Exception(
                "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        return StorageTinyLog::create(
            args.relative_data_path, args.database_name, args.table_name, args.columns, args.constraints,
            args.attach, args.context.getSettings().max_compress_block_size, args.context);
    });
}

}