1 | #include "config_core.h" |
2 | |
3 | #if USE_MYSQL |
4 | |
5 | #include <string> |
6 | #include <Databases/DatabaseMySQL.h> |
7 | #include <Common/parseAddress.h> |
8 | #include <IO/Operators.h> |
9 | #include <Formats/MySQLBlockInputStream.h> |
10 | #include <DataTypes/DataTypeString.h> |
11 | #include <DataTypes/DataTypesNumber.h> |
12 | #include <DataTypes/DataTypeDateTime.h> |
13 | #include <DataTypes/DataTypeNullable.h> |
14 | #include <Storages/StorageMySQL.h> |
15 | #include <Parsers/ASTFunction.h> |
16 | #include <Parsers/ParserCreateQuery.h> |
17 | #include <Parsers/parseQuery.h> |
18 | #include <Common/setThreadName.h> |
19 | #include <Common/escapeForFileName.h> |
20 | #include <Parsers/queryToString.h> |
21 | #include <Parsers/ASTCreateQuery.h> |
22 | #include <DataTypes/convertMySQLDataType.h> |
23 | |
24 | #include <Poco/File.h> |
25 | #include <Poco/DirectoryIterator.h> |
26 | |
27 | |
28 | |
29 | namespace DB |
30 | { |
31 | |
/// Error codes referenced by this translation unit. These are declarations only.
namespace ErrorCodes
{
    extern const int UNKNOWN_TABLE;
    extern const int TABLE_IS_DROPPED;
    extern const int TABLE_WAS_NOT_DROPPED;
    extern const int TABLE_ALREADY_EXISTS;
    extern const int UNEXPECTED_AST_STRUCTURE;
    /// Used by removeTable and createTable below; declared here so the file
    /// does not depend on another header happening to declare them.
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}
40 | |
/// File-name suffix of the flag files that mark a table as removed inside the metadata directory.
static constexpr auto suffix = ".remove_flag";
/// Upper bound on how long the cleaner waits on its condition variable between two passes.
static constexpr std::chrono::seconds cleaner_sleep_time{30};
43 | |
44 | static String toQueryStringWithQuote(const std::vector<String> & quote_list) |
45 | { |
46 | WriteBufferFromOwnString quote_list_query; |
47 | quote_list_query << "(" ; |
48 | |
49 | for (size_t index = 0; index < quote_list.size(); ++index) |
50 | { |
51 | if (index) |
52 | quote_list_query << "," ; |
53 | |
54 | quote_list_query << quote << quote_list[index]; |
55 | } |
56 | |
57 | quote_list_query << ")" ; |
58 | return quote_list_query.str(); |
59 | } |
60 | |
61 | DatabaseMySQL::DatabaseMySQL( |
62 | const Context & global_context_, const String & database_name_, const String & metadata_path_, |
63 | const ASTStorage * database_engine_define_, const String & database_name_in_mysql_, mysqlxx::Pool && pool) |
64 | : IDatabase(database_name_) |
65 | , global_context(global_context_) |
66 | , metadata_path(metadata_path_) |
67 | , database_engine_define(database_engine_define_->clone()) |
68 | , database_name_in_mysql(database_name_in_mysql_) |
69 | , mysql_pool(std::move(pool)) |
70 | { |
71 | } |
72 | |
73 | bool DatabaseMySQL::empty(const Context &) const |
74 | { |
75 | std::lock_guard<std::mutex> lock(mutex); |
76 | |
77 | fetchTablesIntoLocalCache(); |
78 | |
79 | if (local_tables_cache.empty()) |
80 | return true; |
81 | |
82 | for (const auto & [table_name, storage_info] : local_tables_cache) |
83 | if (!remove_or_detach_tables.count(table_name)) |
84 | return false; |
85 | |
86 | return true; |
87 | } |
88 | |
89 | DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name) |
90 | { |
91 | Tables tables; |
92 | std::lock_guard<std::mutex> lock(mutex); |
93 | |
94 | fetchTablesIntoLocalCache(); |
95 | |
96 | for (const auto & [table_name, modify_time_and_storage] : local_tables_cache) |
97 | if (!remove_or_detach_tables.count(table_name) && (!filter_by_table_name || filter_by_table_name(table_name))) |
98 | tables[table_name] = modify_time_and_storage.second; |
99 | |
100 | return std::make_unique<DatabaseTablesSnapshotIterator>(tables); |
101 | } |
102 | |
103 | bool DatabaseMySQL::isTableExist(const Context & context, const String & name) const |
104 | { |
105 | return bool(tryGetTable(context, name)); |
106 | } |
107 | |
108 | StoragePtr DatabaseMySQL::tryGetTable(const Context &, const String & mysql_table_name) const |
109 | { |
110 | std::lock_guard<std::mutex> lock(mutex); |
111 | |
112 | fetchTablesIntoLocalCache(); |
113 | |
114 | if (!remove_or_detach_tables.count(mysql_table_name) && local_tables_cache.find(mysql_table_name) != local_tables_cache.end()) |
115 | return local_tables_cache[mysql_table_name].second; |
116 | |
117 | return StoragePtr{}; |
118 | } |
119 | |
120 | static ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & database_engine_define) |
121 | { |
122 | auto create_table_query = std::make_shared<ASTCreateQuery>(); |
123 | |
124 | auto table_storage_define = database_engine_define->clone(); |
125 | create_table_query->set(create_table_query->storage, table_storage_define); |
126 | |
127 | auto columns_declare_list = std::make_shared<ASTColumns>(); |
128 | auto columns_expression_list = std::make_shared<ASTExpressionList>(); |
129 | |
130 | columns_declare_list->set(columns_declare_list->columns, columns_expression_list); |
131 | create_table_query->set(create_table_query->columns_list, columns_declare_list); |
132 | |
133 | { |
134 | /// init create query. |
135 | |
136 | create_table_query->table = storage->getTableName(); |
137 | create_table_query->database = storage->getDatabaseName(); |
138 | |
139 | for (const auto & column_type_and_name : storage->getColumns().getOrdinary()) |
140 | { |
141 | const auto & column_declaration = std::make_shared<ASTColumnDeclaration>(); |
142 | column_declaration->name = column_type_and_name.name; |
143 | column_declaration->type = dataTypeConvertToQuery(column_type_and_name.type); |
144 | columns_expression_list->children.emplace_back(column_declaration); |
145 | } |
146 | |
147 | auto mysql_table_name = std::make_shared<ASTLiteral>(storage->getTableName()); |
148 | auto storage_engine_arguments = table_storage_define->as<ASTStorage>()->engine->arguments; |
149 | storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name); |
150 | } |
151 | |
152 | return create_table_query; |
153 | } |
154 | |
155 | ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const Context &, const String & table_name, bool throw_on_error) const |
156 | { |
157 | std::lock_guard<std::mutex> lock(mutex); |
158 | |
159 | fetchTablesIntoLocalCache(); |
160 | |
161 | if (local_tables_cache.find(table_name) == local_tables_cache.end()) |
162 | { |
163 | if (throw_on_error) |
164 | throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist.." , |
165 | ErrorCodes::UNKNOWN_TABLE); |
166 | return nullptr; |
167 | } |
168 | |
169 | return getCreateQueryFromStorage(local_tables_cache[table_name].second, database_engine_define); |
170 | } |
171 | |
172 | time_t DatabaseMySQL::getObjectMetadataModificationTime(const String & table_name) const |
173 | { |
174 | std::lock_guard<std::mutex> lock(mutex); |
175 | |
176 | fetchTablesIntoLocalCache(); |
177 | |
178 | if (local_tables_cache.find(table_name) == local_tables_cache.end()) |
179 | throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist." , ErrorCodes::UNKNOWN_TABLE); |
180 | |
181 | return time_t(local_tables_cache[table_name].first); |
182 | } |
183 | |
184 | ASTPtr DatabaseMySQL::getCreateDatabaseQuery() const |
185 | { |
186 | const auto & create_query = std::make_shared<ASTCreateQuery>(); |
187 | create_query->database = database_name; |
188 | create_query->set(create_query->storage, database_engine_define); |
189 | return create_query; |
190 | } |
191 | |
192 | void DatabaseMySQL::fetchTablesIntoLocalCache() const |
193 | { |
194 | const auto & tables_with_modification_time = fetchTablesWithModificationTime(); |
195 | |
196 | destroyLocalCacheExtraTables(tables_with_modification_time); |
197 | fetchLatestTablesStructureIntoCache(tables_with_modification_time); |
198 | } |
199 | |
200 | void DatabaseMySQL::(const std::map<String, UInt64> & tables_with_modification_time) const |
201 | { |
202 | for (auto iterator = local_tables_cache.begin(); iterator != local_tables_cache.end();) |
203 | { |
204 | if (tables_with_modification_time.find(iterator->first) != tables_with_modification_time.end()) |
205 | ++iterator; |
206 | else |
207 | { |
208 | outdated_tables.emplace_back(iterator->second.second); |
209 | iterator = local_tables_cache.erase(iterator); |
210 | } |
211 | } |
212 | } |
213 | |
214 | void DatabaseMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> &tables_modification_time) const |
215 | { |
216 | std::vector<String> wait_update_tables_name; |
217 | for (const auto & table_modification_time : tables_modification_time) |
218 | { |
219 | const auto & it = local_tables_cache.find(table_modification_time.first); |
220 | |
221 | /// Outdated or new table structures |
222 | if (it == local_tables_cache.end() || table_modification_time.second > it->second.first) |
223 | wait_update_tables_name.emplace_back(table_modification_time.first); |
224 | } |
225 | |
226 | std::map<String, NamesAndTypesList> tables_and_columns = fetchTablesColumnsList(wait_update_tables_name); |
227 | |
228 | for (const auto & table_and_columns : tables_and_columns) |
229 | { |
230 | const auto & table_name = table_and_columns.first; |
231 | const auto & columns_name_and_type = table_and_columns.second; |
232 | const auto & table_modification_time = tables_modification_time.at(table_name); |
233 | |
234 | const auto & iterator = local_tables_cache.find(table_name); |
235 | if (iterator != local_tables_cache.end()) |
236 | { |
237 | outdated_tables.emplace_back(iterator->second.second); |
238 | local_tables_cache.erase(iterator); |
239 | } |
240 | |
241 | local_tables_cache[table_name] = std::make_pair(table_modification_time, StorageMySQL::create( |
242 | database_name, table_name, std::move(mysql_pool), database_name_in_mysql, table_name, |
243 | false, "" , ColumnsDescription{columns_name_and_type}, ConstraintsDescription{}, global_context)); |
244 | } |
245 | } |
246 | |
247 | std::map<String, UInt64> DatabaseMySQL::fetchTablesWithModificationTime() const |
248 | { |
249 | Block tables_status_sample_block |
250 | { |
251 | { std::make_shared<DataTypeString>(), "table_name" }, |
252 | { std::make_shared<DataTypeDateTime>(), "modification_time" }, |
253 | }; |
254 | |
255 | WriteBufferFromOwnString query; |
256 | query << "SELECT" |
257 | " TABLE_NAME AS table_name, " |
258 | " CREATE_TIME AS modification_time " |
259 | " FROM INFORMATION_SCHEMA.TABLES " |
260 | " WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql; |
261 | |
262 | std::map<String, UInt64> tables_with_modification_time; |
263 | MySQLBlockInputStream result(mysql_pool.Get(), query.str(), tables_status_sample_block, DEFAULT_BLOCK_SIZE); |
264 | |
265 | while (Block block = result.read()) |
266 | { |
267 | size_t rows = block.rows(); |
268 | for (size_t index = 0; index < rows; ++index) |
269 | { |
270 | String table_name = (*block.getByPosition(0).column)[index].safeGet<String>(); |
271 | tables_with_modification_time[table_name] = (*block.getByPosition(1).column)[index].safeGet<UInt64>(); |
272 | } |
273 | } |
274 | |
275 | return tables_with_modification_time; |
276 | } |
277 | |
278 | std::map<String, NamesAndTypesList> DatabaseMySQL::fetchTablesColumnsList(const std::vector<String> & tables_name) const |
279 | { |
280 | std::map<String, NamesAndTypesList> tables_and_columns; |
281 | |
282 | if (tables_name.empty()) |
283 | return tables_and_columns; |
284 | |
285 | Block tables_columns_sample_block |
286 | { |
287 | { std::make_shared<DataTypeString>(), "table_name" }, |
288 | { std::make_shared<DataTypeString>(), "column_name" }, |
289 | { std::make_shared<DataTypeString>(), "column_type" }, |
290 | { std::make_shared<DataTypeUInt8>(), "is_nullable" }, |
291 | { std::make_shared<DataTypeUInt8>(), "is_unsigned" }, |
292 | { std::make_shared<DataTypeUInt64>(), "length" }, |
293 | }; |
294 | |
295 | WriteBufferFromOwnString query; |
296 | query << "SELECT " |
297 | " TABLE_NAME AS table_name," |
298 | " COLUMN_NAME AS column_name," |
299 | " DATA_TYPE AS column_type," |
300 | " IS_NULLABLE = 'YES' AS is_nullable," |
301 | " COLUMN_TYPE LIKE '%unsigned' AS is_unsigned," |
302 | " CHARACTER_MAXIMUM_LENGTH AS length" |
303 | " FROM INFORMATION_SCHEMA.COLUMNS" |
304 | " WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql |
305 | << " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION" ; |
306 | |
307 | const auto & external_table_functions_use_nulls = global_context.getSettings().external_table_functions_use_nulls; |
308 | MySQLBlockInputStream result(mysql_pool.Get(), query.str(), tables_columns_sample_block, DEFAULT_BLOCK_SIZE); |
309 | while (Block block = result.read()) |
310 | { |
311 | size_t rows = block.rows(); |
312 | for (size_t i = 0; i < rows; ++i) |
313 | { |
314 | String table_name = (*block.getByPosition(0).column)[i].safeGet<String>(); |
315 | tables_and_columns[table_name].emplace_back((*block.getByPosition(1).column)[i].safeGet<String>(), |
316 | convertMySQLDataType( |
317 | (*block.getByPosition(2).column)[i].safeGet<String>(), |
318 | (*block.getByPosition(3).column)[i].safeGet<UInt64>() && |
319 | external_table_functions_use_nulls, |
320 | (*block.getByPosition(4).column)[i].safeGet<UInt64>(), |
321 | (*block.getByPosition(5).column)[i].safeGet<UInt64>())); |
322 | } |
323 | } |
324 | return tables_and_columns; |
325 | } |
326 | |
327 | void DatabaseMySQL::shutdown() |
328 | { |
329 | std::map<String, ModifyTimeAndStorage> tables_snapshot; |
330 | { |
331 | std::lock_guard lock(mutex); |
332 | tables_snapshot = local_tables_cache; |
333 | } |
334 | |
335 | for (const auto & [table_name, modify_time_and_storage] : tables_snapshot) |
336 | modify_time_and_storage.second->shutdown(); |
337 | |
338 | std::lock_guard lock(mutex); |
339 | local_tables_cache.clear(); |
340 | } |
341 | |
342 | void DatabaseMySQL::drop(const Context & /*context*/) |
343 | { |
344 | Poco::File(getMetadataPath()).remove(true); |
345 | } |
346 | |
347 | void DatabaseMySQL::cleanOutdatedTables() |
348 | { |
349 | setThreadName("MySQLDBCleaner" ); |
350 | |
351 | std::unique_lock lock{mutex}; |
352 | |
353 | while (!quit.load(std::memory_order_relaxed)) |
354 | { |
355 | for (auto iterator = outdated_tables.begin(); iterator != outdated_tables.end();) |
356 | { |
357 | if (!iterator->unique()) |
358 | ++iterator; |
359 | else |
360 | { |
361 | const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY); |
362 | |
363 | (*iterator)->shutdown(); |
364 | (*iterator)->is_dropped = true; |
365 | iterator = outdated_tables.erase(iterator); |
366 | } |
367 | } |
368 | |
369 | cond.wait_for(lock, cleaner_sleep_time); |
370 | } |
371 | } |
372 | |
373 | void DatabaseMySQL::attachTable(const String & table_name, const StoragePtr & storage) |
374 | { |
375 | std::lock_guard<std::mutex> lock{mutex}; |
376 | |
377 | if (!local_tables_cache.count(table_name)) |
378 | throw Exception("Cannot attach table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + |
379 | " because it does not exist." , ErrorCodes::UNKNOWN_TABLE); |
380 | |
381 | if (!remove_or_detach_tables.count(table_name)) |
382 | throw Exception("Cannot attach table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + |
383 | " because it already exists." , ErrorCodes::TABLE_ALREADY_EXISTS); |
384 | |
385 | /// We use the new storage to replace the original storage, because the original storage may have been dropped |
386 | /// Although we still keep its |
387 | local_tables_cache[table_name].second = storage; |
388 | |
389 | remove_or_detach_tables.erase(table_name); |
390 | Poco::File remove_flag(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix); |
391 | |
392 | if (remove_flag.exists()) |
393 | remove_flag.remove(); |
394 | } |
395 | |
396 | StoragePtr DatabaseMySQL::detachTable(const String & table_name) |
397 | { |
398 | std::lock_guard<std::mutex> lock{mutex}; |
399 | |
400 | if (remove_or_detach_tables.count(table_name)) |
401 | throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " is dropped" , |
402 | ErrorCodes::TABLE_IS_DROPPED); |
403 | |
404 | if (!local_tables_cache.count(table_name)) |
405 | throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " doesn't exist." , |
406 | ErrorCodes::UNKNOWN_TABLE); |
407 | |
408 | remove_or_detach_tables.emplace(table_name); |
409 | return local_tables_cache[table_name].second; |
410 | } |
411 | |
412 | String DatabaseMySQL::getMetadataPath() const |
413 | { |
414 | return metadata_path; |
415 | } |
416 | |
417 | void DatabaseMySQL::loadStoredObjects(Context &, bool) |
418 | { |
419 | |
420 | std::lock_guard<std::mutex> lock{mutex}; |
421 | Poco::DirectoryIterator iterator(getMetadataPath()); |
422 | |
423 | for (Poco::DirectoryIterator end; iterator != end; ++iterator) |
424 | { |
425 | if (iterator->isFile() && endsWith(iterator.name(), suffix)) |
426 | { |
427 | const auto & filename = iterator.name(); |
428 | const auto & table_name = unescapeForFileName(filename.substr(0, filename.size() - strlen(suffix))); |
429 | remove_or_detach_tables.emplace(table_name); |
430 | } |
431 | } |
432 | } |
433 | |
434 | void DatabaseMySQL::removeTable(const Context &, const String & table_name) |
435 | { |
436 | std::lock_guard<std::mutex> lock{mutex}; |
437 | |
438 | Poco::File remove_flag(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix); |
439 | |
440 | if (remove_or_detach_tables.count(table_name)) |
441 | throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " is dropped" , |
442 | ErrorCodes::TABLE_IS_DROPPED); |
443 | |
444 | if (remove_flag.exists()) |
445 | throw Exception("The remove flag file already exists but the " + backQuoteIfNeed(getDatabaseName()) + |
446 | "." + backQuoteIfNeed(table_name) + " does not exists remove tables, it is bug." , ErrorCodes::LOGICAL_ERROR); |
447 | |
448 | if (!local_tables_cache.count(table_name)) |
449 | throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " doesn't exist." , |
450 | ErrorCodes::UNKNOWN_TABLE); |
451 | |
452 | remove_or_detach_tables.emplace(table_name); |
453 | |
454 | try |
455 | { |
456 | remove_flag.createFile(); |
457 | } |
458 | catch (...) |
459 | { |
460 | remove_or_detach_tables.erase(table_name); |
461 | throw; |
462 | } |
463 | } |
464 | |
465 | DatabaseMySQL::~DatabaseMySQL() |
466 | { |
467 | try |
468 | { |
469 | if (!quit) |
470 | { |
471 | { |
472 | quit = true; |
473 | std::lock_guard lock{mutex}; |
474 | } |
475 | cond.notify_one(); |
476 | thread.join(); |
477 | } |
478 | |
479 | shutdown(); |
480 | } |
481 | catch (...) |
482 | { |
483 | tryLogCurrentException(__PRETTY_FUNCTION__); |
484 | } |
485 | } |
486 | |
487 | void DatabaseMySQL::createTable(const Context & context, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) |
488 | { |
489 | const auto & create = create_query->as<ASTCreateQuery>(); |
490 | |
491 | if (!create->attach) |
492 | throw Exception("MySQL database engine does not support create table. for tables that were detach or dropped before, " |
493 | "you can use attach to add them back to the MySQL database" , ErrorCodes::NOT_IMPLEMENTED); |
494 | |
495 | /// XXX: hack |
496 | /// In order to prevent users from broken the table structure by executing attach table database_name.table_name (...) |
497 | /// we should compare the old and new create_query to make them completely consistent |
498 | const auto & origin_create_query = getCreateTableQuery(context, table_name); |
499 | origin_create_query->as<ASTCreateQuery>()->attach = true; |
500 | |
501 | if (queryToString(origin_create_query) != queryToString(create_query)) |
502 | throw Exception("The MySQL database engine can only execute attach statements of type attach table database_name.table_name" , |
503 | ErrorCodes::UNEXPECTED_AST_STRUCTURE); |
504 | |
505 | attachTable(table_name, storage); |
506 | } |
507 | |
508 | } |
509 | |
510 | #endif |
511 | |