1#include <Storages/StorageSet.h>
2#include <Storages/StorageFactory.h>
3#include <IO/ReadBufferFromFile.h>
4#include <Compression/CompressedReadBuffer.h>
5#include <IO/WriteBufferFromFile.h>
6#include <Compression/CompressedWriteBuffer.h>
7#include <DataStreams/NativeBlockOutputStream.h>
8#include <DataStreams/NativeBlockInputStream.h>
9#include <Common/escapeForFileName.h>
10#include <Common/StringUtils/StringUtils.h>
11#include <Interpreters/Set.h>
12#include <Poco/DirectoryIterator.h>
13
14
15namespace DB
16{
17
/// Error codes used by this translation unit; they are only declared here (extern) and defined elsewhere.
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int INCORRECT_FILE_NAME;
}
28
29
30class SetOrJoinBlockOutputStream : public IBlockOutputStream
31{
32public:
33 SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_,
34 const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_);
35
36 Block getHeader() const override { return table.getSampleBlock(); }
37 void write(const Block & block) override;
38 void writeSuffix() override;
39
40private:
41 StorageSetOrJoinBase & table;
42 String backup_path;
43 String backup_tmp_path;
44 String backup_file_name;
45 WriteBufferFromFile backup_buf;
46 CompressedWriteBuffer compressed_backup_buf;
47 NativeBlockOutputStream backup_stream;
48};
49
50
51SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_,
52 const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_)
53 : table(table_),
54 backup_path(backup_path_), backup_tmp_path(backup_tmp_path_),
55 backup_file_name(backup_file_name_),
56 backup_buf(backup_tmp_path + backup_file_name),
57 compressed_backup_buf(backup_buf),
58 backup_stream(compressed_backup_buf, 0, table.getSampleBlock())
59{
60}
61
62void SetOrJoinBlockOutputStream::write(const Block & block)
63{
64 /// Sort columns in the block. This is necessary, since Set and Join count on the same column order in different blocks.
65 Block sorted_block = block.sortColumns();
66
67 table.insertBlock(sorted_block);
68 backup_stream.write(sorted_block);
69}
70
71void SetOrJoinBlockOutputStream::writeSuffix()
72{
73 table.finishInsert();
74 backup_stream.flush();
75 compressed_backup_buf.next();
76 backup_buf.next();
77
78 Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
79}
80
81
82
83BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const Context & /*context*/)
84{
85 UInt64 id = ++increment;
86 return std::make_shared<SetOrJoinBlockOutputStream>(*this, path, path + "tmp/", toString(id) + ".bin");
87}
88
89
90StorageSetOrJoinBase::StorageSetOrJoinBase(
91 const String & relative_path_,
92 const String & database_name_,
93 const String & table_name_,
94 const ColumnsDescription & columns_,
95 const ConstraintsDescription & constraints_,
96 const Context & context_)
97 : table_name(table_name_), database_name(database_name_)
98{
99 setColumns(columns_);
100 setConstraints(constraints_);
101
102 if (relative_path_.empty())
103 throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
104
105 base_path = context_.getPath();
106 path = base_path + relative_path_;
107}
108
109
110
111StorageSet::StorageSet(
112 const String & relative_path_,
113 const String & database_name_,
114 const String & table_name_,
115 const ColumnsDescription & columns_,
116 const ConstraintsDescription & constraints_,
117 const Context & context_)
118 : StorageSetOrJoinBase{relative_path_, database_name_, table_name_, columns_, constraints_, context_},
119 set(std::make_shared<Set>(SizeLimits(), false))
120{
121 Block header = getSampleBlock();
122 header = header.sortColumns();
123 set->setHeader(header);
124
125 restore();
126}
127
128
129void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block); }
130void StorageSet::finishInsert() { set->finishInsert(); }
131size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
132
133
134void StorageSet::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
135{
136 Poco::File(path).remove(true);
137 Poco::File(path).createDirectories();
138 Poco::File(path + "tmp/").createDirectories();
139
140 Block header = getSampleBlock();
141 header = header.sortColumns();
142
143 increment = 0;
144 set = std::make_shared<Set>(SizeLimits(), false);
145 set->setHeader(header);
146}
147
148
149void StorageSetOrJoinBase::restore()
150{
151 Poco::File tmp_dir(path + "tmp/");
152 if (!tmp_dir.exists())
153 {
154 tmp_dir.createDirectories();
155 return;
156 }
157
158 static const auto file_suffix = ".bin";
159 static const auto file_suffix_size = strlen(".bin");
160
161 Poco::DirectoryIterator dir_end;
162 for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
163 {
164 const auto & name = dir_it.name();
165
166 if (dir_it->isFile()
167 && endsWith(name, file_suffix)
168 && dir_it->getSize() > 0)
169 {
170 /// Calculate the maximum number of available files with a backup to add the following files with large numbers.
171 UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size));
172 if (file_num > increment)
173 increment = file_num;
174
175 restoreFromFile(dir_it->path());
176 }
177 }
178}
179
180
181void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
182{
183 ReadBufferFromFile backup_buf(file_path);
184 CompressedReadBuffer compressed_backup_buf(backup_buf);
185 NativeBlockInputStream backup_stream(compressed_backup_buf, 0);
186
187 backup_stream.readPrefix();
188
189 while (Block block = backup_stream.read())
190 insertBlock(block);
191
192 finishInsert();
193 backup_stream.readSuffix();
194
195 /// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.
196 LOG_INFO(&Logger::get("StorageSetOrJoinBase"), std::fixed << std::setprecision(2)
197 << "Loaded from backup file " << file_path << ". "
198 << backup_stream.getProfileInfo().rows << " rows, "
199 << backup_stream.getProfileInfo().bytes / 1048576.0 << " MiB. "
200 << "State has " << getSize() << " unique rows.");
201}
202
203
204void StorageSetOrJoinBase::rename(
205 const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
206{
207 /// Rename directory with data.
208 String new_path = base_path + new_path_to_table_data;
209 Poco::File(path).renameTo(new_path);
210
211 path = new_path;
212 table_name = new_table_name;
213 database_name = new_database_name;
214}
215
216
217void registerStorageSet(StorageFactory & factory)
218{
219 factory.registerStorage("Set", [](const StorageFactory::Arguments & args)
220 {
221 if (!args.engine_args.empty())
222 throw Exception(
223 "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
224 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
225
226 return StorageSet::create(args.relative_data_path, args.database_name, args.table_name, args.columns, args.constraints, args.context);
227 });
228}
229
230
231}
232