1#include <sys/stat.h>
2#include <sys/types.h>
3#include <errno.h>
4
5#include <map>
6#include <optional>
7
8#include <Common/escapeForFileName.h>
9
10#include <Common/Exception.h>
11
12#include <IO/ReadBufferFromFile.h>
13#include <IO/WriteBufferFromFile.h>
14#include <Compression/CompressedReadBufferFromFile.h>
15#include <Compression/CompressedWriteBuffer.h>
16#include <IO/ReadHelpers.h>
17#include <IO/WriteHelpers.h>
18
19#include <DataStreams/IBlockInputStream.h>
20#include <DataStreams/IBlockOutputStream.h>
21#include <DataStreams/NativeBlockInputStream.h>
22#include <DataStreams/NativeBlockOutputStream.h>
23#include <DataStreams/NullBlockInputStream.h>
24
25#include <DataTypes/DataTypeFactory.h>
26
27#include <Columns/ColumnArray.h>
28
29#include <Interpreters/Context.h>
30
31#include <Storages/StorageStripeLog.h>
32#include <Storages/StorageFactory.h>
33#include <Poco/DirectoryIterator.h>
34
35
36namespace DB
37{
38
/// Buffer size used when reading and writing the "index.mrk" file.
/// A typed, namespace-scoped constant instead of a preprocessor macro.
static constexpr size_t INDEX_BUFFER_SIZE = 4096;
40
/// Error codes thrown from this file (defined elsewhere in the project).
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int INCORRECT_FILE_NAME;
    extern const int LOGICAL_ERROR;
}
49
50
51class StripeLogBlockInputStream final : public IBlockInputStream
52{
53public:
54 StripeLogBlockInputStream(StorageStripeLog & storage_, size_t max_read_buffer_size_,
55 std::shared_ptr<const IndexForNativeFormat> & index_,
56 IndexForNativeFormat::Blocks::const_iterator index_begin_,
57 IndexForNativeFormat::Blocks::const_iterator index_end_)
58 : storage(storage_), max_read_buffer_size(max_read_buffer_size_),
59 index(index_), index_begin(index_begin_), index_end(index_end_)
60 {
61 if (index_begin != index_end)
62 {
63 for (const auto & column : index_begin->columns)
64 {
65 auto type = DataTypeFactory::instance().get(column.type);
66 header.insert(ColumnWithTypeAndName{ type, column.name });
67 }
68 }
69 }
70
71 String getName() const override { return "StripeLog"; }
72
73 Block getHeader() const override
74 {
75 return header;
76 }
77
78protected:
79 Block readImpl() override
80 {
81 Block res;
82 start();
83
84 if (block_in)
85 {
86 res = block_in->read();
87
88 /// Freeing memory before destroying the object.
89 if (!res)
90 {
91 block_in.reset();
92 data_in.reset();
93 index.reset();
94 }
95 }
96
97 return res;
98 }
99
100private:
101 StorageStripeLog & storage;
102 size_t max_read_buffer_size;
103
104 std::shared_ptr<const IndexForNativeFormat> index;
105 IndexForNativeFormat::Blocks::const_iterator index_begin;
106 IndexForNativeFormat::Blocks::const_iterator index_end;
107 Block header;
108
109 /** optional - to create objects only on first reading
110 * and delete objects (release buffers) after the source is exhausted
111 * - to save RAM when using a large number of sources.
112 */
113 bool started = false;
114 std::optional<CompressedReadBufferFromFile> data_in;
115 std::optional<NativeBlockInputStream> block_in;
116
117 void start()
118 {
119 if (!started)
120 {
121 started = true;
122
123 data_in.emplace(
124 storage.fullPath() + "data.bin", 0, 0,
125 std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size), Poco::File(storage.fullPath() + "data.bin").getSize()));
126
127 block_in.emplace(*data_in, 0, index_begin, index_end);
128 }
129 }
130};
131
132
/// Appends blocks to the table's "data.bin" (compressed Native format) and writes index entries
/// for them to "index.mrk" through a second compressed buffer.
/// The table's rwlock is held exclusively (std::unique_lock) for the whole lifetime of the stream,
/// so only one writer exists at a time and read()/checkData(), which take the shared lock, wait for it.
class StripeLogBlockOutputStream final : public IBlockOutputStream
{
public:
    /// Members are initialized in declaration order: first the exclusive lock is taken, then both files
    /// are opened for appending (created if missing), then the compressing and native writers on top of them.
    explicit StripeLogBlockOutputStream(StorageStripeLog & storage_)
        : storage(storage_), lock(storage.rwlock),
        data_out_compressed(storage.fullPath() + "data.bin", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
        data_out(data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), storage.max_compress_block_size),
        index_out_compressed(storage.fullPath() + "index.mrk", INDEX_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT),
        index_out(index_out_compressed),
        /// The current size of "data.bin" is passed as the last argument — presumably the initial offset
        /// recorded in the index entries so they point past the data already in the file (TODO confirm
        /// against NativeBlockOutputStream).
        block_out(data_out, 0, storage.getSampleBlock(), false, &index_out, Poco::File(storage.fullPath() + "data.bin").getSize())
    {
    }

