1 | #include "StorageMySQL.h" |
2 | |
3 | #if USE_MYSQL |
4 | |
5 | #include <Storages/StorageFactory.h> |
6 | #include <Storages/transformQueryForExternalDatabase.h> |
7 | #include <Formats/MySQLBlockInputStream.h> |
8 | #include <Interpreters/evaluateConstantExpression.h> |
9 | #include <Core/Settings.h> |
10 | #include <Interpreters/Context.h> |
11 | #include <DataStreams/IBlockOutputStream.h> |
12 | #include <Formats/FormatFactory.h> |
13 | #include <Common/parseAddress.h> |
14 | #include <IO/Operators.h> |
15 | #include <IO/WriteHelpers.h> |
16 | #include <Parsers/ASTLiteral.h> |
17 | #include <mysqlxx/Transaction.h> |
18 | |
19 | |
20 | namespace DB |
21 | { |
22 | |
namespace ErrorCodes
{
    /// Error codes thrown from this file; they are only declared here and defined in another translation unit.
    extern const int BAD_ARGUMENTS;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
28 | |
29 | static String backQuoteMySQL(const String & x) |
30 | { |
31 | String res(x.size(), '\0'); |
32 | { |
33 | WriteBufferFromString wb(res); |
34 | writeBackQuotedStringMySQL(x, wb); |
35 | } |
36 | return res; |
37 | } |
38 | |
39 | StorageMySQL::StorageMySQL( |
40 | const std::string & database_name_, |
41 | const std::string & table_name_, |
42 | mysqlxx::Pool && pool_, |
43 | const std::string & remote_database_name_, |
44 | const std::string & remote_table_name_, |
45 | const bool replace_query_, |
46 | const std::string & on_duplicate_clause_, |
47 | const ColumnsDescription & columns_, |
48 | const ConstraintsDescription & constraints_, |
49 | const Context & context_) |
50 | : table_name(table_name_) |
51 | , database_name(database_name_) |
52 | , remote_database_name(remote_database_name_) |
53 | , remote_table_name(remote_table_name_) |
54 | , replace_query{replace_query_} |
55 | , on_duplicate_clause{on_duplicate_clause_} |
56 | , pool(std::move(pool_)) |
57 | , global_context(context_) |
58 | { |
59 | setColumns(columns_); |
60 | setConstraints(constraints_); |
61 | } |
62 | |
63 | |
64 | BlockInputStreams StorageMySQL::read( |
65 | const Names & column_names_, |
66 | const SelectQueryInfo & query_info_, |
67 | const Context & context_, |
68 | QueryProcessingStage::Enum /*processed_stage*/, |
69 | size_t max_block_size_, |
70 | unsigned) |
71 | { |
72 | check(column_names_); |
73 | String query = transformQueryForExternalDatabase( |
74 | *query_info_.query, getColumns().getOrdinary(), IdentifierQuotingStyle::BackticksMySQL, remote_database_name, remote_table_name, context_); |
75 | |
76 | Block sample_block; |
77 | for (const String & column_name : column_names_) |
78 | { |
79 | auto column_data = getColumn(column_name); |
80 | sample_block.insert({ column_data.type, column_data.name }); |
81 | } |
82 | |
83 | return { std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size_) }; |
84 | } |
85 | |
86 | |
87 | class StorageMySQLBlockOutputStream : public IBlockOutputStream |
88 | { |
89 | public: |
90 | explicit StorageMySQLBlockOutputStream(const StorageMySQL & storage_, |
91 | const std::string & remote_database_name_, |
92 | const std::string & remote_table_name_, |
93 | const mysqlxx::PoolWithFailover::Entry & entry_, |
94 | const size_t & mysql_max_rows_to_insert) |
95 | : storage{storage_} |
96 | , remote_database_name{remote_database_name_} |
97 | , remote_table_name{remote_table_name_} |
98 | , entry{entry_} |
99 | , max_batch_rows{mysql_max_rows_to_insert} |
100 | { |
101 | } |
102 | |
103 | Block () const override { return storage.getSampleBlock(); } |
104 | |
105 | void write(const Block & block) override |
106 | { |
107 | auto blocks = splitBlocks(block, max_batch_rows); |
108 | mysqlxx::Transaction trans(entry); |
109 | try |
110 | { |
111 | for (const Block & batch_data : blocks) |
112 | { |
113 | writeBlockData(batch_data); |
114 | } |
115 | trans.commit(); |
116 | } |
117 | catch (...) |
118 | { |
119 | trans.rollback(); |
120 | throw; |
121 | } |
122 | } |
123 | |
124 | void writeBlockData(const Block & block) |
125 | { |
126 | WriteBufferFromOwnString sqlbuf; |
127 | sqlbuf << (storage.replace_query ? "REPLACE" : "INSERT" ) << " INTO " ; |
128 | sqlbuf << backQuoteMySQL(remote_database_name) << "." << backQuoteMySQL(remote_table_name); |
129 | sqlbuf << " (" << dumpNamesWithBackQuote(block) << ") VALUES " ; |
130 | |
131 | auto writer = FormatFactory::instance().getOutput("Values" , sqlbuf, storage.getSampleBlock(), storage.global_context); |
132 | writer->write(block); |
133 | |
134 | if (!storage.on_duplicate_clause.empty()) |
135 | sqlbuf << " ON DUPLICATE KEY " << storage.on_duplicate_clause; |
136 | |
137 | sqlbuf << ";" ; |
138 | |
139 | auto query = this->entry->query(sqlbuf.str()); |
140 | query.execute(); |
141 | } |
142 | |
143 | Blocks splitBlocks(const Block & block, const size_t & max_rows) const |
144 | { |
145 | /// Avoid Excessive copy when block is small enough |
146 | if (block.rows() <= max_rows) |
147 | return Blocks{std::move(block)}; |
148 | |
149 | const size_t splited_block_size = ceil(block.rows() * 1.0 / max_rows); |
150 | Blocks splitted_blocks(splited_block_size); |
151 | |
152 | for (size_t idx = 0; idx < splited_block_size; ++idx) |
153 | splitted_blocks[idx] = block.cloneEmpty(); |
154 | |
155 | const size_t columns = block.columns(); |
156 | const size_t rows = block.rows(); |
157 | size_t offsets = 0; |
158 | UInt64 limits = max_batch_rows; |
159 | for (size_t idx = 0; idx < splited_block_size; ++idx) |
160 | { |
161 | /// For last batch, limits should be the remain size |
162 | if (idx == splited_block_size - 1) limits = rows - offsets; |
163 | for (size_t col_idx = 0; col_idx < columns; ++col_idx) |
164 | { |
165 | splitted_blocks[idx].getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offsets, limits); |
166 | } |
167 | offsets += max_batch_rows; |
168 | } |
169 | |
170 | return splitted_blocks; |
171 | } |
172 | |
173 | std::string dumpNamesWithBackQuote(const Block & block) const |
174 | { |
175 | WriteBufferFromOwnString out; |
176 | for (auto it = block.begin(); it != block.end(); ++it) |
177 | { |
178 | if (it != block.begin()) |
179 | out << ", " ; |
180 | out << backQuoteMySQL(it->name); |
181 | } |
182 | return out.str(); |
183 | } |
184 | |
185 | |
186 | private: |
187 | const StorageMySQL & storage; |
188 | std::string remote_database_name; |
189 | std::string remote_table_name; |
190 | mysqlxx::PoolWithFailover::Entry entry; |
191 | size_t max_batch_rows; |
192 | }; |
193 | |
194 | |
195 | BlockOutputStreamPtr StorageMySQL::write( |
196 | const ASTPtr & /*query*/, const Context & context) |
197 | { |
198 | return std::make_shared<StorageMySQLBlockOutputStream>(*this, remote_database_name, remote_table_name, pool.Get(), context.getSettingsRef().mysql_max_rows_to_insert); |
199 | } |
200 | |
201 | void registerStorageMySQL(StorageFactory & factory) |
202 | { |
203 | factory.registerStorage("MySQL" , [](const StorageFactory::Arguments & args) |
204 | { |
205 | ASTs & engine_args = args.engine_args; |
206 | |
207 | if (engine_args.size() < 5 || engine_args.size() > 7) |
208 | throw Exception( |
209 | "Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause'])." , |
210 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
211 | |
212 | for (size_t i = 0; i < engine_args.size(); ++i) |
213 | engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context); |
214 | |
215 | /// 3306 is the default MySQL port. |
216 | auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 3306); |
217 | |
218 | const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(); |
219 | const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>(); |
220 | const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(); |
221 | const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>(); |
222 | |
223 | mysqlxx::Pool pool(remote_database, parsed_host_port.first, username, password, parsed_host_port.second); |
224 | |
225 | bool replace_query = false; |
226 | std::string on_duplicate_clause; |
227 | if (engine_args.size() >= 6) |
228 | replace_query = engine_args[5]->as<ASTLiteral &>().value.safeGet<UInt64>(); |
229 | if (engine_args.size() == 7) |
230 | on_duplicate_clause = engine_args[6]->as<ASTLiteral &>().value.safeGet<String>(); |
231 | |
232 | if (replace_query && !on_duplicate_clause.empty()) |
233 | throw Exception( |
234 | "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them" , |
235 | ErrorCodes::BAD_ARGUMENTS); |
236 | |
237 | return StorageMySQL::create( |
238 | args.database_name, |
239 | args.table_name, |
240 | std::move(pool), |
241 | remote_database, |
242 | remote_table, |
243 | replace_query, |
244 | on_duplicate_clause, |
245 | args.columns, |
246 | args.constraints, |
247 | args.context); |
248 | }); |
249 | } |
250 | |
251 | } |
252 | |
253 | #endif |
254 | |