1 | #include "config_core.h" |
2 | #if USE_MYSQL |
3 | |
4 | #include <DataTypes/DataTypesNumber.h> |
5 | #include <DataTypes/DataTypeString.h> |
6 | #include <Formats/MySQLBlockInputStream.h> |
7 | #include <Interpreters/evaluateConstantExpression.h> |
8 | #include <Parsers/ASTFunction.h> |
9 | #include <Parsers/ASTLiteral.h> |
10 | #include <Storages/StorageMySQL.h> |
11 | #include <TableFunctions/ITableFunction.h> |
12 | #include <TableFunctions/TableFunctionFactory.h> |
13 | #include <TableFunctions/TableFunctionMySQL.h> |
14 | #include <Core/Defines.h> |
15 | #include <Common/Exception.h> |
16 | #include <Common/parseAddress.h> |
17 | #include <Common/quoteString.h> |
18 | #include <DataTypes/convertMySQLDataType.h> |
19 | #include <IO/Operators.h> |
20 | #include "registerTableFunctions.h" |
21 | |
22 | #include <mysqlxx/Pool.h> |
23 | |
24 | |
25 | namespace DB |
26 | { |
27 | |
/// Error codes thrown from this file. Only declared here; the definitions live elsewhere.
/// Every code used below is listed explicitly so the file does not depend on
/// another header happening to declare it.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
    extern const int UNKNOWN_TABLE;
}
34 | |
35 | |
36 | StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const |
37 | { |
38 | const auto & args_func = ast_function->as<ASTFunction &>(); |
39 | |
40 | if (!args_func.arguments) |
41 | throw Exception("Table function 'mysql' must have arguments." , ErrorCodes::LOGICAL_ERROR); |
42 | |
43 | ASTs & args = args_func.arguments->children; |
44 | |
45 | if (args.size() < 5 || args.size() > 7) |
46 | throw Exception("Table function 'mysql' requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause'])." , |
47 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
48 | |
49 | for (size_t i = 0; i < args.size(); ++i) |
50 | args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context); |
51 | |
52 | std::string host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>(); |
53 | std::string remote_database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>(); |
54 | std::string remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>(); |
55 | std::string user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>(); |
56 | std::string password = args[4]->as<ASTLiteral &>().value.safeGet<String>(); |
57 | |
58 | bool replace_query = false; |
59 | std::string on_duplicate_clause; |
60 | if (args.size() >= 6) |
61 | replace_query = args[5]->as<ASTLiteral &>().value.safeGet<UInt64>() > 0; |
62 | if (args.size() == 7) |
63 | on_duplicate_clause = args[6]->as<ASTLiteral &>().value.safeGet<String>(); |
64 | |
65 | if (replace_query && !on_duplicate_clause.empty()) |
66 | throw Exception( |
67 | "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them" , |
68 | ErrorCodes::BAD_ARGUMENTS); |
69 | |
70 | /// 3306 is the default MySQL port number |
71 | auto parsed_host_port = parseAddress(host_port, 3306); |
72 | |
73 | mysqlxx::Pool pool(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second); |
74 | |
75 | /// Determine table definition by running a query to INFORMATION_SCHEMA. |
76 | |
77 | Block sample_block |
78 | { |
79 | { std::make_shared<DataTypeString>(), "name" }, |
80 | { std::make_shared<DataTypeString>(), "type" }, |
81 | { std::make_shared<DataTypeUInt8>(), "is_nullable" }, |
82 | { std::make_shared<DataTypeUInt8>(), "is_unsigned" }, |
83 | { std::make_shared<DataTypeUInt64>(), "length" }, |
84 | }; |
85 | |
86 | WriteBufferFromOwnString query; |
87 | query << "SELECT" |
88 | " COLUMN_NAME AS name," |
89 | " DATA_TYPE AS type," |
90 | " IS_NULLABLE = 'YES' AS is_nullable," |
91 | " COLUMN_TYPE LIKE '%unsigned' AS is_unsigned," |
92 | " CHARACTER_MAXIMUM_LENGTH AS length" |
93 | " FROM INFORMATION_SCHEMA.COLUMNS" |
94 | " WHERE TABLE_SCHEMA = " << quote << remote_database_name |
95 | << " AND TABLE_NAME = " << quote << remote_table_name |
96 | << " ORDER BY ORDINAL_POSITION" ; |
97 | |
98 | NamesAndTypesList columns; |
99 | MySQLBlockInputStream result(pool.Get(), query.str(), sample_block, DEFAULT_BLOCK_SIZE); |
100 | while (Block block = result.read()) |
101 | { |
102 | size_t rows = block.rows(); |
103 | for (size_t i = 0; i < rows; ++i) |
104 | columns.emplace_back( |
105 | (*block.getByPosition(0).column)[i].safeGet<String>(), |
106 | convertMySQLDataType( |
107 | (*block.getByPosition(1).column)[i].safeGet<String>(), |
108 | (*block.getByPosition(2).column)[i].safeGet<UInt64>() && context.getSettings().external_table_functions_use_nulls, |
109 | (*block.getByPosition(3).column)[i].safeGet<UInt64>(), |
110 | (*block.getByPosition(4).column)[i].safeGet<UInt64>())); |
111 | |
112 | } |
113 | |
114 | if (columns.empty()) |
115 | throw Exception("MySQL table " + backQuoteIfNeed(remote_database_name) + "." + backQuoteIfNeed(remote_table_name) + " doesn't exist." , ErrorCodes::UNKNOWN_TABLE); |
116 | |
117 | auto res = StorageMySQL::create( |
118 | getDatabaseName(), |
119 | table_name, |
120 | std::move(pool), |
121 | remote_database_name, |
122 | remote_table_name, |
123 | replace_query, |
124 | on_duplicate_clause, |
125 | ColumnsDescription{columns}, |
126 | ConstraintsDescription{}, |
127 | context); |
128 | |
129 | res->startup(); |
130 | return res; |
131 | } |
132 | |
133 | |
/// Registers TableFunctionMySQL in the given table function factory.
/// NOTE(review): the name it is registered under is not visible here; presumably it comes from TableFunctionMySQL itself — confirm.
void registerTableFunctionMySQL(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionMySQL>();
}
138 | } |
139 | |
140 | #endif |
141 | |