1#include "config_core.h"
2#if USE_MYSQL
3
4#include <DataTypes/DataTypesNumber.h>
5#include <DataTypes/DataTypeString.h>
6#include <Formats/MySQLBlockInputStream.h>
7#include <Interpreters/evaluateConstantExpression.h>
8#include <Parsers/ASTFunction.h>
9#include <Parsers/ASTLiteral.h>
10#include <Storages/StorageMySQL.h>
11#include <TableFunctions/ITableFunction.h>
12#include <TableFunctions/TableFunctionFactory.h>
13#include <TableFunctions/TableFunctionMySQL.h>
14#include <Core/Defines.h>
15#include <Common/Exception.h>
16#include <Common/parseAddress.h>
17#include <Common/quoteString.h>
18#include <DataTypes/convertMySQLDataType.h>
19#include <IO/Operators.h>
20#include "registerTableFunctions.h"
21
22#include <mysqlxx/Pool.h>
23
24
25namespace DB
26{
27
/// Error codes thrown from this translation unit. Every code used below is declared here explicitly
/// so that the file does not rely on another header happening to declare it.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNKNOWN_TABLE;
}
34
35
36StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
37{
38 const auto & args_func = ast_function->as<ASTFunction &>();
39
40 if (!args_func.arguments)
41 throw Exception("Table function 'mysql' must have arguments.", ErrorCodes::LOGICAL_ERROR);
42
43 ASTs & args = args_func.arguments->children;
44
45 if (args.size() < 5 || args.size() > 7)
46 throw Exception("Table function 'mysql' requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).",
47 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
48
49 for (size_t i = 0; i < args.size(); ++i)
50 args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
51
52 std::string host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
53 std::string remote_database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
54 std::string remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
55 std::string user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>();
56 std::string password = args[4]->as<ASTLiteral &>().value.safeGet<String>();
57
58 bool replace_query = false;
59 std::string on_duplicate_clause;
60 if (args.size() >= 6)
61 replace_query = args[5]->as<ASTLiteral &>().value.safeGet<UInt64>() > 0;
62 if (args.size() == 7)
63 on_duplicate_clause = args[6]->as<ASTLiteral &>().value.safeGet<String>();
64
65 if (replace_query && !on_duplicate_clause.empty())
66 throw Exception(
67 "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them",
68 ErrorCodes::BAD_ARGUMENTS);
69
70 /// 3306 is the default MySQL port number
71 auto parsed_host_port = parseAddress(host_port, 3306);
72
73 mysqlxx::Pool pool(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
74
75 /// Determine table definition by running a query to INFORMATION_SCHEMA.
76
77 Block sample_block
78 {
79 { std::make_shared<DataTypeString>(), "name" },
80 { std::make_shared<DataTypeString>(), "type" },
81 { std::make_shared<DataTypeUInt8>(), "is_nullable" },
82 { std::make_shared<DataTypeUInt8>(), "is_unsigned" },
83 { std::make_shared<DataTypeUInt64>(), "length" },
84 };
85
86 WriteBufferFromOwnString query;
87 query << "SELECT"
88 " COLUMN_NAME AS name,"
89 " DATA_TYPE AS type,"
90 " IS_NULLABLE = 'YES' AS is_nullable,"
91 " COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
92 " CHARACTER_MAXIMUM_LENGTH AS length"
93 " FROM INFORMATION_SCHEMA.COLUMNS"
94 " WHERE TABLE_SCHEMA = " << quote << remote_database_name
95 << " AND TABLE_NAME = " << quote << remote_table_name
96 << " ORDER BY ORDINAL_POSITION";
97
98 NamesAndTypesList columns;
99 MySQLBlockInputStream result(pool.Get(), query.str(), sample_block, DEFAULT_BLOCK_SIZE);
100 while (Block block = result.read())
101 {
102 size_t rows = block.rows();
103 for (size_t i = 0; i < rows; ++i)
104 columns.emplace_back(
105 (*block.getByPosition(0).column)[i].safeGet<String>(),
106 convertMySQLDataType(
107 (*block.getByPosition(1).column)[i].safeGet<String>(),
108 (*block.getByPosition(2).column)[i].safeGet<UInt64>() && context.getSettings().external_table_functions_use_nulls,
109 (*block.getByPosition(3).column)[i].safeGet<UInt64>(),
110 (*block.getByPosition(4).column)[i].safeGet<UInt64>()));
111
112 }
113
114 if (columns.empty())
115 throw Exception("MySQL table " + backQuoteIfNeed(remote_database_name) + "." + backQuoteIfNeed(remote_table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
116
117 auto res = StorageMySQL::create(
118 getDatabaseName(),
119 table_name,
120 std::move(pool),
121 remote_database_name,
122 remote_table_name,
123 replace_query,
124 on_duplicate_clause,
125 ColumnsDescription{columns},
126 ConstraintsDescription{},
127 context);
128
129 res->startup();
130 return res;
131}
132
133
134void registerTableFunctionMySQL(TableFunctionFactory & factory)
135{
136 factory.registerFunction<TableFunctionMySQL>();
137}
138}
139
140#endif
141