1 | #include <Storages/StorageSet.h> |
2 | #include <Storages/StorageFactory.h> |
3 | #include <IO/ReadBufferFromFile.h> |
4 | #include <Compression/CompressedReadBuffer.h> |
5 | #include <IO/WriteBufferFromFile.h> |
6 | #include <Compression/CompressedWriteBuffer.h> |
7 | #include <DataStreams/NativeBlockOutputStream.h> |
8 | #include <DataStreams/NativeBlockInputStream.h> |
9 | #include <Common/escapeForFileName.h> |
10 | #include <Common/StringUtils/StringUtils.h> |
11 | #include <Interpreters/Set.h> |
12 | #include <Poco/DirectoryIterator.h> |
13 | |
14 | |
15 | namespace DB |
16 | { |
17 | |
/// Error codes used in this file; the constants themselves are defined elsewhere.
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int INCORRECT_FILE_NAME;
}
28 | |
29 | |
30 | class SetOrJoinBlockOutputStream : public IBlockOutputStream |
31 | { |
32 | public: |
33 | SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_, |
34 | const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_); |
35 | |
36 | Block () const override { return table.getSampleBlock(); } |
37 | void write(const Block & block) override; |
38 | void writeSuffix() override; |
39 | |
40 | private: |
41 | StorageSetOrJoinBase & table; |
42 | String backup_path; |
43 | String backup_tmp_path; |
44 | String backup_file_name; |
45 | WriteBufferFromFile backup_buf; |
46 | CompressedWriteBuffer compressed_backup_buf; |
47 | NativeBlockOutputStream backup_stream; |
48 | }; |
49 | |
50 | |
51 | SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_, |
52 | const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_) |
53 | : table(table_), |
54 | backup_path(backup_path_), backup_tmp_path(backup_tmp_path_), |
55 | backup_file_name(backup_file_name_), |
56 | backup_buf(backup_tmp_path + backup_file_name), |
57 | compressed_backup_buf(backup_buf), |
58 | backup_stream(compressed_backup_buf, 0, table.getSampleBlock()) |
59 | { |
60 | } |
61 | |
62 | void SetOrJoinBlockOutputStream::write(const Block & block) |
63 | { |
64 | /// Sort columns in the block. This is necessary, since Set and Join count on the same column order in different blocks. |
65 | Block sorted_block = block.sortColumns(); |
66 | |
67 | table.insertBlock(sorted_block); |
68 | backup_stream.write(sorted_block); |
69 | } |
70 | |
71 | void SetOrJoinBlockOutputStream::writeSuffix() |
72 | { |
73 | table.finishInsert(); |
74 | backup_stream.flush(); |
75 | compressed_backup_buf.next(); |
76 | backup_buf.next(); |
77 | |
78 | Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name); |
79 | } |
80 | |
81 | |
82 | |
83 | BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const Context & /*context*/) |
84 | { |
85 | UInt64 id = ++increment; |
86 | return std::make_shared<SetOrJoinBlockOutputStream>(*this, path, path + "tmp/" , toString(id) + ".bin" ); |
87 | } |
88 | |
89 | |
90 | StorageSetOrJoinBase::StorageSetOrJoinBase( |
91 | const String & relative_path_, |
92 | const String & database_name_, |
93 | const String & table_name_, |
94 | const ColumnsDescription & columns_, |
95 | const ConstraintsDescription & constraints_, |
96 | const Context & context_) |
97 | : table_name(table_name_), database_name(database_name_) |
98 | { |
99 | setColumns(columns_); |
100 | setConstraints(constraints_); |
101 | |
102 | if (relative_path_.empty()) |
103 | throw Exception("Join and Set storages require data path" , ErrorCodes::INCORRECT_FILE_NAME); |
104 | |
105 | base_path = context_.getPath(); |
106 | path = base_path + relative_path_; |
107 | } |
108 | |
109 | |
110 | |
111 | StorageSet::StorageSet( |
112 | const String & relative_path_, |
113 | const String & database_name_, |
114 | const String & table_name_, |
115 | const ColumnsDescription & columns_, |
116 | const ConstraintsDescription & constraints_, |
117 | const Context & context_) |
118 | : StorageSetOrJoinBase{relative_path_, database_name_, table_name_, columns_, constraints_, context_}, |
119 | set(std::make_shared<Set>(SizeLimits(), false)) |
120 | { |
121 | Block = getSampleBlock(); |
122 | header = header.sortColumns(); |
123 | set->setHeader(header); |
124 | |
125 | restore(); |
126 | } |
127 | |
128 | |
129 | void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block); } |
130 | void StorageSet::finishInsert() { set->finishInsert(); } |
131 | size_t StorageSet::getSize() const { return set->getTotalRowCount(); } |
132 | |
133 | |
134 | void StorageSet::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) |
135 | { |
136 | Poco::File(path).remove(true); |
137 | Poco::File(path).createDirectories(); |
138 | Poco::File(path + "tmp/" ).createDirectories(); |
139 | |
140 | Block = getSampleBlock(); |
141 | header = header.sortColumns(); |
142 | |
143 | increment = 0; |
144 | set = std::make_shared<Set>(SizeLimits(), false); |
145 | set->setHeader(header); |
146 | } |
147 | |
148 | |
149 | void StorageSetOrJoinBase::restore() |
150 | { |
151 | Poco::File tmp_dir(path + "tmp/" ); |
152 | if (!tmp_dir.exists()) |
153 | { |
154 | tmp_dir.createDirectories(); |
155 | return; |
156 | } |
157 | |
158 | static const auto file_suffix = ".bin" ; |
159 | static const auto file_suffix_size = strlen(".bin" ); |
160 | |
161 | Poco::DirectoryIterator dir_end; |
162 | for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it) |
163 | { |
164 | const auto & name = dir_it.name(); |
165 | |
166 | if (dir_it->isFile() |
167 | && endsWith(name, file_suffix) |
168 | && dir_it->getSize() > 0) |
169 | { |
170 | /// Calculate the maximum number of available files with a backup to add the following files with large numbers. |
171 | UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size)); |
172 | if (file_num > increment) |
173 | increment = file_num; |
174 | |
175 | restoreFromFile(dir_it->path()); |
176 | } |
177 | } |
178 | } |
179 | |
180 | |
181 | void StorageSetOrJoinBase::restoreFromFile(const String & file_path) |
182 | { |
183 | ReadBufferFromFile backup_buf(file_path); |
184 | CompressedReadBuffer compressed_backup_buf(backup_buf); |
185 | NativeBlockInputStream backup_stream(compressed_backup_buf, 0); |
186 | |
187 | backup_stream.readPrefix(); |
188 | |
189 | while (Block block = backup_stream.read()) |
190 | insertBlock(block); |
191 | |
192 | finishInsert(); |
193 | backup_stream.readSuffix(); |
194 | |
195 | /// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project. |
196 | LOG_INFO(&Logger::get("StorageSetOrJoinBase" ), std::fixed << std::setprecision(2) |
197 | << "Loaded from backup file " << file_path << ". " |
198 | << backup_stream.getProfileInfo().rows << " rows, " |
199 | << backup_stream.getProfileInfo().bytes / 1048576.0 << " MiB. " |
200 | << "State has " << getSize() << " unique rows." ); |
201 | } |
202 | |
203 | |
204 | void StorageSetOrJoinBase::rename( |
205 | const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) |
206 | { |
207 | /// Rename directory with data. |
208 | String new_path = base_path + new_path_to_table_data; |
209 | Poco::File(path).renameTo(new_path); |
210 | |
211 | path = new_path; |
212 | table_name = new_table_name; |
213 | database_name = new_database_name; |
214 | } |
215 | |
216 | |
217 | void registerStorageSet(StorageFactory & factory) |
218 | { |
219 | factory.registerStorage("Set" , [](const StorageFactory::Arguments & args) |
220 | { |
221 | if (!args.engine_args.empty()) |
222 | throw Exception( |
223 | "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)" , |
224 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
225 | |
226 | return StorageSet::create(args.relative_data_path, args.database_name, args.table_name, args.columns, args.constraints, args.context); |
227 | }); |
228 | } |
229 | |
230 | |
231 | } |
232 | |