1#include "config_core.h"
2
3#if USE_MYSQL
4
5#include <string>
6#include <Databases/DatabaseMySQL.h>
7#include <Common/parseAddress.h>
8#include <IO/Operators.h>
9#include <Formats/MySQLBlockInputStream.h>
10#include <DataTypes/DataTypeString.h>
11#include <DataTypes/DataTypesNumber.h>
12#include <DataTypes/DataTypeDateTime.h>
13#include <DataTypes/DataTypeNullable.h>
14#include <Storages/StorageMySQL.h>
15#include <Parsers/ASTFunction.h>
16#include <Parsers/ParserCreateQuery.h>
17#include <Parsers/parseQuery.h>
18#include <Common/setThreadName.h>
19#include <Common/escapeForFileName.h>
20#include <Parsers/queryToString.h>
21#include <Parsers/ASTCreateQuery.h>
22#include <DataTypes/convertMySQLDataType.h>
23
24#include <Poco/File.h>
25#include <Poco/DirectoryIterator.h>
26
27
28
29namespace DB
30{
31
/// Error codes thrown from this translation unit.
/// Every code used below is declared here explicitly, so the file does not depend on
/// declarations that happen to be pulled in through unrelated headers.
namespace ErrorCodes
{
    extern const int UNKNOWN_TABLE;
    extern const int TABLE_IS_DROPPED;
    extern const int TABLE_WAS_NOT_DROPPED;
    extern const int TABLE_ALREADY_EXISTS;
    extern const int UNEXPECTED_AST_STRUCTURE;
    extern const int LOGICAL_ERROR;      /// used by removeTable() when a stale remove flag is found
    extern const int NOT_IMPLEMENTED;    /// used by createTable() for anything other than ATTACH
}
40
/// Extension of the marker files kept in the metadata directory; one such file is created
/// per removed table (see removeTable) and read back by loadStoredObjects.
static constexpr const char * suffix = ".remove_flag";
/// How long the background cleaner waits between two passes over the outdated tables.
static constexpr std::chrono::seconds cleaner_sleep_time{30};
43
44static String toQueryStringWithQuote(const std::vector<String> & quote_list)
45{
46 WriteBufferFromOwnString quote_list_query;
47 quote_list_query << "(";
48
49 for (size_t index = 0; index < quote_list.size(); ++index)
50 {
51 if (index)
52 quote_list_query << ",";
53
54 quote_list_query << quote << quote_list[index];
55 }
56
57 quote_list_query << ")";
58 return quote_list_query.str();
59}
60
/// Constructs the database object; the body is empty, all work happens in the initializer list.
///  global_context_          - context kept as `global_context`.
///  database_name_           - name of this database, forwarded to the IDatabase base.
///  metadata_path_           - value later returned by getMetadataPath().
///  database_engine_define_  - engine definition AST; it is dereferenced here (must not be null)
///                             and a clone is stored.
///  database_name_in_mysql_  - schema name used in the queries sent to MySQL.
///  pool                     - MySQL connection pool, handed over to `mysql_pool` via std::move.
DatabaseMySQL::DatabaseMySQL(
    const Context & global_context_, const String & database_name_, const String & metadata_path_,
    const ASTStorage * database_engine_define_, const String & database_name_in_mysql_, mysqlxx::Pool && pool)
    : IDatabase(database_name_)
    , global_context(global_context_)
    , metadata_path(metadata_path_)
    , database_engine_define(database_engine_define_->clone())
    , database_name_in_mysql(database_name_in_mysql_)
    , mysql_pool(std::move(pool))
{
}
72
73bool DatabaseMySQL::empty(const Context &) const
74{
75 std::lock_guard<std::mutex> lock(mutex);
76
77 fetchTablesIntoLocalCache();
78
79 if (local_tables_cache.empty())
80 return true;
81
82 for (const auto & [table_name, storage_info] : local_tables_cache)
83 if (!remove_or_detach_tables.count(table_name))
84 return false;
85
86 return true;
87}
88
89DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name)
90{
91 Tables tables;
92 std::lock_guard<std::mutex> lock(mutex);
93
94 fetchTablesIntoLocalCache();
95
96 for (const auto & [table_name, modify_time_and_storage] : local_tables_cache)
97 if (!remove_or_detach_tables.count(table_name) && (!filter_by_table_name || filter_by_table_name(table_name)))
98 tables[table_name] = modify_time_and_storage.second;
99
100 return std::make_unique<DatabaseTablesSnapshotIterator>(tables);
101}
102
103bool DatabaseMySQL::isTableExist(const Context & context, const String & name) const
104{
105 return bool(tryGetTable(context, name));
106}
107
108StoragePtr DatabaseMySQL::tryGetTable(const Context &, const String & mysql_table_name) const
109{
110 std::lock_guard<std::mutex> lock(mutex);
111
112 fetchTablesIntoLocalCache();
113
114 if (!remove_or_detach_tables.count(mysql_table_name) && local_tables_cache.find(mysql_table_name) != local_tables_cache.end())
115 return local_tables_cache[mysql_table_name].second;
116
117 return StoragePtr{};
118}
119
120static ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr & database_engine_define)
121{
122 auto create_table_query = std::make_shared<ASTCreateQuery>();
123
124 auto table_storage_define = database_engine_define->clone();
125 create_table_query->set(create_table_query->storage, table_storage_define);
126
127 auto columns_declare_list = std::make_shared<ASTColumns>();
128 auto columns_expression_list = std::make_shared<ASTExpressionList>();
129
130 columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
131 create_table_query->set(create_table_query->columns_list, columns_declare_list);
132
133 {
134 /// init create query.
135
136 create_table_query->table = storage->getTableName();
137 create_table_query->database = storage->getDatabaseName();
138
139 for (const auto & column_type_and_name : storage->getColumns().getOrdinary())
140 {
141 const auto & column_declaration = std::make_shared<ASTColumnDeclaration>();
142 column_declaration->name = column_type_and_name.name;
143 column_declaration->type = dataTypeConvertToQuery(column_type_and_name.type);
144 columns_expression_list->children.emplace_back(column_declaration);
145 }
146
147 auto mysql_table_name = std::make_shared<ASTLiteral>(storage->getTableName());
148 auto storage_engine_arguments = table_storage_define->as<ASTStorage>()->engine->arguments;
149 storage_engine_arguments->children.insert(storage_engine_arguments->children.begin() + 2, mysql_table_name);
150 }
151
152 return create_table_query;
153}
154
155ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const Context &, const String & table_name, bool throw_on_error) const
156{
157 std::lock_guard<std::mutex> lock(mutex);
158
159 fetchTablesIntoLocalCache();
160
161 if (local_tables_cache.find(table_name) == local_tables_cache.end())
162 {
163 if (throw_on_error)
164 throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist..",
165 ErrorCodes::UNKNOWN_TABLE);
166 return nullptr;
167 }
168
169 return getCreateQueryFromStorage(local_tables_cache[table_name].second, database_engine_define);
170}
171
172time_t DatabaseMySQL::getObjectMetadataModificationTime(const String & table_name) const
173{
174 std::lock_guard<std::mutex> lock(mutex);
175
176 fetchTablesIntoLocalCache();
177
178 if (local_tables_cache.find(table_name) == local_tables_cache.end())
179 throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
180
181 return time_t(local_tables_cache[table_name].first);
182}
183
184ASTPtr DatabaseMySQL::getCreateDatabaseQuery() const
185{
186 const auto & create_query = std::make_shared<ASTCreateQuery>();
187 create_query->database = database_name;
188 create_query->set(create_query->storage, database_engine_define);
189 return create_query;
190}
191
192void DatabaseMySQL::fetchTablesIntoLocalCache() const
193{
194 const auto & tables_with_modification_time = fetchTablesWithModificationTime();
195
196 destroyLocalCacheExtraTables(tables_with_modification_time);
197 fetchLatestTablesStructureIntoCache(tables_with_modification_time);
198}
199
200void DatabaseMySQL::destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const
201{
202 for (auto iterator = local_tables_cache.begin(); iterator != local_tables_cache.end();)
203 {
204 if (tables_with_modification_time.find(iterator->first) != tables_with_modification_time.end())
205 ++iterator;
206 else
207 {
208 outdated_tables.emplace_back(iterator->second.second);
209 iterator = local_tables_cache.erase(iterator);
210 }
211 }
212}
213
214void DatabaseMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> &tables_modification_time) const
215{
216 std::vector<String> wait_update_tables_name;
217 for (const auto & table_modification_time : tables_modification_time)
218 {
219 const auto & it = local_tables_cache.find(table_modification_time.first);
220
221 /// Outdated or new table structures
222 if (it == local_tables_cache.end() || table_modification_time.second > it->second.first)
223 wait_update_tables_name.emplace_back(table_modification_time.first);
224 }
225
226 std::map<String, NamesAndTypesList> tables_and_columns = fetchTablesColumnsList(wait_update_tables_name);
227
228 for (const auto & table_and_columns : tables_and_columns)
229 {
230 const auto & table_name = table_and_columns.first;
231 const auto & columns_name_and_type = table_and_columns.second;
232 const auto & table_modification_time = tables_modification_time.at(table_name);
233
234 const auto & iterator = local_tables_cache.find(table_name);
235 if (iterator != local_tables_cache.end())
236 {
237 outdated_tables.emplace_back(iterator->second.second);
238 local_tables_cache.erase(iterator);
239 }
240
241 local_tables_cache[table_name] = std::make_pair(table_modification_time, StorageMySQL::create(
242 database_name, table_name, std::move(mysql_pool), database_name_in_mysql, table_name,
243 false, "", ColumnsDescription{columns_name_and_type}, ConstraintsDescription{}, global_context));
244 }
245}
246
247std::map<String, UInt64> DatabaseMySQL::fetchTablesWithModificationTime() const
248{
249 Block tables_status_sample_block
250 {
251 { std::make_shared<DataTypeString>(), "table_name" },
252 { std::make_shared<DataTypeDateTime>(), "modification_time" },
253 };
254
255 WriteBufferFromOwnString query;
256 query << "SELECT"
257 " TABLE_NAME AS table_name, "
258 " CREATE_TIME AS modification_time "
259 " FROM INFORMATION_SCHEMA.TABLES "
260 " WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql;
261
262 std::map<String, UInt64> tables_with_modification_time;
263 MySQLBlockInputStream result(mysql_pool.Get(), query.str(), tables_status_sample_block, DEFAULT_BLOCK_SIZE);
264
265 while (Block block = result.read())
266 {
267 size_t rows = block.rows();
268 for (size_t index = 0; index < rows; ++index)
269 {
270 String table_name = (*block.getByPosition(0).column)[index].safeGet<String>();
271 tables_with_modification_time[table_name] = (*block.getByPosition(1).column)[index].safeGet<UInt64>();
272 }
273 }
274
275 return tables_with_modification_time;
276}
277
278std::map<String, NamesAndTypesList> DatabaseMySQL::fetchTablesColumnsList(const std::vector<String> & tables_name) const
279{
280 std::map<String, NamesAndTypesList> tables_and_columns;
281
282 if (tables_name.empty())
283 return tables_and_columns;
284
285 Block tables_columns_sample_block
286 {
287 { std::make_shared<DataTypeString>(), "table_name" },
288 { std::make_shared<DataTypeString>(), "column_name" },
289 { std::make_shared<DataTypeString>(), "column_type" },
290 { std::make_shared<DataTypeUInt8>(), "is_nullable" },
291 { std::make_shared<DataTypeUInt8>(), "is_unsigned" },
292 { std::make_shared<DataTypeUInt64>(), "length" },
293 };
294
295 WriteBufferFromOwnString query;
296 query << "SELECT "
297 " TABLE_NAME AS table_name,"
298 " COLUMN_NAME AS column_name,"
299 " DATA_TYPE AS column_type,"
300 " IS_NULLABLE = 'YES' AS is_nullable,"
301 " COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
302 " CHARACTER_MAXIMUM_LENGTH AS length"
303 " FROM INFORMATION_SCHEMA.COLUMNS"
304 " WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql
305 << " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
306
307 const auto & external_table_functions_use_nulls = global_context.getSettings().external_table_functions_use_nulls;
308 MySQLBlockInputStream result(mysql_pool.Get(), query.str(), tables_columns_sample_block, DEFAULT_BLOCK_SIZE);
309 while (Block block = result.read())
310 {
311 size_t rows = block.rows();
312 for (size_t i = 0; i < rows; ++i)
313 {
314 String table_name = (*block.getByPosition(0).column)[i].safeGet<String>();
315 tables_and_columns[table_name].emplace_back((*block.getByPosition(1).column)[i].safeGet<String>(),
316 convertMySQLDataType(
317 (*block.getByPosition(2).column)[i].safeGet<String>(),
318 (*block.getByPosition(3).column)[i].safeGet<UInt64>() &&
319 external_table_functions_use_nulls,
320 (*block.getByPosition(4).column)[i].safeGet<UInt64>(),
321 (*block.getByPosition(5).column)[i].safeGet<UInt64>()));
322 }
323 }
324 return tables_and_columns;
325}
326
327void DatabaseMySQL::shutdown()
328{
329 std::map<String, ModifyTimeAndStorage> tables_snapshot;
330 {
331 std::lock_guard lock(mutex);
332 tables_snapshot = local_tables_cache;
333 }
334
335 for (const auto & [table_name, modify_time_and_storage] : tables_snapshot)
336 modify_time_and_storage.second->shutdown();
337
338 std::lock_guard lock(mutex);
339 local_tables_cache.clear();
340}
341
342void DatabaseMySQL::drop(const Context & /*context*/)
343{
344 Poco::File(getMetadataPath()).remove(true);
345}
346
347void DatabaseMySQL::cleanOutdatedTables()
348{
349 setThreadName("MySQLDBCleaner");
350
351 std::unique_lock lock{mutex};
352
353 while (!quit.load(std::memory_order_relaxed))
354 {
355 for (auto iterator = outdated_tables.begin(); iterator != outdated_tables.end();)
356 {
357 if (!iterator->unique())
358 ++iterator;
359 else
360 {
361 const auto table_lock = (*iterator)->lockAlterIntention(RWLockImpl::NO_QUERY);
362
363 (*iterator)->shutdown();
364 (*iterator)->is_dropped = true;
365 iterator = outdated_tables.erase(iterator);
366 }
367 }
368
369 cond.wait_for(lock, cleaner_sleep_time);
370 }
371}
372
373void DatabaseMySQL::attachTable(const String & table_name, const StoragePtr & storage)
374{
375 std::lock_guard<std::mutex> lock{mutex};
376
377 if (!local_tables_cache.count(table_name))
378 throw Exception("Cannot attach table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) +
379 " because it does not exist.", ErrorCodes::UNKNOWN_TABLE);
380
381 if (!remove_or_detach_tables.count(table_name))
382 throw Exception("Cannot attach table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) +
383 " because it already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
384
385 /// We use the new storage to replace the original storage, because the original storage may have been dropped
386 /// Although we still keep its
387 local_tables_cache[table_name].second = storage;
388
389 remove_or_detach_tables.erase(table_name);
390 Poco::File remove_flag(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix);
391
392 if (remove_flag.exists())
393 remove_flag.remove();
394}
395
396StoragePtr DatabaseMySQL::detachTable(const String & table_name)
397{
398 std::lock_guard<std::mutex> lock{mutex};
399
400 if (remove_or_detach_tables.count(table_name))
401 throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " is dropped",
402 ErrorCodes::TABLE_IS_DROPPED);
403
404 if (!local_tables_cache.count(table_name))
405 throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " doesn't exist.",
406 ErrorCodes::UNKNOWN_TABLE);
407
408 remove_or_detach_tables.emplace(table_name);
409 return local_tables_cache[table_name].second;
410}
411
/// Returns a copy of the metadata path given to the constructor; the remove-flag files
/// of this database are created and looked up under this path.
String DatabaseMySQL::getMetadataPath() const
{
    return metadata_path;
}
416
417void DatabaseMySQL::loadStoredObjects(Context &, bool)
418{
419
420 std::lock_guard<std::mutex> lock{mutex};
421 Poco::DirectoryIterator iterator(getMetadataPath());
422
423 for (Poco::DirectoryIterator end; iterator != end; ++iterator)
424 {
425 if (iterator->isFile() && endsWith(iterator.name(), suffix))
426 {
427 const auto & filename = iterator.name();
428 const auto & table_name = unescapeForFileName(filename.substr(0, filename.size() - strlen(suffix)));
429 remove_or_detach_tables.emplace(table_name);
430 }
431 }
432}
433
434void DatabaseMySQL::removeTable(const Context &, const String & table_name)
435{
436 std::lock_guard<std::mutex> lock{mutex};
437
438 Poco::File remove_flag(getMetadataPath() + '/' + escapeForFileName(table_name) + suffix);
439
440 if (remove_or_detach_tables.count(table_name))
441 throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " is dropped",
442 ErrorCodes::TABLE_IS_DROPPED);
443
444 if (remove_flag.exists())
445 throw Exception("The remove flag file already exists but the " + backQuoteIfNeed(getDatabaseName()) +
446 "." + backQuoteIfNeed(table_name) + " does not exists remove tables, it is bug.", ErrorCodes::LOGICAL_ERROR);
447
448 if (!local_tables_cache.count(table_name))
449 throw Exception("Table " + backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name) + " doesn't exist.",
450 ErrorCodes::UNKNOWN_TABLE);
451
452 remove_or_detach_tables.emplace(table_name);
453
454 try
455 {
456 remove_flag.createFile();
457 }
458 catch (...)
459 {
460 remove_or_detach_tables.erase(table_name);
461 throw;
462 }
463}
464
465DatabaseMySQL::~DatabaseMySQL()
466{
467 try
468 {
469 if (!quit)
470 {
471 {
472 quit = true;
473 std::lock_guard lock{mutex};
474 }
475 cond.notify_one();
476 thread.join();
477 }
478
479 shutdown();
480 }
481 catch (...)
482 {
483 tryLogCurrentException(__PRETTY_FUNCTION__);
484 }
485}
486
487void DatabaseMySQL::createTable(const Context & context, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query)
488{
489 const auto & create = create_query->as<ASTCreateQuery>();
490
491 if (!create->attach)
492 throw Exception("MySQL database engine does not support create table. for tables that were detach or dropped before, "
493 "you can use attach to add them back to the MySQL database", ErrorCodes::NOT_IMPLEMENTED);
494
495 /// XXX: hack
496 /// In order to prevent users from broken the table structure by executing attach table database_name.table_name (...)
497 /// we should compare the old and new create_query to make them completely consistent
498 const auto & origin_create_query = getCreateTableQuery(context, table_name);
499 origin_create_query->as<ASTCreateQuery>()->attach = true;
500
501 if (queryToString(origin_create_query) != queryToString(create_query))
502 throw Exception("The MySQL database engine can only execute attach statements of type attach table database_name.table_name",
503 ErrorCodes::UNEXPECTED_AST_STRUCTURE);
504
505 attachTable(table_name, storage);
506}
507
508}
509
510#endif
511