1#include <Storages/StorageDistributed.h>
2
3#include <DataStreams/OneBlockInputStream.h>
4
5#include <Databases/IDatabase.h>
6
7#include <DataTypes/DataTypeFactory.h>
8#include <DataTypes/DataTypesNumber.h>
9
10#include <Storages/Distributed/DirectoryMonitor.h>
11#include <Storages/Distributed/DistributedBlockOutputStream.h>
12#include <Storages/StorageFactory.h>
13#include <Storages/AlterCommands.h>
14
15#include <Common/Macros.h>
16#include <Common/escapeForFileName.h>
17#include <Common/typeid_cast.h>
18
19#include <Parsers/ASTDropQuery.h>
20#include <Parsers/ASTExpressionList.h>
21#include <Parsers/ASTIdentifier.h>
22#include <Parsers/ASTInsertQuery.h>
23#include <Parsers/ASTLiteral.h>
24#include <Parsers/ASTSelectQuery.h>
25#include <Parsers/ASTTablesInSelectQuery.h>
26#include <Parsers/ParserAlterQuery.h>
27#include <Parsers/TablePropertiesQueriesASTs.h>
28#include <Parsers/parseQuery.h>
29
30#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
31#include <Interpreters/ClusterProxy/executeQuery.h>
32#include <Interpreters/ExpressionAnalyzer.h>
33#include <Interpreters/InterpreterAlterQuery.h>
34#include <Interpreters/InterpreterDescribeQuery.h>
35#include <Interpreters/InterpreterSelectQuery.h>
36#include <Interpreters/TranslateQualifiedNamesVisitor.h>
37#include <Interpreters/SyntaxAnalyzer.h>
38#include <Interpreters/createBlockSelector.h>
39#include <Interpreters/evaluateConstantExpression.h>
40#include <Interpreters/getClusterName.h>
41
42#include <Core/Field.h>
43
44#include <IO/ReadHelpers.h>
45
46#include <Poco/DirectoryIterator.h>
47
48#include <memory>
49#include <filesystem>
50
51
52namespace DB
53{
54
/// Error codes raised from this translation unit. Only declared here; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int STORAGE_REQUIRES_PARAMETER;
    extern const int BAD_ARGUMENTS;
    extern const int READONLY;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int INCORRECT_NUMBER_OF_COLUMNS;
    extern const int INFINITE_LOOP;
    extern const int TYPE_MISMATCH;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int TOO_MANY_ROWS;
    /// Thrown by checkAlterIsPossible() and skipUnusedShards() respectively. Declared explicitly
    /// so that this file does not depend on another header happening to declare them.
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
}
67
/// Action-lock type identifiers referenced from this file. Only declared here; defined elsewhere.
namespace ActionLocks
{
    /// StorageDistributed::getActionLock() hands out a lock on the monitors blocker for this type.
    extern const StorageActionBlockType DistributedSend;
}
72
73namespace
74{
75
76/// select query has database, table and table function names as AST pointers
77/// Creates a copy of query, changes database, table and table function names.
78ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table, ASTPtr table_function_ptr = nullptr)
79{
80 auto modified_query_ast = query->clone();
81
82 ASTSelectQuery & select_query = modified_query_ast->as<ASTSelectQuery &>();
83
84 /// restore long column names in JOIN ON expressions
85 if (auto tables = select_query.tables())
86 {
87 RestoreQualifiedNamesVisitor::Data data;
88 RestoreQualifiedNamesVisitor(data).visit(tables);
89 }
90
91 if (table_function_ptr)
92 select_query.addTableFunction(table_function_ptr);
93 else
94 select_query.replaceDatabaseAndTable(database, table);
95 return modified_query_ast;
96}
97
98/// The columns list in the original INSERT query is incorrect because inserted blocks are transformed
99/// to the form of the sample block of the Distributed table. So we rewrite it and add all columns from
100/// the sample block instead.
101ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Block & sample_block_non_materialized)
102{
103 auto query = std::make_shared<ASTInsertQuery>();
104 query->database = database;
105 query->table = table;
106
107 auto columns = std::make_shared<ASTExpressionList>();
108 query->columns = columns;
109 query->children.push_back(columns);
110 for (const auto & col : sample_block_non_materialized)
111 columns->children.push_back(std::make_shared<ASTIdentifier>(col.name));
112
113 return query;
114}
115
116/// Calculate maximum number in file names in directory and all subdirectories.
117/// To ensure global order of data blocks yet to be sent across server restarts.
118UInt64 getMaximumFileNumber(const std::string & dir_path)
119{
120 UInt64 res = 0;
121
122 std::filesystem::recursive_directory_iterator begin(dir_path);
123 std::filesystem::recursive_directory_iterator end;
124 for (auto it = begin; it != end; ++it)
125 {
126 const auto & file_path = it->path();
127
128 if (!std::filesystem::is_regular_file(*it) || !endsWith(file_path.filename().string(), ".bin"))
129 continue;
130
131 UInt64 num = 0;
132 try
133 {
134 num = parse<UInt64>(file_path.filename().stem().string());
135 }
136 catch (Exception & e)
137 {
138 e.addMessage("Unexpected file name " + file_path.filename().string() + " found at " + file_path.parent_path().string() + ", should have numeric base name.");
139 throw;
140 }
141
142 if (num > res)
143 res = num;
144 }
145
146 return res;
147}
148
149void initializeFileNamesIncrement(const std::string & path, SimpleIncrement & increment)
150{
151 if (!path.empty())
152 increment.set(getMaximumFileNumber(path));
153}
154
155/// the same as DistributedBlockOutputStream::createSelector, should it be static?
156IColumn::Selector createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
157{
158 const auto & slot_to_shard = cluster->getSlotToShard();
159
160#define CREATE_FOR_TYPE(TYPE) \
161 if (typeid_cast<const DataType##TYPE *>(result.type.get())) \
162 return createBlockSelector<TYPE>(*result.column, slot_to_shard);
163
164 CREATE_FOR_TYPE(UInt8)
165 CREATE_FOR_TYPE(UInt16)
166 CREATE_FOR_TYPE(UInt32)
167 CREATE_FOR_TYPE(UInt64)
168 CREATE_FOR_TYPE(Int8)
169 CREATE_FOR_TYPE(Int16)
170 CREATE_FOR_TYPE(Int32)
171 CREATE_FOR_TYPE(Int64)
172
173#undef CREATE_FOR_TYPE
174
175 throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
176}
177
178std::string makeFormattedListOfShards(const ClusterPtr & cluster)
179{
180 std::ostringstream os;
181
182 bool head = true;
183 os << "[";
184 for (const auto & shard_info : cluster->getShardsInfo())
185 {
186 (head ? os : os << ", ") << shard_info.shard_num;
187 head = false;
188 }
189 os << "]";
190
191 return os.str();
192}
193
194}
195
196
/// Defined here rather than in the class definition: the class holds std::unique_ptr members whose
/// pointee type is incomplete in the class definition, so they can only be destroyed where the
/// type is complete.
StorageDistributed::~StorageDistributed() = default;
199
200static ExpressionActionsPtr buildShardingKeyExpression(const ASTPtr & sharding_key, const Context & context, NamesAndTypesList columns, bool project)
201{
202 ASTPtr query = sharding_key;
203 auto syntax_result = SyntaxAnalyzer(context).analyze(query, columns);
204 return ExpressionAnalyzer(query, syntax_result, context).getActions(project);
205}
206
207StorageDistributed::StorageDistributed(
208 const String & database_name_,
209 const String & table_name_,
210 const ColumnsDescription & columns_,
211 const ConstraintsDescription & constraints_,
212 const String & remote_database_,
213 const String & remote_table_,
214 const String & cluster_name_,
215 const Context & context_,
216 const ASTPtr & sharding_key_,
217 const String & relative_data_path_,
218 bool attach_)
219 : IStorage(ColumnsDescription({
220 {"_shard_num", std::make_shared<DataTypeUInt32>()},
221 }, true)),
222 table_name(table_name_), database_name(database_name_),
223 remote_database(remote_database_), remote_table(remote_table_),
224 global_context(context_), cluster_name(global_context.getMacros()->expand(cluster_name_)), has_sharding_key(sharding_key_),
225 path(relative_data_path_.empty() ? "" : (context_.getPath() + relative_data_path_))
226{
227 setColumns(columns_);
228 setConstraints(constraints_);
229
230 if (sharding_key_)
231 {
232 sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context, getColumns().getAllPhysical(), false);
233 sharding_key_column_name = sharding_key_->getColumnName();
234 }
235
236 /// Sanity check. Skip check if the table is already created to allow the server to start.
237 if (!attach_ && !cluster_name.empty())
238 {
239 size_t num_local_shards = global_context.getCluster(cluster_name)->getLocalShardCount();
240 if (num_local_shards && remote_database == database_name && remote_table == table_name)
241 throw Exception("Distributed table " + table_name + " looks at itself", ErrorCodes::INFINITE_LOOP);
242 }
243}
244
245
246StorageDistributed::StorageDistributed(
247 const String & database_name_,
248 const String & table_name_,
249 const ColumnsDescription & columns_,
250 const ConstraintsDescription & constraints_,
251 ASTPtr remote_table_function_ptr_,
252 const String & cluster_name_,
253 const Context & context_,
254 const ASTPtr & sharding_key_,
255 const String & relative_data_path_,
256 bool attach)
257 : StorageDistributed(database_name_, table_name_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, relative_data_path_, attach)
258{
259 remote_table_function_ptr = remote_table_function_ptr_;
260}
261
262
263StoragePtr StorageDistributed::createWithOwnCluster(
264 const std::string & table_name_,
265 const ColumnsDescription & columns_,
266 const String & remote_database_, /// database on remote servers.
267 const String & remote_table_, /// The name of the table on the remote servers.
268 ClusterPtr owned_cluster_,
269 const Context & context_)
270{
271 auto res = create(String{}, table_name_, columns_, ConstraintsDescription{}, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), false);
272 res->owned_cluster = owned_cluster_;
273 return res;
274}
275
276
277StoragePtr StorageDistributed::createWithOwnCluster(
278 const std::string & table_name_,
279 const ColumnsDescription & columns_,
280 ASTPtr & remote_table_function_ptr_,
281 ClusterPtr & owned_cluster_,
282 const Context & context_)
283{
284 auto res = create(String{}, table_name_, columns_, ConstraintsDescription{}, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), false);
285 res->owned_cluster = owned_cluster_;
286 return res;
287}
288
289QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context) const
290{
291 auto cluster = getCluster();
292 return getQueryProcessingStage(context, cluster);
293}
294
295QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context, const ClusterPtr & cluster) const
296{
297 const Settings & settings = context.getSettingsRef();
298
299 size_t num_local_shards = cluster->getLocalShardCount();
300 size_t num_remote_shards = cluster->getRemoteShardCount();
301 size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
302
303 if (settings.distributed_group_by_no_merge)
304 return QueryProcessingStage::Complete;
305 else /// Normal mode.
306 return result_size == 1 ? QueryProcessingStage::Complete
307 : QueryProcessingStage::WithMergeableState;
308}
309
310BlockInputStreams StorageDistributed::read(
311 const Names & column_names,
312 const SelectQueryInfo & query_info,
313 const Context & context,
314 QueryProcessingStage::Enum processed_stage,
315 const size_t /*max_block_size*/,
316 const unsigned /*num_streams*/)
317{
318 auto cluster = getCluster();
319
320 const Settings & settings = context.getSettingsRef();
321
322 const auto & modified_query_ast = rewriteSelectQuery(
323 query_info.query, remote_database, remote_table, remote_table_function_ptr);
324
325 Block header =
326 InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock();
327
328 const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{};
329
330 bool has_virtual_shard_num_column = std::find(column_names.begin(), column_names.end(), "_shard_num") != column_names.end();
331 if (has_virtual_shard_num_column && !isVirtualColumn("_shard_num"))
332 has_virtual_shard_num_column = false;
333
334 ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
335 ? ClusterProxy::SelectStreamFactory(
336 header, processed_stage, remote_table_function_ptr, scalars, has_virtual_shard_num_column, context.getExternalTables())
337 : ClusterProxy::SelectStreamFactory(
338 header, processed_stage, QualifiedTableName{remote_database, remote_table}, scalars, has_virtual_shard_num_column, context.getExternalTables());
339
340 if (settings.optimize_skip_unused_shards)
341 {
342 if (has_sharding_key)
343 {
344 auto smaller_cluster = skipUnusedShards(cluster, query_info);
345
346 if (smaller_cluster)
347 {
348 cluster = smaller_cluster;
349 LOG_DEBUG(log, "Reading from " << database_name << "." << table_name << ": "
350 "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): "
351 " " << makeFormattedListOfShards(cluster));
352 }
353 else
354 {
355 LOG_DEBUG(log, "Reading from " << database_name << "." << table_name << ": "
356 "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster");
357 }
358 }
359 }
360
361 return ClusterProxy::executeQuery(
362 select_stream_factory, cluster, modified_query_ast, context, settings);
363}
364
365
366BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & context)
367{
368 auto cluster = getCluster();
369 const auto & settings = context.getSettingsRef();
370
371 /// Ban an attempt to make async insert into the table belonging to DatabaseMemory
372 if (path.empty() && !owned_cluster && !settings.insert_distributed_sync)
373 {
374 throw Exception("Storage " + getName() + " must has own data directory to enable asynchronous inserts",
375 ErrorCodes::BAD_ARGUMENTS);
376 }
377
378 /// If sharding key is not specified, then you can only write to a shard containing only one shard
379 if (!has_sharding_key && ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2))
380 {
381 throw Exception("Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided",
382 ErrorCodes::STORAGE_REQUIRES_PARAMETER);
383 }
384
385 /// Force sync insertion if it is remote() table function
386 bool insert_sync = settings.insert_distributed_sync || owned_cluster;
387 auto timeout = settings.insert_distributed_timeout;
388
389 /// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
390 return std::make_shared<DistributedBlockOutputStream>(
391 context, *this, createInsertToRemoteTableQuery(remote_database, remote_table, getSampleBlockNonMaterialized()), cluster,
392 insert_sync, timeout);
393}
394
395
396void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
397{
398 for (const auto & command : commands)
399 {
400 if (command.type != AlterCommand::Type::ADD_COLUMN
401 && command.type != AlterCommand::Type::MODIFY_COLUMN
402 && command.type != AlterCommand::Type::DROP_COLUMN
403 && command.type != AlterCommand::Type::COMMENT_COLUMN)
404
405 throw Exception("Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
406 ErrorCodes::NOT_IMPLEMENTED);
407 }
408}
409
410void StorageDistributed::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
411{
412 lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
413
414 const String current_database_name = getDatabaseName();
415 const String current_table_name = getTableName();
416
417 StorageInMemoryMetadata metadata = getInMemoryMetadata();
418 params.apply(metadata);
419 context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
420 setColumns(std::move(metadata.columns));
421}
422
423
/// Starts background processing of data left in the storage directory and restores the file
/// number counter from the files found there.
void StorageDistributed::startup()
{
    /// Must run first: it creates the data directory (when `path` is set), which the scan below iterates.
    createDirectoryMonitors();
    initializeFileNamesIncrement(path, file_names_increment);
}
429
430
431void StorageDistributed::shutdown()
432{
433 cluster_nodes_data.clear();
434}
435
436
437void StorageDistributed::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
438{
439 std::lock_guard lock(cluster_nodes_mutex);
440
441 for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();)
442 {
443 it->second.shutdownAndDropAllData();
444 it = cluster_nodes_data.erase(it);
445 }
446}
447
448
449namespace
450{
451 /// NOTE This is weird. Get rid of this.
452 std::map<String, String> virtual_columns =
453 {
454 {"_table", "String"},
455 {"_part", "String"},
456 {"_part_index", "UInt64"},
457 {"_partition_id", "String"},
458 {"_sample_factor", "Float64"},
459 };
460}
461
462
463NameAndTypePair StorageDistributed::getColumn(const String & column_name) const
464{
465 if (getColumns().hasPhysical(column_name))
466 return getColumns().getPhysical(column_name);
467
468 auto it = virtual_columns.find(column_name);
469 if (it != virtual_columns.end())
470 return { it->first, DataTypeFactory::instance().get(it->second) };
471
472 throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
473}
474
475
476bool StorageDistributed::hasColumn(const String & column_name) const
477{
478 return virtual_columns.count(column_name) || getColumns().hasPhysical(column_name);
479}
480
481void StorageDistributed::createDirectoryMonitors()
482{
483 if (path.empty())
484 return;
485
486 Poco::File{path}.createDirectories();
487
488 std::filesystem::directory_iterator begin(path);
489 std::filesystem::directory_iterator end;
490 for (auto it = begin; it != end; ++it)
491 if (std::filesystem::is_directory(*it))
492 requireDirectoryMonitor(it->path().filename().string());
493}
494
495
496void StorageDistributed::requireDirectoryMonitor(const std::string & name)
497{
498 std::lock_guard lock(cluster_nodes_mutex);
499 cluster_nodes_data[name].requireDirectoryMonitor(name, *this, monitors_blocker);
500}
501
502ConnectionPoolPtr StorageDistributed::requireConnectionPool(const std::string & name)
503{
504 std::lock_guard lock(cluster_nodes_mutex);
505 auto & node_data = cluster_nodes_data[name];
506 node_data.requireConnectionPool(name, *this);
507 return node_data.conneciton_pool;
508}
509
510size_t StorageDistributed::getShardCount() const
511{
512 return getCluster()->getShardCount();
513}
514
515ClusterPtr StorageDistributed::getCluster() const
516{
517 return owned_cluster ? owned_cluster : global_context.getCluster(cluster_name);
518}
519
520void StorageDistributed::ClusterNodeData::requireConnectionPool(const std::string & name, const StorageDistributed & storage)
521{
522 if (!conneciton_pool)
523 conneciton_pool = StorageDistributedDirectoryMonitor::createPool(name, storage);
524}
525
526void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(
527 const std::string & name, StorageDistributed & storage, ActionBlocker & monitor_blocker)
528{
529 requireConnectionPool(name, storage);
530 if (!directory_monitor)
531 directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(storage, name, conneciton_pool, monitor_blocker);
532}
533
534void StorageDistributed::ClusterNodeData::flushAllData()
535{
536 directory_monitor->flushAllData();
537}
538
539void StorageDistributed::ClusterNodeData::shutdownAndDropAllData()
540{
541 directory_monitor->shutdownAndDropAllData();
542}
543
544/// Returns a new cluster with fewer shards if constant folding for `sharding_key_expr` is possible
545/// using constraints from "PREWHERE" and "WHERE" conditions, otherwise returns `nullptr`
546ClusterPtr StorageDistributed::skipUnusedShards(ClusterPtr cluster, const SelectQueryInfo & query_info)
547{
548 if (!has_sharding_key)
549 {
550 throw Exception("Internal error: cannot determine shards of a distributed table if no sharding expression is supplied", ErrorCodes::LOGICAL_ERROR);
551 }
552
553 const auto & select = query_info.query->as<ASTSelectQuery &>();
554
555 if (!select.prewhere() && !select.where())
556 {
557 return nullptr;
558 }
559
560 ASTPtr condition_ast;
561 if (select.prewhere() && select.where())
562 {
563 condition_ast = makeASTFunction("and", select.prewhere()->clone(), select.where()->clone());
564 }
565 else
566 {
567 condition_ast = select.prewhere() ? select.prewhere()->clone() : select.where()->clone();
568 }
569
570 const auto blocks = evaluateExpressionOverConstantCondition(condition_ast, sharding_key_expr);
571
572 // Can't get definite answer if we can skip any shards
573 if (!blocks)
574 {
575 return nullptr;
576 }
577
578 std::set<int> shards;
579
580 for (const auto & block : *blocks)
581 {
582 if (!block.has(sharding_key_column_name))
583 throw Exception("sharding_key_expr should evaluate as a single row", ErrorCodes::TOO_MANY_ROWS);
584
585 const auto result = block.getByName(sharding_key_column_name);
586 const auto selector = createSelector(cluster, result);
587
588 shards.insert(selector.begin(), selector.end());
589 }
590
591 return cluster->getClusterWithMultipleShards({shards.begin(), shards.end()});
592}
593
594ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
595{
596 if (type == ActionLocks::DistributedSend)
597 return monitors_blocker.cancel();
598 return {};
599}
600
601void StorageDistributed::flushClusterNodesAllData()
602{
603 std::lock_guard lock(cluster_nodes_mutex);
604
605 /// TODO: Maybe it should be executed in parallel
606 for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end(); ++it)
607 it->second.flushAllData();
608}
609
610void StorageDistributed::rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name,
611 TableStructureWriteLockHolder &)
612{
613 table_name = new_table_name;
614 database_name = new_database_name;
615 if (!path.empty())
616 {
617 auto new_path = global_context.getPath() + new_path_to_table_data;
618 Poco::File(path).renameTo(new_path);
619 path = new_path;
620 std::lock_guard lock(cluster_nodes_mutex);
621 for (auto & node : cluster_nodes_data)
622 node.second.directory_monitor->updatePath();
623 }
624}
625
626
627void registerStorageDistributed(StorageFactory & factory)
628{
629 factory.registerStorage("Distributed", [](const StorageFactory::Arguments & args)
630 {
631 /** Arguments of engine is following:
632 * - name of cluster in configuration;
633 * - name of remote database;
634 * - name of remote table;
635 *
636 * Remote database may be specified in following form:
637 * - identifier;
638 * - constant expression with string result, like currentDatabase();
639 * -- string literal as specific case;
640 * - empty string means 'use default database from cluster'.
641 */
642
643 ASTs & engine_args = args.engine_args;
644
645 if (!(engine_args.size() == 3 || engine_args.size() == 4))
646 throw Exception("Storage Distributed requires 3 or 4 parameters"
647 " - name of configuration section with list of remote servers, name of remote database, name of remote table,"
648 " sharding key expression (optional).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
649
650 String cluster_name = getClusterName(*engine_args[0]);
651
652 engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
653 engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
654
655 String remote_database = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
656 String remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
657
658 const auto & sharding_key = engine_args.size() == 4 ? engine_args[3] : nullptr;
659
660 /// Check that sharding_key exists in the table and has numeric type.
661 if (sharding_key)
662 {
663 auto sharding_expr = buildShardingKeyExpression(sharding_key, args.context, args.columns.getAllPhysical(), true);
664 const Block & block = sharding_expr->getSampleBlock();
665
666 if (block.columns() != 1)
667 throw Exception("Sharding expression must return exactly one column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
668
669 auto type = block.getByPosition(0).type;
670
671 if (!type->isValueRepresentedByInteger())
672 throw Exception("Sharding expression has type " + type->getName() +
673 ", but should be one of integer type", ErrorCodes::TYPE_MISMATCH);
674 }
675
676 return StorageDistributed::create(
677 args.database_name, args.table_name, args.columns, args.constraints,
678 remote_database, remote_table, cluster_name,
679 args.context, sharding_key, args.relative_data_path,
680 args.attach);
681 });
682}
683
684}
685