1#include "StorageMySQL.h"
2
3#if USE_MYSQL
4
5#include <Storages/StorageFactory.h>
6#include <Storages/transformQueryForExternalDatabase.h>
7#include <Formats/MySQLBlockInputStream.h>
8#include <Interpreters/evaluateConstantExpression.h>
9#include <Core/Settings.h>
10#include <Interpreters/Context.h>
11#include <DataStreams/IBlockOutputStream.h>
12#include <Formats/FormatFactory.h>
13#include <Common/parseAddress.h>
14#include <IO/Operators.h>
15#include <IO/WriteHelpers.h>
16#include <Parsers/ASTLiteral.h>
17#include <mysqlxx/Transaction.h>
18
19
20namespace DB
21{
22
/// Error codes thrown by this file; only declared here (`extern`), their definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
}
28
29static String backQuoteMySQL(const String & x)
30{
31 String res(x.size(), '\0');
32 {
33 WriteBufferFromString wb(res);
34 writeBackQuotedStringMySQL(x, wb);
35 }
36 return res;
37}
38
39StorageMySQL::StorageMySQL(
40 const std::string & database_name_,
41 const std::string & table_name_,
42 mysqlxx::Pool && pool_,
43 const std::string & remote_database_name_,
44 const std::string & remote_table_name_,
45 const bool replace_query_,
46 const std::string & on_duplicate_clause_,
47 const ColumnsDescription & columns_,
48 const ConstraintsDescription & constraints_,
49 const Context & context_)
50 : table_name(table_name_)
51 , database_name(database_name_)
52 , remote_database_name(remote_database_name_)
53 , remote_table_name(remote_table_name_)
54 , replace_query{replace_query_}
55 , on_duplicate_clause{on_duplicate_clause_}
56 , pool(std::move(pool_))
57 , global_context(context_)
58{
59 setColumns(columns_);
60 setConstraints(constraints_);
61}
62
63
64BlockInputStreams StorageMySQL::read(
65 const Names & column_names_,
66 const SelectQueryInfo & query_info_,
67 const Context & context_,
68 QueryProcessingStage::Enum /*processed_stage*/,
69 size_t max_block_size_,
70 unsigned)
71{
72 check(column_names_);
73 String query = transformQueryForExternalDatabase(
74 *query_info_.query, getColumns().getOrdinary(), IdentifierQuotingStyle::BackticksMySQL, remote_database_name, remote_table_name, context_);
75
76 Block sample_block;
77 for (const String & column_name : column_names_)
78 {
79 auto column_data = getColumn(column_name);
80 sample_block.insert({ column_data.type, column_data.name });
81 }
82
83 return { std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size_) };
84}
85
86
87class StorageMySQLBlockOutputStream : public IBlockOutputStream
88{
89public:
90 explicit StorageMySQLBlockOutputStream(const StorageMySQL & storage_,
91 const std::string & remote_database_name_,
92 const std::string & remote_table_name_,
93 const mysqlxx::PoolWithFailover::Entry & entry_,
94 const size_t & mysql_max_rows_to_insert)
95 : storage{storage_}
96 , remote_database_name{remote_database_name_}
97 , remote_table_name{remote_table_name_}
98 , entry{entry_}
99 , max_batch_rows{mysql_max_rows_to_insert}
100 {
101 }
102
103 Block getHeader() const override { return storage.getSampleBlock(); }
104
105 void write(const Block & block) override
106 {
107 auto blocks = splitBlocks(block, max_batch_rows);
108 mysqlxx::Transaction trans(entry);
109 try
110 {
111 for (const Block & batch_data : blocks)
112 {
113 writeBlockData(batch_data);
114 }
115 trans.commit();
116 }
117 catch (...)
118 {
119 trans.rollback();
120 throw;
121 }
122 }
123
124 void writeBlockData(const Block & block)
125 {
126 WriteBufferFromOwnString sqlbuf;
127 sqlbuf << (storage.replace_query ? "REPLACE" : "INSERT") << " INTO ";
128 sqlbuf << backQuoteMySQL(remote_database_name) << "." << backQuoteMySQL(remote_table_name);
129 sqlbuf << " (" << dumpNamesWithBackQuote(block) << ") VALUES ";
130
131 auto writer = FormatFactory::instance().getOutput("Values", sqlbuf, storage.getSampleBlock(), storage.global_context);
132 writer->write(block);
133
134 if (!storage.on_duplicate_clause.empty())
135 sqlbuf << " ON DUPLICATE KEY " << storage.on_duplicate_clause;
136
137 sqlbuf << ";";
138
139 auto query = this->entry->query(sqlbuf.str());
140 query.execute();
141 }
142
143 Blocks splitBlocks(const Block & block, const size_t & max_rows) const
144 {
145 /// Avoid Excessive copy when block is small enough
146 if (block.rows() <= max_rows)
147 return Blocks{std::move(block)};
148
149 const size_t splited_block_size = ceil(block.rows() * 1.0 / max_rows);
150 Blocks splitted_blocks(splited_block_size);
151
152 for (size_t idx = 0; idx < splited_block_size; ++idx)
153 splitted_blocks[idx] = block.cloneEmpty();
154
155 const size_t columns = block.columns();
156 const size_t rows = block.rows();
157 size_t offsets = 0;
158 UInt64 limits = max_batch_rows;
159 for (size_t idx = 0; idx < splited_block_size; ++idx)
160 {
161 /// For last batch, limits should be the remain size
162 if (idx == splited_block_size - 1) limits = rows - offsets;
163 for (size_t col_idx = 0; col_idx < columns; ++col_idx)
164 {
165 splitted_blocks[idx].getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offsets, limits);
166 }
167 offsets += max_batch_rows;
168 }
169
170 return splitted_blocks;
171 }
172
173 std::string dumpNamesWithBackQuote(const Block & block) const
174 {
175 WriteBufferFromOwnString out;
176 for (auto it = block.begin(); it != block.end(); ++it)
177 {
178 if (it != block.begin())
179 out << ", ";
180 out << backQuoteMySQL(it->name);
181 }
182 return out.str();
183 }
184
185
186private:
187 const StorageMySQL & storage;
188 std::string remote_database_name;
189 std::string remote_table_name;
190 mysqlxx::PoolWithFailover::Entry entry;
191 size_t max_batch_rows;
192};
193
194
195BlockOutputStreamPtr StorageMySQL::write(
196 const ASTPtr & /*query*/, const Context & context)
197{
198 return std::make_shared<StorageMySQLBlockOutputStream>(*this, remote_database_name, remote_table_name, pool.Get(), context.getSettingsRef().mysql_max_rows_to_insert);
199}
200
201void registerStorageMySQL(StorageFactory & factory)
202{
203 factory.registerStorage("MySQL", [](const StorageFactory::Arguments & args)
204 {
205 ASTs & engine_args = args.engine_args;
206
207 if (engine_args.size() < 5 || engine_args.size() > 7)
208 throw Exception(
209 "Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).",
210 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
211
212 for (size_t i = 0; i < engine_args.size(); ++i)
213 engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);
214
215 /// 3306 is the default MySQL port.
216 auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 3306);
217
218 const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
219 const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
220 const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
221 const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
222
223 mysqlxx::Pool pool(remote_database, parsed_host_port.first, username, password, parsed_host_port.second);
224
225 bool replace_query = false;
226 std::string on_duplicate_clause;
227 if (engine_args.size() >= 6)
228 replace_query = engine_args[5]->as<ASTLiteral &>().value.safeGet<UInt64>();
229 if (engine_args.size() == 7)
230 on_duplicate_clause = engine_args[6]->as<ASTLiteral &>().value.safeGet<String>();
231
232 if (replace_query && !on_duplicate_clause.empty())
233 throw Exception(
234 "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them",
235 ErrorCodes::BAD_ARGUMENTS);
236
237 return StorageMySQL::create(
238 args.database_name,
239 args.table_name,
240 std::move(pool),
241 remote_database,
242 remote_table,
243 replace_query,
244 on_duplicate_clause,
245 args.columns,
246 args.constraints,
247 args.context);
248 });
249}
250
251}
252
253#endif
254