1 | #include <Core/Settings.h> |
2 | #include <Databases/DatabaseLazy.h> |
3 | #include <Databases/DatabaseOnDisk.h> |
4 | #include <Databases/DatabasesCommon.h> |
5 | #include <Interpreters/Context.h> |
6 | #include <IO/ReadHelpers.h> |
7 | #include <IO/WriteBufferFromFile.h> |
8 | #include <IO/WriteHelpers.h> |
9 | #include <Parsers/ASTCreateQuery.h> |
10 | #include <Storages/IStorage.h> |
11 | |
12 | #include <common/logger_useful.h> |
13 | #include <ext/scope_guard.h> |
14 | #include <iomanip> |
15 | #include <Poco/File.h> |
16 | |
17 | |
18 | namespace DB |
19 | { |
20 | |
21 | namespace ErrorCodes |
22 | { |
23 | extern const int TABLE_ALREADY_EXISTS; |
24 | extern const int UNKNOWN_TABLE; |
25 | extern const int UNSUPPORTED_METHOD; |
26 | extern const int CANNOT_CREATE_TABLE_FROM_METADATA; |
27 | extern const int LOGICAL_ERROR; |
28 | } |
29 | |
30 | |
31 | |
32 | DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_, time_t expiration_time_, const Context & context_) |
33 | : DatabaseOnDisk(name_, metadata_path_, "DatabaseLazy (" + name_ + ")" ) |
34 | , expiration_time(expiration_time_) |
35 | { |
36 | Poco::File(context_.getPath() + getDataPath()).createDirectories(); |
37 | } |
38 | |
39 | |
40 | void DatabaseLazy::loadStoredObjects( |
41 | Context & context, |
42 | bool /* has_force_restore_data_flag */) |
43 | { |
44 | iterateMetadataFiles(context, [this](const String & file_name) |
45 | { |
46 | const std::string table_name = file_name.substr(0, file_name.size() - 4); |
47 | attachTable(table_name, nullptr); |
48 | }); |
49 | } |
50 | |
51 | |
52 | void DatabaseLazy::createTable( |
53 | const Context & context, |
54 | const String & table_name, |
55 | const StoragePtr & table, |
56 | const ASTPtr & query) |
57 | { |
58 | SCOPE_EXIT({ clearExpiredTables(); }); |
59 | if (!endsWith(table->getName(), "Log" )) |
60 | throw Exception("Lazy engine can be used only with *Log tables." , ErrorCodes::UNSUPPORTED_METHOD); |
61 | DatabaseOnDisk::createTable(context, table_name, table, query); |
62 | |
63 | /// DatabaseOnDisk::createTable renames file, so we need to get new metadata_modification_time. |
64 | std::lock_guard lock(mutex); |
65 | auto it = tables_cache.find(table_name); |
66 | if (it != tables_cache.end()) |
67 | it->second.metadata_modification_time = DatabaseOnDisk::getObjectMetadataModificationTime(table_name); |
68 | } |
69 | |
70 | void DatabaseLazy::removeTable( |
71 | const Context & context, |
72 | const String & table_name) |
73 | { |
74 | SCOPE_EXIT({ clearExpiredTables(); }); |
75 | DatabaseOnDisk::removeTable(context, table_name); |
76 | } |
77 | |
78 | void DatabaseLazy::renameTable( |
79 | const Context & context, |
80 | const String & table_name, |
81 | IDatabase & to_database, |
82 | const String & to_table_name, |
83 | TableStructureWriteLockHolder & lock) |
84 | { |
85 | SCOPE_EXIT({ clearExpiredTables(); }); |
86 | DatabaseOnDisk::renameTable(context, table_name, to_database, to_table_name, lock); |
87 | } |
88 | |
89 | |
90 | time_t DatabaseLazy::getObjectMetadataModificationTime(const String & table_name) const |
91 | { |
92 | std::lock_guard lock(mutex); |
93 | auto it = tables_cache.find(table_name); |
94 | if (it != tables_cache.end()) |
95 | return it->second.metadata_modification_time; |
96 | throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist." , ErrorCodes::UNKNOWN_TABLE); |
97 | } |
98 | |
99 | void DatabaseLazy::alterTable( |
100 | const Context & /* context */, |
101 | const String & /* table_name */, |
102 | const StorageInMemoryMetadata & /* metadata */) |
103 | { |
104 | clearExpiredTables(); |
105 | throw Exception("ALTER query is not supported for Lazy database." , ErrorCodes::UNSUPPORTED_METHOD); |
106 | } |
107 | |
108 | bool DatabaseLazy::isTableExist( |
109 | const Context & /* context */, |
110 | const String & table_name) const |
111 | { |
112 | SCOPE_EXIT({ clearExpiredTables(); }); |
113 | std::lock_guard lock(mutex); |
114 | return tables_cache.find(table_name) != tables_cache.end(); |
115 | } |
116 | |
117 | StoragePtr DatabaseLazy::tryGetTable( |
118 | const Context & context, |
119 | const String & table_name) const |
120 | { |
121 | SCOPE_EXIT({ clearExpiredTables(); }); |
122 | { |
123 | std::lock_guard lock(mutex); |
124 | auto it = tables_cache.find(table_name); |
125 | if (it == tables_cache.end()) |
126 | throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist." , ErrorCodes::UNKNOWN_TABLE); |
127 | |
128 | if (it->second.table) |
129 | { |
130 | cache_expiration_queue.erase(it->second.expiration_iterator); |
131 | it->second.last_touched = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); |
132 | it->second.expiration_iterator = cache_expiration_queue.emplace(cache_expiration_queue.end(), it->second.last_touched, table_name); |
133 | |
134 | return it->second.table; |
135 | } |
136 | } |
137 | |
138 | return loadTable(context, table_name); |
139 | } |
140 | |
141 | DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name) |
142 | { |
143 | std::lock_guard lock(mutex); |
144 | Strings filtered_tables; |
145 | for (const auto & [table_name, cached_table] : tables_cache) |
146 | { |
147 | if (!filter_by_table_name || filter_by_table_name(table_name)) |
148 | filtered_tables.push_back(table_name); |
149 | } |
150 | std::sort(filtered_tables.begin(), filtered_tables.end()); |
151 | return std::make_unique<DatabaseLazyIterator>(*this, context, std::move(filtered_tables)); |
152 | } |
153 | |
154 | bool DatabaseLazy::empty(const Context & /* context */) const |
155 | { |
156 | return tables_cache.empty(); |
157 | } |
158 | |
159 | void DatabaseLazy::attachTable(const String & table_name, const StoragePtr & table) |
160 | { |
161 | LOG_DEBUG(log, "Attach table " << backQuote(table_name) << "." ); |
162 | std::lock_guard lock(mutex); |
163 | time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); |
164 | |
165 | auto [it, inserted] = tables_cache.emplace(std::piecewise_construct, |
166 | std::forward_as_tuple(table_name), |
167 | std::forward_as_tuple(table, current_time, DatabaseOnDisk::getObjectMetadataModificationTime(table_name))); |
168 | if (!inserted) |
169 | throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " already exists." , ErrorCodes::TABLE_ALREADY_EXISTS); |
170 | |
171 | it->second.expiration_iterator = cache_expiration_queue.emplace(cache_expiration_queue.end(), current_time, table_name); |
172 | } |
173 | |
174 | StoragePtr DatabaseLazy::detachTable(const String & table_name) |
175 | { |
176 | StoragePtr res; |
177 | { |
178 | LOG_DEBUG(log, "Detach table " << backQuote(table_name) << "." ); |
179 | std::lock_guard lock(mutex); |
180 | auto it = tables_cache.find(table_name); |
181 | if (it == tables_cache.end()) |
182 | throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist." , ErrorCodes::UNKNOWN_TABLE); |
183 | res = it->second.table; |
184 | if (it->second.expiration_iterator != cache_expiration_queue.end()) |
185 | cache_expiration_queue.erase(it->second.expiration_iterator); |
186 | tables_cache.erase(it); |
187 | } |
188 | return res; |
189 | } |
190 | |
191 | void DatabaseLazy::shutdown() |
192 | { |
193 | TablesCache tables_snapshot; |
194 | { |
195 | std::lock_guard lock(mutex); |
196 | tables_snapshot = tables_cache; |
197 | } |
198 | |
199 | for (const auto & kv : tables_snapshot) |
200 | { |
201 | if (kv.second.table) |
202 | kv.second.table->shutdown(); |
203 | } |
204 | |
205 | std::lock_guard lock(mutex); |
206 | tables_cache.clear(); |
207 | } |
208 | |
209 | DatabaseLazy::~DatabaseLazy() |
210 | { |
211 | try |
212 | { |
213 | shutdown(); |
214 | } |
215 | catch (...) |
216 | { |
217 | tryLogCurrentException(__PRETTY_FUNCTION__); |
218 | } |
219 | } |
220 | |
221 | StoragePtr DatabaseLazy::loadTable(const Context & context, const String & table_name) const |
222 | { |
223 | SCOPE_EXIT({ clearExpiredTables(); }); |
224 | |
225 | LOG_DEBUG(log, "Load table " << backQuote(table_name) << " to cache." ); |
226 | |
227 | const String table_metadata_path = getMetadataPath() + "/" + escapeForFileName(table_name) + ".sql" ; |
228 | |
229 | try |
230 | { |
231 | StoragePtr table; |
232 | Context context_copy(context); /// some tables can change context, but not LogTables |
233 | |
234 | auto ast = parseQueryFromMetadata(table_metadata_path, /*throw_on_error*/ true, /*remove_empty*/false); |
235 | if (ast) |
236 | { |
237 | auto & ast_create = ast->as<const ASTCreateQuery &>(); |
238 | String table_data_path_relative = getTableDataPath(ast_create); |
239 | table = createTableFromAST(ast_create, database_name, table_data_path_relative, context_copy, false).second; |
240 | } |
241 | |
242 | if (!ast || !endsWith(table->getName(), "Log" )) |
243 | throw Exception("Only *Log tables can be used with Lazy database engine." , ErrorCodes::LOGICAL_ERROR); |
244 | { |
245 | std::lock_guard lock(mutex); |
246 | auto it = tables_cache.find(table_name); |
247 | if (it == tables_cache.end()) |
248 | throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist." , ErrorCodes::UNKNOWN_TABLE); |
249 | |
250 | if (it->second.expiration_iterator != cache_expiration_queue.end()) |
251 | cache_expiration_queue.erase(it->second.expiration_iterator); |
252 | it->second.last_touched = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); |
253 | it->second.expiration_iterator = cache_expiration_queue.emplace(cache_expiration_queue.end(), it->second.last_touched, table_name); |
254 | |
255 | return it->second.table = table; |
256 | } |
257 | } |
258 | catch (const Exception & e) |
259 | { |
260 | throw Exception("Cannot create table from metadata file " + table_metadata_path + ". Error: " + DB::getCurrentExceptionMessage(true), |
261 | e, DB::ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA); |
262 | } |
263 | } |
264 | |
265 | void DatabaseLazy::clearExpiredTables() const |
266 | { |
267 | std::lock_guard lock(mutex); |
268 | auto time_now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); |
269 | |
270 | CacheExpirationQueue expired_tables; |
271 | auto expired_it = cache_expiration_queue.begin(); |
272 | while (expired_it != cache_expiration_queue.end() && (time_now - expired_it->last_touched) >= expiration_time) |
273 | ++expired_it; |
274 | |
275 | expired_tables.splice(expired_tables.end(), cache_expiration_queue, cache_expiration_queue.begin(), expired_it); |
276 | |
277 | CacheExpirationQueue busy_tables; |
278 | |
279 | while (!expired_tables.empty()) |
280 | { |
281 | String table_name = expired_tables.front().table_name; |
282 | auto it = tables_cache.find(table_name); |
283 | |
284 | if (!it->second.table || it->second.table.unique()) |
285 | { |
286 | LOG_DEBUG(log, "Drop table " << backQuote(it->first) << " from cache." ); |
287 | it->second.table.reset(); |
288 | expired_tables.erase(it->second.expiration_iterator); |
289 | it->second.expiration_iterator = cache_expiration_queue.end(); |
290 | } |
291 | else |
292 | { |
293 | LOG_DEBUG(log, "Table " << backQuote(it->first) << " is busy." ); |
294 | busy_tables.splice(busy_tables.end(), expired_tables, it->second.expiration_iterator); |
295 | } |
296 | } |
297 | |
298 | cache_expiration_queue.splice(cache_expiration_queue.begin(), busy_tables, busy_tables.begin(), busy_tables.end()); |
299 | } |
300 | |
301 | |
302 | DatabaseLazyIterator::DatabaseLazyIterator(DatabaseLazy & database_, const Context & context_, Strings && table_names_) |
303 | : database(database_) |
304 | , table_names(std::move(table_names_)) |
305 | , context(context_) |
306 | , iterator(table_names.begin()) |
307 | , current_storage(nullptr) |
308 | { |
309 | } |
310 | |
311 | void DatabaseLazyIterator::next() |
312 | { |
313 | current_storage.reset(); |
314 | ++iterator; |
315 | while (isValid() && !database.isTableExist(context, *iterator)) |
316 | ++iterator; |
317 | } |
318 | |
319 | bool DatabaseLazyIterator::isValid() const |
320 | { |
321 | return iterator != table_names.end(); |
322 | } |
323 | |
324 | const String & DatabaseLazyIterator::name() const |
325 | { |
326 | return *iterator; |
327 | } |
328 | |
329 | const StoragePtr & DatabaseLazyIterator::table() const |
330 | { |
331 | if (!current_storage) |
332 | current_storage = database.tryGetTable(context, *iterator); |
333 | return current_storage; |
334 | } |
335 | |
336 | } |
337 | |