    /// Finalizes the write if the caller did not call writeSuffix() explicitly.
    /// Exceptions are logged rather than propagated, since a destructor must not throw.
    ~StripeLogBlockOutputStream() override
    {
        try
        {
            writeSuffix();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    Block getHeader() const override { return storage.getSampleBlock(); }

    void write(const Block & block) override
    {
        block_out.write(block);
    }

    /// Flushes all buffered data to disk and updates the file checker. Runs at most once (guarded by `done`).
    void writeSuffix() override
    {
        if (done)
            return;

        /// Flush from the outermost writer inward: native stream, then the compressing buffer
        /// (which pushes compressed bytes into the file buffer), then the file buffer itself.
        block_out.writeSuffix();
        data_out.next();
        data_out_compressed.next();
        index_out.next();
        index_out_compressed.next();

        /// Hand both file names to the checker used by checkData() — presumably it records their
        /// current sizes for later verification.
        FileChecker::Files files{ data_out_compressed.getFileName(), index_out_compressed.getFileName() };
        storage.file_checker.update(files.begin(), files.end());

        done = true;
    }

private:
    StorageStripeLog & storage;
    /// Must be declared before the buffers so the lock is acquired before any file is opened.
    std::unique_lock<std::shared_mutex> lock;

    /// Despite the name, this is the raw file buffer that receives already-compressed bytes from `data_out`.
    WriteBufferFromFile data_out_compressed;
    CompressedWriteBuffer data_out;
    /// Same layering for the index file: `index_out` compresses into this raw file buffer.
    WriteBufferFromFile index_out_compressed;
    CompressedWriteBuffer index_out;
    NativeBlockOutputStream block_out;

    /// Set once writeSuffix() has completed, making further calls (e.g. from the destructor) no-ops.
    bool done = false;
};
194
195
196StorageStripeLog::StorageStripeLog(
197 const std::string & relative_path_,
198 const std::string & database_name_,
199 const std::string & table_name_,
200 const ColumnsDescription & columns_,
201 const ConstraintsDescription & constraints_,
202 bool attach,
203 size_t max_compress_block_size_,
204 const Context & context_)
205 : base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_),
206 max_compress_block_size(max_compress_block_size_),
207 file_checker(path + "sizes.json"),
208 log(&Logger::get("StorageStripeLog"))
209{
210 setColumns(columns_);
211 setConstraints(constraints_);
212
213 if (relative_path_.empty())
214 throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
215
216 if (!attach)
217 Poco::File(path).createDirectories();
218}
219
220
221void StorageStripeLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
222{
223 std::unique_lock<std::shared_mutex> lock(rwlock);
224
225 /// Rename directory with data.
226 String new_path = base_path + new_path_to_table_data;
227 Poco::File(path).renameTo(new_path);
228
229 path = new_path;
230 table_name = new_table_name;
231 database_name = new_database_name;
232 file_checker.setPath(path + "sizes.json");
233}
234
235
236BlockInputStreams StorageStripeLog::read(
237 const Names & column_names,
238 const SelectQueryInfo & /*query_info*/,
239 const Context & context,
240 QueryProcessingStage::Enum /*processed_stage*/,
241 const size_t /*max_block_size*/,
242 unsigned num_streams)
243{
244 std::shared_lock<std::shared_mutex> lock(rwlock);
245
246 check(column_names);
247
248 NameSet column_names_set(column_names.begin(), column_names.end());
249
250 if (!Poco::File(fullPath() + "index.mrk").exists())
251 return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
252
253 CompressedReadBufferFromFile index_in(fullPath() + "index.mrk", 0, 0, INDEX_BUFFER_SIZE);
254 std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)};
255
256 BlockInputStreams res;
257
258 size_t size = index->blocks.size();
259 if (num_streams > size)
260 num_streams = size;
261
262 for (size_t stream = 0; stream < num_streams; ++stream)
263 {
264 IndexForNativeFormat::Blocks::const_iterator begin = index->blocks.begin();
265 IndexForNativeFormat::Blocks::const_iterator end = index->blocks.begin();
266
267 std::advance(begin, stream * size / num_streams);
268 std::advance(end, (stream + 1) * size / num_streams);
269
270 res.emplace_back(std::make_shared<StripeLogBlockInputStream>(
271 *this, context.getSettingsRef().max_read_buffer_size, index, begin, end));
272 }
273
274 /// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change.
275
276 return res;
277}
278
279
280BlockOutputStreamPtr StorageStripeLog::write(
281 const ASTPtr & /*query*/, const Context & /*context*/)
282{
283 return std::make_shared<StripeLogBlockOutputStream>(*this);
284}
285
286
287CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & /* context */)
288{
289 std::shared_lock<std::shared_mutex> lock(rwlock);
290 return file_checker.check();
291}
292
293void StorageStripeLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
294{
295 if (table_name.empty())
296 throw Exception("Logical error: table name is empty", ErrorCodes::LOGICAL_ERROR);
297
298 std::shared_lock<std::shared_mutex> lock(rwlock);
299
300 auto file = Poco::File(path);
301 file.remove(true);
302 file.createDirectories();
303
304 file_checker = FileChecker{path + "sizes.json"};
305}
306
307
308void registerStorageStripeLog(StorageFactory & factory)
309{
310 factory.registerStorage("StripeLog", [](const StorageFactory::Arguments & args)
311 {
312 if (!args.engine_args.empty())
313 throw Exception(
314 "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
315 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
316
317 return StorageStripeLog::create(
318 args.relative_data_path, args.database_name, args.table_name, args.columns, args.constraints,
319 args.attach, args.context.getSettings().max_compress_block_size, args.context);
320 });
321}
322
323}
324