1 | #include <sys/stat.h> |
2 | #include <sys/types.h> |
3 | #include <errno.h> |
4 | |
5 | #include <map> |
6 | #include <optional> |
7 | |
8 | #include <Common/escapeForFileName.h> |
9 | |
10 | #include <Common/Exception.h> |
11 | |
12 | #include <IO/ReadBufferFromFile.h> |
13 | #include <IO/WriteBufferFromFile.h> |
14 | #include <Compression/CompressedReadBufferFromFile.h> |
15 | #include <Compression/CompressedWriteBuffer.h> |
16 | #include <IO/ReadHelpers.h> |
17 | #include <IO/WriteHelpers.h> |
18 | |
19 | #include <DataStreams/IBlockInputStream.h> |
20 | #include <DataStreams/IBlockOutputStream.h> |
21 | #include <DataStreams/NativeBlockInputStream.h> |
22 | #include <DataStreams/NativeBlockOutputStream.h> |
23 | #include <DataStreams/NullBlockInputStream.h> |
24 | |
25 | #include <DataTypes/DataTypeFactory.h> |
26 | |
27 | #include <Columns/ColumnArray.h> |
28 | |
29 | #include <Interpreters/Context.h> |
30 | |
31 | #include <Storages/StorageStripeLog.h> |
32 | #include <Storages/StorageFactory.h> |
33 | #include <Poco/DirectoryIterator.h> |
34 | |
35 | |
36 | namespace DB |
37 | { |
38 | |
/// Buffer size used both for writing the compressed index file (index.mrk) in
/// StripeLogBlockOutputStream and for reading it back in StorageStripeLog::read.
#define INDEX_BUFFER_SIZE 4096

/// Only the error codes actually thrown in this file are declared here.
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int INCORRECT_FILE_NAME;
    extern const int LOGICAL_ERROR;
}
49 | |
50 | |
51 | class StripeLogBlockInputStream final : public IBlockInputStream |
52 | { |
53 | public: |
54 | StripeLogBlockInputStream(StorageStripeLog & storage_, size_t max_read_buffer_size_, |
55 | std::shared_ptr<const IndexForNativeFormat> & index_, |
56 | IndexForNativeFormat::Blocks::const_iterator index_begin_, |
57 | IndexForNativeFormat::Blocks::const_iterator index_end_) |
58 | : storage(storage_), max_read_buffer_size(max_read_buffer_size_), |
59 | index(index_), index_begin(index_begin_), index_end(index_end_) |
60 | { |
61 | if (index_begin != index_end) |
62 | { |
63 | for (const auto & column : index_begin->columns) |
64 | { |
65 | auto type = DataTypeFactory::instance().get(column.type); |
66 | header.insert(ColumnWithTypeAndName{ type, column.name }); |
67 | } |
68 | } |
69 | } |
70 | |
71 | String getName() const override { return "StripeLog" ; } |
72 | |
73 | Block () const override |
74 | { |
75 | return header; |
76 | } |
77 | |
78 | protected: |
79 | Block readImpl() override |
80 | { |
81 | Block res; |
82 | start(); |
83 | |
84 | if (block_in) |
85 | { |
86 | res = block_in->read(); |
87 | |
88 | /// Freeing memory before destroying the object. |
89 | if (!res) |
90 | { |
91 | block_in.reset(); |
92 | data_in.reset(); |
93 | index.reset(); |
94 | } |
95 | } |
96 | |
97 | return res; |
98 | } |
99 | |
100 | private: |
101 | StorageStripeLog & storage; |
102 | size_t max_read_buffer_size; |
103 | |
104 | std::shared_ptr<const IndexForNativeFormat> index; |
105 | IndexForNativeFormat::Blocks::const_iterator index_begin; |
106 | IndexForNativeFormat::Blocks::const_iterator index_end; |
107 | Block ; |
108 | |
109 | /** optional - to create objects only on first reading |
110 | * and delete objects (release buffers) after the source is exhausted |
111 | * - to save RAM when using a large number of sources. |
112 | */ |
113 | bool started = false; |
114 | std::optional<CompressedReadBufferFromFile> data_in; |
115 | std::optional<NativeBlockInputStream> block_in; |
116 | |
117 | void start() |
118 | { |
119 | if (!started) |
120 | { |
121 | started = true; |
122 | |
123 | data_in.emplace( |
124 | storage.fullPath() + "data.bin" , 0, 0, |
125 | std::min(static_cast<Poco::File::FileSize>(max_read_buffer_size), Poco::File(storage.fullPath() + "data.bin" ).getSize())); |
126 | |
127 | block_in.emplace(*data_in, 0, index_begin, index_end); |
128 | } |
129 | } |
130 | }; |
131 | |
132 | |
133 | class StripeLogBlockOutputStream final : public IBlockOutputStream |
134 | { |
135 | public: |
136 | explicit StripeLogBlockOutputStream(StorageStripeLog & storage_) |
137 | : storage(storage_), lock(storage.rwlock), |
138 | data_out_compressed(storage.fullPath() + "data.bin" , DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT), |
139 | data_out(data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), storage.max_compress_block_size), |
140 | index_out_compressed(storage.fullPath() + "index.mrk" , INDEX_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT), |
141 | index_out(index_out_compressed), |
142 | block_out(data_out, 0, storage.getSampleBlock(), false, &index_out, Poco::File(storage.fullPath() + "data.bin" ).getSize()) |
143 | { |
144 | } |
145 | |
146 | ~StripeLogBlockOutputStream() override |
147 | { |
148 | try |
149 | { |
150 | writeSuffix(); |
151 | } |
152 | catch (...) |
153 | { |
154 | tryLogCurrentException(__PRETTY_FUNCTION__); |
155 | } |
156 | } |
157 | |
158 | Block () const override { return storage.getSampleBlock(); } |
159 | |
160 | void write(const Block & block) override |
161 | { |
162 | block_out.write(block); |
163 | } |
164 | |
165 | void writeSuffix() override |
166 | { |
167 | if (done) |
168 | return; |
169 | |
170 | block_out.writeSuffix(); |
171 | data_out.next(); |
172 | data_out_compressed.next(); |
173 | index_out.next(); |
174 | index_out_compressed.next(); |
175 | |
176 | FileChecker::Files files{ data_out_compressed.getFileName(), index_out_compressed.getFileName() }; |
177 | storage.file_checker.update(files.begin(), files.end()); |
178 | |
179 | done = true; |
180 | } |
181 | |
182 | private: |
183 | StorageStripeLog & storage; |
184 | std::unique_lock<std::shared_mutex> lock; |
185 | |
186 | WriteBufferFromFile data_out_compressed; |
187 | CompressedWriteBuffer data_out; |
188 | WriteBufferFromFile index_out_compressed; |
189 | CompressedWriteBuffer index_out; |
190 | NativeBlockOutputStream block_out; |
191 | |
192 | bool done = false; |
193 | }; |
194 | |
195 | |
196 | StorageStripeLog::StorageStripeLog( |
197 | const std::string & relative_path_, |
198 | const std::string & database_name_, |
199 | const std::string & table_name_, |
200 | const ColumnsDescription & columns_, |
201 | const ConstraintsDescription & constraints_, |
202 | bool attach, |
203 | size_t max_compress_block_size_, |
204 | const Context & context_) |
205 | : base_path(context_.getPath()), path(base_path + relative_path_), table_name(table_name_), database_name(database_name_), |
206 | max_compress_block_size(max_compress_block_size_), |
207 | file_checker(path + "sizes.json" ), |
208 | log(&Logger::get("StorageStripeLog" )) |
209 | { |
210 | setColumns(columns_); |
211 | setConstraints(constraints_); |
212 | |
213 | if (relative_path_.empty()) |
214 | throw Exception("Storage " + getName() + " requires data path" , ErrorCodes::INCORRECT_FILE_NAME); |
215 | |
216 | if (!attach) |
217 | Poco::File(path).createDirectories(); |
218 | } |
219 | |
220 | |
221 | void StorageStripeLog::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) |
222 | { |
223 | std::unique_lock<std::shared_mutex> lock(rwlock); |
224 | |
225 | /// Rename directory with data. |
226 | String new_path = base_path + new_path_to_table_data; |
227 | Poco::File(path).renameTo(new_path); |
228 | |
229 | path = new_path; |
230 | table_name = new_table_name; |
231 | database_name = new_database_name; |
232 | file_checker.setPath(path + "sizes.json" ); |
233 | } |
234 | |
235 | |
236 | BlockInputStreams StorageStripeLog::read( |
237 | const Names & column_names, |
238 | const SelectQueryInfo & /*query_info*/, |
239 | const Context & context, |
240 | QueryProcessingStage::Enum /*processed_stage*/, |
241 | const size_t /*max_block_size*/, |
242 | unsigned num_streams) |
243 | { |
244 | std::shared_lock<std::shared_mutex> lock(rwlock); |
245 | |
246 | check(column_names); |
247 | |
248 | NameSet column_names_set(column_names.begin(), column_names.end()); |
249 | |
250 | if (!Poco::File(fullPath() + "index.mrk" ).exists()) |
251 | return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) }; |
252 | |
253 | CompressedReadBufferFromFile index_in(fullPath() + "index.mrk" , 0, 0, INDEX_BUFFER_SIZE); |
254 | std::shared_ptr<const IndexForNativeFormat> index{std::make_shared<IndexForNativeFormat>(index_in, column_names_set)}; |
255 | |
256 | BlockInputStreams res; |
257 | |
258 | size_t size = index->blocks.size(); |
259 | if (num_streams > size) |
260 | num_streams = size; |
261 | |
262 | for (size_t stream = 0; stream < num_streams; ++stream) |
263 | { |
264 | IndexForNativeFormat::Blocks::const_iterator begin = index->blocks.begin(); |
265 | IndexForNativeFormat::Blocks::const_iterator end = index->blocks.begin(); |
266 | |
267 | std::advance(begin, stream * size / num_streams); |
268 | std::advance(end, (stream + 1) * size / num_streams); |
269 | |
270 | res.emplace_back(std::make_shared<StripeLogBlockInputStream>( |
271 | *this, context.getSettingsRef().max_read_buffer_size, index, begin, end)); |
272 | } |
273 | |
274 | /// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change. |
275 | |
276 | return res; |
277 | } |
278 | |
279 | |
280 | BlockOutputStreamPtr StorageStripeLog::write( |
281 | const ASTPtr & /*query*/, const Context & /*context*/) |
282 | { |
283 | return std::make_shared<StripeLogBlockOutputStream>(*this); |
284 | } |
285 | |
286 | |
287 | CheckResults StorageStripeLog::checkData(const ASTPtr & /* query */, const Context & /* context */) |
288 | { |
289 | std::shared_lock<std::shared_mutex> lock(rwlock); |
290 | return file_checker.check(); |
291 | } |
292 | |
293 | void StorageStripeLog::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) |
294 | { |
295 | if (table_name.empty()) |
296 | throw Exception("Logical error: table name is empty" , ErrorCodes::LOGICAL_ERROR); |
297 | |
298 | std::shared_lock<std::shared_mutex> lock(rwlock); |
299 | |
300 | auto file = Poco::File(path); |
301 | file.remove(true); |
302 | file.createDirectories(); |
303 | |
304 | file_checker = FileChecker{path + "sizes.json" }; |
305 | } |
306 | |
307 | |
308 | void registerStorageStripeLog(StorageFactory & factory) |
309 | { |
310 | factory.registerStorage("StripeLog" , [](const StorageFactory::Arguments & args) |
311 | { |
312 | if (!args.engine_args.empty()) |
313 | throw Exception( |
314 | "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)" , |
315 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
316 | |
317 | return StorageStripeLog::create( |
318 | args.relative_data_path, args.database_name, args.table_name, args.columns, args.constraints, |
319 | args.attach, args.context.getSettings().max_compress_block_size, args.context); |
320 | }); |
321 | } |
322 | |
323 | } |
324 | |