| 1 | #include "StorageMySQL.h" | 
|---|
| 2 |  | 
|---|
| 3 | #if USE_MYSQL | 
|---|
| 4 |  | 
|---|
| 5 | #include <Storages/StorageFactory.h> | 
|---|
| 6 | #include <Storages/transformQueryForExternalDatabase.h> | 
|---|
| 7 | #include <Formats/MySQLBlockInputStream.h> | 
|---|
| 8 | #include <Interpreters/evaluateConstantExpression.h> | 
|---|
| 9 | #include <Core/Settings.h> | 
|---|
| 10 | #include <Interpreters/Context.h> | 
|---|
| 11 | #include <DataStreams/IBlockOutputStream.h> | 
|---|
| 12 | #include <Formats/FormatFactory.h> | 
|---|
| 13 | #include <Common/parseAddress.h> | 
|---|
| 14 | #include <IO/Operators.h> | 
|---|
| 15 | #include <IO/WriteHelpers.h> | 
|---|
| 16 | #include <Parsers/ASTLiteral.h> | 
|---|
| 17 | #include <mysqlxx/Transaction.h> | 
|---|
| 18 |  | 
|---|
| 19 |  | 
|---|
| 20 | namespace DB | 
|---|
| 21 | { | 
|---|
| 22 |  | 
|---|
| 23 | namespace ErrorCodes | 
|---|
| 24 | { | 
|---|
| 25 | extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; | 
|---|
| 26 | extern const int BAD_ARGUMENTS; | 
|---|
| 27 | } | 
|---|
| 28 |  | 
|---|
| 29 | static String backQuoteMySQL(const String & x) | 
|---|
| 30 | { | 
|---|
| 31 | String res(x.size(), '\0'); | 
|---|
| 32 | { | 
|---|
| 33 | WriteBufferFromString wb(res); | 
|---|
| 34 | writeBackQuotedStringMySQL(x, wb); | 
|---|
| 35 | } | 
|---|
| 36 | return res; | 
|---|
| 37 | } | 
|---|
| 38 |  | 
|---|
/// Creates a table that forwards reads and writes to a table on a MySQL server.
///
/// @param database_name_         local database name of this table.
/// @param table_name_            local table name of this table.
/// @param pool_                  mysqlxx connection pool for the remote server; moved into the storage.
/// @param remote_database_name_  database on the MySQL side that queries are sent to.
/// @param remote_table_name_     table on the MySQL side that queries are sent to.
/// @param replace_query_         when true, writes are issued as REPLACE instead of INSERT.
/// @param on_duplicate_clause_   if non-empty, appended to writes after " ON DUPLICATE KEY ".
/// @param columns_               column structure of the table (registered via setColumns).
/// @param constraints_           table constraints (registered via setConstraints).
/// @param context_               kept as `global_context`; the output stream passes it to FormatFactory.
StorageMySQL::StorageMySQL(
    const std::string & database_name_,
    const std::string & table_name_,
    mysqlxx::Pool && pool_,
    const std::string & remote_database_name_,
    const std::string & remote_table_name_,
    const bool replace_query_,
    const std::string & on_duplicate_clause_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const Context & context_)
    : table_name(table_name_)
    , database_name(database_name_)
    , remote_database_name(remote_database_name_)
    , remote_table_name(remote_table_name_)
    , replace_query{replace_query_}
    , on_duplicate_clause{on_duplicate_clause_}
    , pool(std::move(pool_))
    , global_context(context_)
{
    setColumns(columns_);
    setConstraints(constraints_);
}
|---|
| 62 |  | 
|---|
| 63 |  | 
|---|
| 64 | BlockInputStreams StorageMySQL::read( | 
|---|
| 65 | const Names & column_names_, | 
|---|
| 66 | const SelectQueryInfo & query_info_, | 
|---|
| 67 | const Context & context_, | 
|---|
| 68 | QueryProcessingStage::Enum /*processed_stage*/, | 
|---|
| 69 | size_t max_block_size_, | 
|---|
| 70 | unsigned) | 
|---|
| 71 | { | 
|---|
| 72 | check(column_names_); | 
|---|
| 73 | String query = transformQueryForExternalDatabase( | 
|---|
| 74 | *query_info_.query, getColumns().getOrdinary(), IdentifierQuotingStyle::BackticksMySQL, remote_database_name, remote_table_name, context_); | 
|---|
| 75 |  | 
|---|
| 76 | Block sample_block; | 
|---|
| 77 | for (const String & column_name : column_names_) | 
|---|
| 78 | { | 
|---|
| 79 | auto column_data = getColumn(column_name); | 
|---|
| 80 | sample_block.insert({ column_data.type, column_data.name }); | 
|---|
| 81 | } | 
|---|
| 82 |  | 
|---|
| 83 | return { std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size_) }; | 
|---|
| 84 | } | 
|---|
| 85 |  | 
|---|
| 86 |  | 
|---|
| 87 | class StorageMySQLBlockOutputStream : public IBlockOutputStream | 
|---|
| 88 | { | 
|---|
| 89 | public: | 
|---|
| 90 | explicit StorageMySQLBlockOutputStream(const StorageMySQL & storage_, | 
|---|
| 91 | const std::string & remote_database_name_, | 
|---|
| 92 | const std::string & remote_table_name_, | 
|---|
| 93 | const mysqlxx::PoolWithFailover::Entry & entry_, | 
|---|
| 94 | const size_t & mysql_max_rows_to_insert) | 
|---|
| 95 | : storage{storage_} | 
|---|
| 96 | , remote_database_name{remote_database_name_} | 
|---|
| 97 | , remote_table_name{remote_table_name_} | 
|---|
| 98 | , entry{entry_} | 
|---|
| 99 | , max_batch_rows{mysql_max_rows_to_insert} | 
|---|
| 100 | { | 
|---|
| 101 | } | 
|---|
| 102 |  | 
|---|
| 103 | Block () const override { return storage.getSampleBlock(); } | 
|---|
| 104 |  | 
|---|
| 105 | void write(const Block & block) override | 
|---|
| 106 | { | 
|---|
| 107 | auto blocks = splitBlocks(block, max_batch_rows); | 
|---|
| 108 | mysqlxx::Transaction trans(entry); | 
|---|
| 109 | try | 
|---|
| 110 | { | 
|---|
| 111 | for (const Block & batch_data : blocks) | 
|---|
| 112 | { | 
|---|
| 113 | writeBlockData(batch_data); | 
|---|
| 114 | } | 
|---|
| 115 | trans.commit(); | 
|---|
| 116 | } | 
|---|
| 117 | catch (...) | 
|---|
| 118 | { | 
|---|
| 119 | trans.rollback(); | 
|---|
| 120 | throw; | 
|---|
| 121 | } | 
|---|
| 122 | } | 
|---|
| 123 |  | 
|---|
| 124 | void writeBlockData(const Block & block) | 
|---|
| 125 | { | 
|---|
| 126 | WriteBufferFromOwnString sqlbuf; | 
|---|
| 127 | sqlbuf << (storage.replace_query ? "REPLACE": "INSERT") << " INTO "; | 
|---|
| 128 | sqlbuf << backQuoteMySQL(remote_database_name) << "."<< backQuoteMySQL(remote_table_name); | 
|---|
| 129 | sqlbuf << " ("<< dumpNamesWithBackQuote(block) << ") VALUES "; | 
|---|
| 130 |  | 
|---|
| 131 | auto writer = FormatFactory::instance().getOutput( "Values", sqlbuf, storage.getSampleBlock(), storage.global_context); | 
|---|
| 132 | writer->write(block); | 
|---|
| 133 |  | 
|---|
| 134 | if (!storage.on_duplicate_clause.empty()) | 
|---|
| 135 | sqlbuf << " ON DUPLICATE KEY "<< storage.on_duplicate_clause; | 
|---|
| 136 |  | 
|---|
| 137 | sqlbuf << ";"; | 
|---|
| 138 |  | 
|---|
| 139 | auto query = this->entry->query(sqlbuf.str()); | 
|---|
| 140 | query.execute(); | 
|---|
| 141 | } | 
|---|
| 142 |  | 
|---|
| 143 | Blocks splitBlocks(const Block & block, const size_t & max_rows) const | 
|---|
| 144 | { | 
|---|
| 145 | /// Avoid Excessive copy when block is small enough | 
|---|
| 146 | if (block.rows() <= max_rows) | 
|---|
| 147 | return Blocks{std::move(block)}; | 
|---|
| 148 |  | 
|---|
| 149 | const size_t splited_block_size = ceil(block.rows() * 1.0 / max_rows); | 
|---|
| 150 | Blocks splitted_blocks(splited_block_size); | 
|---|
| 151 |  | 
|---|
| 152 | for (size_t idx = 0; idx < splited_block_size; ++idx) | 
|---|
| 153 | splitted_blocks[idx] = block.cloneEmpty(); | 
|---|
| 154 |  | 
|---|
| 155 | const size_t columns = block.columns(); | 
|---|
| 156 | const size_t rows = block.rows(); | 
|---|
| 157 | size_t offsets = 0; | 
|---|
| 158 | UInt64 limits = max_batch_rows; | 
|---|
| 159 | for (size_t idx = 0; idx < splited_block_size; ++idx) | 
|---|
| 160 | { | 
|---|
| 161 | /// For last batch, limits should be the remain size | 
|---|
| 162 | if (idx == splited_block_size - 1) limits = rows - offsets; | 
|---|
| 163 | for (size_t col_idx = 0; col_idx < columns; ++col_idx) | 
|---|
| 164 | { | 
|---|
| 165 | splitted_blocks[idx].getByPosition(col_idx).column = block.getByPosition(col_idx).column->cut(offsets, limits); | 
|---|
| 166 | } | 
|---|
| 167 | offsets += max_batch_rows; | 
|---|
| 168 | } | 
|---|
| 169 |  | 
|---|
| 170 | return splitted_blocks; | 
|---|
| 171 | } | 
|---|
| 172 |  | 
|---|
| 173 | std::string dumpNamesWithBackQuote(const Block & block) const | 
|---|
| 174 | { | 
|---|
| 175 | WriteBufferFromOwnString out; | 
|---|
| 176 | for (auto it = block.begin(); it != block.end(); ++it) | 
|---|
| 177 | { | 
|---|
| 178 | if (it != block.begin()) | 
|---|
| 179 | out << ", "; | 
|---|
| 180 | out << backQuoteMySQL(it->name); | 
|---|
| 181 | } | 
|---|
| 182 | return out.str(); | 
|---|
| 183 | } | 
|---|
| 184 |  | 
|---|
| 185 |  | 
|---|
| 186 | private: | 
|---|
| 187 | const StorageMySQL & storage; | 
|---|
| 188 | std::string remote_database_name; | 
|---|
| 189 | std::string remote_table_name; | 
|---|
| 190 | mysqlxx::PoolWithFailover::Entry entry; | 
|---|
| 191 | size_t max_batch_rows; | 
|---|
| 192 | }; | 
|---|
| 193 |  | 
|---|
| 194 |  | 
|---|
| 195 | BlockOutputStreamPtr StorageMySQL::write( | 
|---|
| 196 | const ASTPtr & /*query*/, const Context & context) | 
|---|
| 197 | { | 
|---|
| 198 | return std::make_shared<StorageMySQLBlockOutputStream>(*this, remote_database_name, remote_table_name, pool.Get(), context.getSettingsRef().mysql_max_rows_to_insert); | 
|---|
| 199 | } | 
|---|
| 200 |  | 
|---|
| 201 | void registerStorageMySQL(StorageFactory & factory) | 
|---|
| 202 | { | 
|---|
| 203 | factory.registerStorage( "MySQL", [](const StorageFactory::Arguments & args) | 
|---|
| 204 | { | 
|---|
| 205 | ASTs & engine_args = args.engine_args; | 
|---|
| 206 |  | 
|---|
| 207 | if (engine_args.size() < 5 || engine_args.size() > 7) | 
|---|
| 208 | throw Exception( | 
|---|
| 209 | "Storage MySQL requires 5-7 parameters: MySQL('host:port', database, table, 'user', 'password'[, replace_query, 'on_duplicate_clause']).", | 
|---|
| 210 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); | 
|---|
| 211 |  | 
|---|
| 212 | for (size_t i = 0; i < engine_args.size(); ++i) | 
|---|
| 213 | engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context); | 
|---|
| 214 |  | 
|---|
| 215 | /// 3306 is the default MySQL port. | 
|---|
| 216 | auto parsed_host_port = parseAddress(engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(), 3306); | 
|---|
| 217 |  | 
|---|
| 218 | const String & remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>(); | 
|---|
| 219 | const String & remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>(); | 
|---|
| 220 | const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>(); | 
|---|
| 221 | const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>(); | 
|---|
| 222 |  | 
|---|
| 223 | mysqlxx::Pool pool(remote_database, parsed_host_port.first, username, password, parsed_host_port.second); | 
|---|
| 224 |  | 
|---|
| 225 | bool replace_query = false; | 
|---|
| 226 | std::string on_duplicate_clause; | 
|---|
| 227 | if (engine_args.size() >= 6) | 
|---|
| 228 | replace_query = engine_args[5]->as<ASTLiteral &>().value.safeGet<UInt64>(); | 
|---|
| 229 | if (engine_args.size() == 7) | 
|---|
| 230 | on_duplicate_clause = engine_args[6]->as<ASTLiteral &>().value.safeGet<String>(); | 
|---|
| 231 |  | 
|---|
| 232 | if (replace_query && !on_duplicate_clause.empty()) | 
|---|
| 233 | throw Exception( | 
|---|
| 234 | "Only one of 'replace_query' and 'on_duplicate_clause' can be specified, or none of them", | 
|---|
| 235 | ErrorCodes::BAD_ARGUMENTS); | 
|---|
| 236 |  | 
|---|
| 237 | return StorageMySQL::create( | 
|---|
| 238 | args.database_name, | 
|---|
| 239 | args.table_name, | 
|---|
| 240 | std::move(pool), | 
|---|
| 241 | remote_database, | 
|---|
| 242 | remote_table, | 
|---|
| 243 | replace_query, | 
|---|
| 244 | on_duplicate_clause, | 
|---|
| 245 | args.columns, | 
|---|
| 246 | args.constraints, | 
|---|
| 247 | args.context); | 
|---|
| 248 | }); | 
|---|
| 249 | } | 
|---|
| 250 |  | 
|---|
| 251 | } | 
|---|
| 252 |  | 
|---|
| 253 | #endif | 
|---|
| 254 |  | 
|---|