1#include <Core/Settings.h>
2#include <Databases/DatabaseLazy.h>
3#include <Databases/DatabaseOnDisk.h>
4#include <Databases/DatabasesCommon.h>
5#include <Interpreters/Context.h>
6#include <IO/ReadHelpers.h>
7#include <IO/WriteBufferFromFile.h>
8#include <IO/WriteHelpers.h>
9#include <Parsers/ASTCreateQuery.h>
10#include <Storages/IStorage.h>
11
12#include <common/logger_useful.h>
13#include <ext/scope_guard.h>
14#include <iomanip>
15#include <Poco/File.h>
16
17
18namespace DB
19{
20
21namespace ErrorCodes
22{
23 extern const int TABLE_ALREADY_EXISTS;
24 extern const int UNKNOWN_TABLE;
25 extern const int UNSUPPORTED_METHOD;
26 extern const int CANNOT_CREATE_TABLE_FROM_METADATA;
27 extern const int LOGICAL_ERROR;
28}
29
30
31
32DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_, time_t expiration_time_, const Context & context_)
33 : DatabaseOnDisk(name_, metadata_path_, "DatabaseLazy (" + name_ + ")")
34 , expiration_time(expiration_time_)
35{
36 Poco::File(context_.getPath() + getDataPath()).createDirectories();
37}
38
39
40void DatabaseLazy::loadStoredObjects(
41 Context & context,
42 bool /* has_force_restore_data_flag */)
43{
44 iterateMetadataFiles(context, [this](const String & file_name)
45 {
46 const std::string table_name = file_name.substr(0, file_name.size() - 4);
47 attachTable(table_name, nullptr);
48 });
49}
50
51
52void DatabaseLazy::createTable(
53 const Context & context,
54 const String & table_name,
55 const StoragePtr & table,
56 const ASTPtr & query)
57{
58 SCOPE_EXIT({ clearExpiredTables(); });
59 if (!endsWith(table->getName(), "Log"))
60 throw Exception("Lazy engine can be used only with *Log tables.", ErrorCodes::UNSUPPORTED_METHOD);
61 DatabaseOnDisk::createTable(context, table_name, table, query);
62
63 /// DatabaseOnDisk::createTable renames file, so we need to get new metadata_modification_time.
64 std::lock_guard lock(mutex);
65 auto it = tables_cache.find(table_name);
66 if (it != tables_cache.end())
67 it->second.metadata_modification_time = DatabaseOnDisk::getObjectMetadataModificationTime(table_name);
68}
69
70void DatabaseLazy::removeTable(
71 const Context & context,
72 const String & table_name)
73{
74 SCOPE_EXIT({ clearExpiredTables(); });
75 DatabaseOnDisk::removeTable(context, table_name);
76}
77
78void DatabaseLazy::renameTable(
79 const Context & context,
80 const String & table_name,
81 IDatabase & to_database,
82 const String & to_table_name,
83 TableStructureWriteLockHolder & lock)
84{
85 SCOPE_EXIT({ clearExpiredTables(); });
86 DatabaseOnDisk::renameTable(context, table_name, to_database, to_table_name, lock);
87}
88
89
90time_t DatabaseLazy::getObjectMetadataModificationTime(const String & table_name) const
91{
92 std::lock_guard lock(mutex);
93 auto it = tables_cache.find(table_name);
94 if (it != tables_cache.end())
95 return it->second.metadata_modification_time;
96 throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
97}
98
99void DatabaseLazy::alterTable(
100 const Context & /* context */,
101 const String & /* table_name */,
102 const StorageInMemoryMetadata & /* metadata */)
103{
104 clearExpiredTables();
105 throw Exception("ALTER query is not supported for Lazy database.", ErrorCodes::UNSUPPORTED_METHOD);
106}
107
108bool DatabaseLazy::isTableExist(
109 const Context & /* context */,
110 const String & table_name) const
111{
112 SCOPE_EXIT({ clearExpiredTables(); });
113 std::lock_guard lock(mutex);
114 return tables_cache.find(table_name) != tables_cache.end();
115}
116
117StoragePtr DatabaseLazy::tryGetTable(
118 const Context & context,
119 const String & table_name) const
120{
121 SCOPE_EXIT({ clearExpiredTables(); });
122 {
123 std::lock_guard lock(mutex);
124 auto it = tables_cache.find(table_name);
125 if (it == tables_cache.end())
126 throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
127
128 if (it->second.table)
129 {
130 cache_expiration_queue.erase(it->second.expiration_iterator);
131 it->second.last_touched = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
132 it->second.expiration_iterator = cache_expiration_queue.emplace(cache_expiration_queue.end(), it->second.last_touched, table_name);
133
134 return it->second.table;
135 }
136 }
137
138 return loadTable(context, table_name);
139}
140
141DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name)
142{
143 std::lock_guard lock(mutex);
144 Strings filtered_tables;
145 for (const auto & [table_name, cached_table] : tables_cache)
146 {
147 if (!filter_by_table_name || filter_by_table_name(table_name))
148 filtered_tables.push_back(table_name);
149 }
150 std::sort(filtered_tables.begin(), filtered_tables.end());
151 return std::make_unique<DatabaseLazyIterator>(*this, context, std::move(filtered_tables));
152}
153
154bool DatabaseLazy::empty(const Context & /* context */) const
155{
156 return tables_cache.empty();
157}
158
159void DatabaseLazy::attachTable(const String & table_name, const StoragePtr & table)
160{
161 LOG_DEBUG(log, "Attach table " << backQuote(table_name) << ".");
162 std::lock_guard lock(mutex);
163 time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
164
165 auto [it, inserted] = tables_cache.emplace(std::piecewise_construct,
166 std::forward_as_tuple(table_name),
167 std::forward_as_tuple(table, current_time, DatabaseOnDisk::getObjectMetadataModificationTime(table_name)));
168 if (!inserted)
169 throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
170
171 it->second.expiration_iterator = cache_expiration_queue.emplace(cache_expiration_queue.end(), current_time, table_name);
172}
173
174StoragePtr DatabaseLazy::detachTable(const String & table_name)
175{
176 StoragePtr res;
177 {
178 LOG_DEBUG(log, "Detach table " << backQuote(table_name) << ".");
179 std::lock_guard lock(mutex);
180 auto it = tables_cache.find(table_name);
181 if (it == tables_cache.end())
182 throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
183 res = it->second.table;
184 if (it->second.expiration_iterator != cache_expiration_queue.end())
185 cache_expiration_queue.erase(it->second.expiration_iterator);
186 tables_cache.erase(it);
187 }
188 return res;
189}
190
191void DatabaseLazy::shutdown()
192{
193 TablesCache tables_snapshot;
194 {
195 std::lock_guard lock(mutex);
196 tables_snapshot = tables_cache;
197 }
198
199 for (const auto & kv : tables_snapshot)
200 {
201 if (kv.second.table)
202 kv.second.table->shutdown();
203 }
204
205 std::lock_guard lock(mutex);
206 tables_cache.clear();
207}
208
209DatabaseLazy::~DatabaseLazy()
210{
211 try
212 {
213 shutdown();
214 }
215 catch (...)
216 {
217 tryLogCurrentException(__PRETTY_FUNCTION__);
218 }
219}
220
221StoragePtr DatabaseLazy::loadTable(const Context & context, const String & table_name) const
222{
223 SCOPE_EXIT({ clearExpiredTables(); });
224
225 LOG_DEBUG(log, "Load table " << backQuote(table_name) << " to cache.");
226
227 const String table_metadata_path = getMetadataPath() + "/" + escapeForFileName(table_name) + ".sql";
228
229 try
230 {
231 StoragePtr table;
232 Context context_copy(context); /// some tables can change context, but not LogTables
233
234 auto ast = parseQueryFromMetadata(table_metadata_path, /*throw_on_error*/ true, /*remove_empty*/false);
235 if (ast)
236 {
237 auto & ast_create = ast->as<const ASTCreateQuery &>();
238 String table_data_path_relative = getTableDataPath(ast_create);
239 table = createTableFromAST(ast_create, database_name, table_data_path_relative, context_copy, false).second;
240 }
241
242 if (!ast || !endsWith(table->getName(), "Log"))
243 throw Exception("Only *Log tables can be used with Lazy database engine.", ErrorCodes::LOGICAL_ERROR);
244 {
245 std::lock_guard lock(mutex);
246 auto it = tables_cache.find(table_name);
247 if (it == tables_cache.end())
248 throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
249
250 if (it->second.expiration_iterator != cache_expiration_queue.end())
251 cache_expiration_queue.erase(it->second.expiration_iterator);
252 it->second.last_touched = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
253 it->second.expiration_iterator = cache_expiration_queue.emplace(cache_expiration_queue.end(), it->second.last_touched, table_name);
254
255 return it->second.table = table;
256 }
257 }
258 catch (const Exception & e)
259 {
260 throw Exception("Cannot create table from metadata file " + table_metadata_path + ". Error: " + DB::getCurrentExceptionMessage(true),
261 e, DB::ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA);
262 }
263}
264
265void DatabaseLazy::clearExpiredTables() const
266{
267 std::lock_guard lock(mutex);
268 auto time_now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
269
270 CacheExpirationQueue expired_tables;
271 auto expired_it = cache_expiration_queue.begin();
272 while (expired_it != cache_expiration_queue.end() && (time_now - expired_it->last_touched) >= expiration_time)
273 ++expired_it;
274
275 expired_tables.splice(expired_tables.end(), cache_expiration_queue, cache_expiration_queue.begin(), expired_it);
276
277 CacheExpirationQueue busy_tables;
278
279 while (!expired_tables.empty())
280 {
281 String table_name = expired_tables.front().table_name;
282 auto it = tables_cache.find(table_name);
283
284 if (!it->second.table || it->second.table.unique())
285 {
286 LOG_DEBUG(log, "Drop table " << backQuote(it->first) << " from cache.");
287 it->second.table.reset();
288 expired_tables.erase(it->second.expiration_iterator);
289 it->second.expiration_iterator = cache_expiration_queue.end();
290 }
291 else
292 {
293 LOG_DEBUG(log, "Table " << backQuote(it->first) << " is busy.");
294 busy_tables.splice(busy_tables.end(), expired_tables, it->second.expiration_iterator);
295 }
296 }
297
298 cache_expiration_queue.splice(cache_expiration_queue.begin(), busy_tables, busy_tables.begin(), busy_tables.end());
299}
300
301
302DatabaseLazyIterator::DatabaseLazyIterator(DatabaseLazy & database_, const Context & context_, Strings && table_names_)
303 : database(database_)
304 , table_names(std::move(table_names_))
305 , context(context_)
306 , iterator(table_names.begin())
307 , current_storage(nullptr)
308{
309}
310
311void DatabaseLazyIterator::next()
312{
313 current_storage.reset();
314 ++iterator;
315 while (isValid() && !database.isTableExist(context, *iterator))
316 ++iterator;
317}
318
319bool DatabaseLazyIterator::isValid() const
320{
321 return iterator != table_names.end();
322}
323
324const String & DatabaseLazyIterator::name() const
325{
326 return *iterator;
327}
328
329const StoragePtr & DatabaseLazyIterator::table() const
330{
331 if (!current_storage)
332 current_storage = database.tryGetTable(context, *iterator);
333 return current_storage;
334}
335
336}
337