| 1 | #include "StorageMySQL.h" |
| 2 | |
| 3 | #if USE_MYSQL |
| 4 | |
| 5 | #include <Storages/StorageFactory.h> |
| 6 | #include <Storages/transformQueryForExternalDatabase.h> |
| 7 | #include <Formats/MySQLBlockInputStream.h> |
| 8 | #include <Interpreters/evaluateConstantExpression.h> |
| 9 | #include <Core/Settings.h> |
| 10 | #include <Interpreters/Context.h> |
| 11 | #include <DataStreams/IBlockOutputStream.h> |
| 12 | #include <Formats/FormatFactory.h> |
| 13 | #include <Common/parseAddress.h> |
| 14 | #include <IO/Operators.h> |
| 15 | #include <IO/WriteHelpers.h> |
| 16 | #include <Parsers/ASTLiteral.h> |
| 17 | #include <mysqlxx/Transaction.h> |
| 18 | |
| 19 | |
| 20 | namespace DB |
| 21 | { |
| 22 | |
| 23 | namespace ErrorCodes |
| 24 | { |
| 25 | extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; |
| 26 | extern const int BAD_ARGUMENTS; |
| 27 | } |
| 28 | |
| 29 | static String backQuoteMySQL(const String & x) |
| 30 | { |
| 31 | String res(x.size(), '\0'); |
| 32 | { |
| 33 | WriteBufferFromString wb(res); |
| 34 | writeBackQuotedStringMySQL(x, wb); |
| 35 | } |
| 36 | return res; |
| 37 | } |
| 38 | |
| 39 | StorageMySQL::StorageMySQL( |
| 40 | const std::string & database_name_, |
| 41 | const std::string & table_name_, |
| 42 | mysqlxx::Pool && pool_, |
| 43 | const std::string & remote_database_name_, |
| 44 | const std::string & remote_table_name_, |
| 45 | const bool replace_query_, |
| 46 | const std::string & on_duplicate_clause_, |
| 47 | const ColumnsDescription & columns_, |
| 48 | const ConstraintsDescription & constraints_, |
| 49 | const Context & context_) |
| 50 | : table_name(table_name_) |
| 51 | , database_name(database_name_) |
| 52 | , remote_database_name(remote_database_name_) |
| 53 | , remote_table_name(remote_table_name_) |
| 54 | , replace_query{replace_query_} |
| 55 | , on_duplicate_clause{on_duplicate_clause_} |
| 56 | , pool(std::move(pool_)) |
| 57 | , global_context(context_) |
| 58 | { |
| 59 | setColumns(columns_); |
| 60 | setConstraints(constraints_); |
| 61 | } |
| 62 | |
| 63 | |
| 64 | BlockInputStreams StorageMySQL::read( |
| 65 | const Names & column_names_, |
| 66 | const SelectQueryInfo & query_info_, |
| 67 | const Context & context_, |
| 68 | QueryProcessingStage::Enum /*processed_stage*/, |
| 69 | size_t max_block_size_, |
| 70 | unsigned) |
| 71 | { |
| 72 | check(column_names_); |
| 73 | String query = transformQueryForExternalDatabase( |
| 74 | *query_info_.query, getColumns().getOrdinary(), IdentifierQuotingStyle::BackticksMySQL, remote_database_name, remote_table_name, context_); |
| 75 | |
| 76 | Block sample_block; |
| 77 | for (const String & column_name : column_names_) |
| 78 | { |
| 79 | auto column_data = getColumn(column_name); |
| 80 | sample_block.insert({ column_data.type, column_data.name }); |
| 81 | } |
| 82 | |
| 83 | return { std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size_) }; |
| 84 | } |
| 85 | |
| 86 | |
| 87 | class StorageMySQLBlockOutputStream : public IBlockOutputStream |
| 88 | { |
| 89 | public: |
| 90 | explicit StorageMySQLBlockOutputStream(const StorageMySQL & storage_, |
| 91 | const std::string & remote_database_name_, |
| 92 | const std::string & remote_table_name_, |
| 93 | const mysqlxx::PoolWithFailover::Entry & entry_, |
| 94 | const size_t & mysql_max_rows_to_insert) |
| 95 | : storage{storage_} |
| 96 | , remote_database_name{remote_database_name_} |
| 97 | , remote_table_name{remote_table_name_} |
| 98 | , entry{entry_} |
| 99 | , max_batch_rows{mysql_max_rows_to_insert} |
| 100 | { |
| 101 | } |
| 102 | |
| 103 | Block () const override { return storage.getSampleBlock(); } |
| 104 | |
| 105 | void write(const Block & block) override |
| 106 | { |
| 107 | auto blocks = splitBlocks(block, max_batch_rows); |
| 108 | mysqlxx::Transaction trans(entry); |
| 109 | try |
| 110 | { |
| 111 | for (const Block & batch_data : blocks) |
| 112 | { |
| 113 | writeBlockData(batch_data); |
| 114 | } |
| 115 | trans.commit(); |
| 116 | } |
| 117 | catch (...) |
| 118 | { |
| 119 | trans.rollback(); |
| 120 | throw; |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | void writeBlockData(const Block & block) |
| 125 | { |
| 126 | WriteBufferFromOwnString sqlbuf; |
| 127 | sqlbuf << (storage.replace_query ? "REPLACE" : "INSERT" ) << " INTO " ; |
| 128 | sqlbuf << backQuoteMySQL(remote_database_name) << "." << backQuoteMySQL(remote_table_name); |
| 129 | sqlbuf << " (" << dumpNamesWithBackQuote(block) << ") VALUES " ; |
| 130 | |
| 131 | auto writer = FormatFactory::instance().getOutput("Values" , sqlbuf, storage.getSampleBlock(), storage.global_context); |
| 132 | writer->write(block); |
| 133 | |
| 134 | if (!storage.on_duplicate_clause.empty()) |
| 135 | sqlbuf << " ON DUPLICATE KEY " << storage.on_duplicate_clause; |
| 136 | |
| 137 | sqlbuf << ";" ; |
| 138 | |
| 139 | auto query = this->entry->query(sqlbuf.str()); |
| 140 | query.execute(); |
| 141 | } |
| 142 | |
| 143 | Blocks splitBlocks(const Block & block, const size_t & max_rows) const |
| 144 | { |
| 145 | /// Avoid Excessive copy when block is small enough |
| 146 | if (block.rows() <= max_rows) |
| 147 | return Blocks{std::move(block)}; |
| 148 | |
| 149 | const size_t splited_block_size = ceil(block.rows() * 1.0 / max_rows); |
| 150 | Blocks splitted_blocks(splited_block_size); |
| 151 | |
| 152 | for (size_t idx = 0; idx < splited_block_size; ++idx) |
| 153 | splitted_blocks[idx] = block.cloneEmpty(); |
| 154 | |
| 155 | const size_t columns = block.columns(); |
| 156 | const size_t rows = block.rows(); |
| 157 | size_t offsets = 0; |
| 158 | UInt64 limits = max_batch_rows; |
| 159 | for (size_t idx = 0; idx < splited_block_size; ++idx) |
| 160 | { |
| 161 | /// For last batch, limits should be the remain size |
| 162 | if (idx == splited_block_size - 1) limits = rows - offsets; |
| 163 | for (size_t col_idx = 0; col_idx < columns; ++col_idx) |
| 164 | { |
| 165 | splitted_blocks[idx].getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offsets, limits); |
| 166 | } |
| 167 | offsets += max_batch_rows; |
| 168 | } |
| 169 | |
| 170 | return splitted_blocks; |
| 171 | } |
| 172 | |
| 173 | std::string dumpNamesWithBackQuote(const Block & block) const |
| 174 | { |
| 175 | WriteBufferFromOwnString out; |
| 176 | for (auto it = block.begin(); it != block.end(); ++it) |
| 177 | { |
| 178 | if (it != block.begin()) |
| 179 | out << ", " ; |
| 180 | out << backQuoteMySQL(it->name); |
| 181 | } |
| 182 | return out.str(); |
| 183 | } |
| 184 | |
| 185 | |
| 186 | private: |
| 187 | const StorageMySQL & storage; |
| 188 | std::string remote_database_name; |
| 189 | std::string remote_table_name; |
| 190 | mysqlxx::PoolWithFailover::Entry entry; |
| 191 | size_t max_batch_rows; |
| 192 | }; |
| 193 | |
| 194 | |
| 195 | BlockOutputStreamPtr StorageMySQL::write( |
| 196 | const ASTPtr & /*query*/, const Context & context) |
| 197 | { |
| 198 | return std::make_shared<StorageMySQLBlockOutputStream>(*this, remote_database_name, remote_table_name, pool.Get(), context.getSettingsRef().mysql_max_rows_to_insert); |
| 199 | } |
| 200 | |
| 201 | void registerStorageMySQL(StorageFactory & factory) |
| 202 | { |
| 203 | factory.registerStorage("MySQL" , [](const StorageFactory::Arguments & args) |
| 204 | { |
| 205 | ASTs & engine_args = args.engine_args; |
| 206 | |
| 207 | if (engine_args.size() < 5 || engine_args.size() > 7) |
| 208 | throw Exception( |
| 209 | "Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause'])." , |
| 210 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
| 211 | |
| 212 | for (size_t i = 0; i < engine_args.size(); ++i) |
| 213 | engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context); |
| 214 | |
| 215 | /// 3306 is the default MySQL port. |
| 216 | auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 3306); |
| 217 | |
| 218 | const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(); |
| 219 | const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>(); |
| 220 | const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(); |
| 221 | const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>(); |
| 222 | |
| 223 | mysqlxx::Pool pool(remote_database, parsed_host_port.first, username, password, parsed_host_port.second); |
| 224 | |
| 225 | bool replace_query = false; |
| 226 | std::string on_duplicate_clause; |
| 227 | if (engine_args.size() >= 6) |
| 228 | replace_query = engine_args[5]->as<ASTLiteral &>().value.safeGet<UInt64>(); |
| 229 | if (engine_args.size() == 7) |
| 230 | on_duplicate_clause = engine_args[6]->as<ASTLiteral &>().value.safeGet<String>(); |
| 231 | |
| 232 | if (replace_query && !on_duplicate_clause.empty()) |
| 233 | throw Exception( |
| 234 | "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them" , |
| 235 | ErrorCodes::BAD_ARGUMENTS); |
| 236 | |
| 237 | return StorageMySQL::create( |
| 238 | args.database_name, |
| 239 | args.table_name, |
| 240 | std::move(pool), |
| 241 | remote_database, |
| 242 | remote_table, |
| 243 | replace_query, |
| 244 | on_duplicate_clause, |
| 245 | args.columns, |
| 246 | args.constraints, |
| 247 | args.context); |
| 248 | }); |
| 249 | } |
| 250 | |
| 251 | } |
| 252 | |
| 253 | #endif |
| 254 | |