1 | #include "getStructureOfRemoteTable.h" |
2 | #include <Interpreters/Cluster.h> |
3 | #include <Interpreters/Context.h> |
4 | #include <Interpreters/ClusterProxy/executeQuery.h> |
5 | #include <Interpreters/InterpreterDescribeQuery.h> |
6 | #include <DataStreams/RemoteBlockInputStream.h> |
7 | #include <DataTypes/DataTypeFactory.h> |
8 | #include <DataTypes/DataTypeString.h> |
9 | #include <Columns/ColumnString.h> |
10 | #include <Storages/IStorage.h> |
11 | #include <Parsers/ExpressionListParsers.h> |
12 | #include <Parsers/parseQuery.h> |
13 | #include <Parsers/ASTFunction.h> |
14 | #include <Common/quoteString.h> |
15 | #include <TableFunctions/TableFunctionFactory.h> |
16 | |
17 | |
18 | namespace DB |
19 | { |
20 | |
21 | namespace ErrorCodes |
22 | { |
23 | extern const int NO_REMOTE_SHARD_FOUND; |
24 | } |
25 | |
26 | |
27 | ColumnsDescription getStructureOfRemoteTable( |
28 | const Cluster & cluster, |
29 | const std::string & database, |
30 | const std::string & table, |
31 | const Context & context, |
32 | const ASTPtr & table_func_ptr) |
33 | { |
34 | /// Send to the first any remote shard. |
35 | const auto & shard_info = cluster.getAnyShardInfo(); |
36 | |
37 | String query; |
38 | |
39 | if (table_func_ptr) |
40 | { |
41 | if (shard_info.isLocal()) |
42 | { |
43 | const auto * table_function = table_func_ptr->as<ASTFunction>(); |
44 | TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context); |
45 | return table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName())->getColumns(); |
46 | } |
47 | |
48 | auto table_func_name = queryToString(table_func_ptr); |
49 | query = "DESC TABLE " + table_func_name; |
50 | } |
51 | else |
52 | { |
53 | if (shard_info.isLocal()) |
54 | return context.getTable(database, table)->getColumns(); |
55 | |
56 | /// Request for a table description |
57 | query = "DESC TABLE " + backQuoteIfNeed(database) + "." + backQuoteIfNeed(table); |
58 | } |
59 | |
60 | ColumnsDescription res; |
61 | |
62 | auto new_context = ClusterProxy::removeUserRestrictionsFromSettings(context, context.getSettingsRef()); |
63 | |
64 | /// Expect only needed columns from the result of DESC TABLE. NOTE 'comment' column is ignored for compatibility reasons. |
65 | Block sample_block |
66 | { |
67 | { ColumnString::create(), std::make_shared<DataTypeString>(), "name" }, |
68 | { ColumnString::create(), std::make_shared<DataTypeString>(), "type" }, |
69 | { ColumnString::create(), std::make_shared<DataTypeString>(), "default_type" }, |
70 | { ColumnString::create(), std::make_shared<DataTypeString>(), "default_expression" }, |
71 | }; |
72 | |
73 | /// Execute remote query without restrictions (because it's not real user query, but part of implementation) |
74 | auto input = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, sample_block, new_context); |
75 | input->setPoolMode(PoolMode::GET_ONE); |
76 | if (!table_func_ptr) |
77 | input->setMainTable(QualifiedTableName{database, table}); |
78 | input->readPrefix(); |
79 | |
80 | const DataTypeFactory & data_type_factory = DataTypeFactory::instance(); |
81 | |
82 | ParserExpression expr_parser; |
83 | |
84 | while (Block current = input->read()) |
85 | { |
86 | ColumnPtr name = current.getByName("name" ).column; |
87 | ColumnPtr type = current.getByName("type" ).column; |
88 | ColumnPtr default_kind = current.getByName("default_type" ).column; |
89 | ColumnPtr default_expr = current.getByName("default_expression" ).column; |
90 | size_t size = name->size(); |
91 | |
92 | for (size_t i = 0; i < size; ++i) |
93 | { |
94 | ColumnDescription column; |
95 | |
96 | column.name = (*name)[i].get<const String &>(); |
97 | |
98 | String data_type_name = (*type)[i].get<const String &>(); |
99 | column.type = data_type_factory.get(data_type_name); |
100 | |
101 | String kind_name = (*default_kind)[i].get<const String &>(); |
102 | if (!kind_name.empty()) |
103 | { |
104 | column.default_desc.kind = columnDefaultKindFromString(kind_name); |
105 | String expr_str = (*default_expr)[i].get<const String &>(); |
106 | column.default_desc.expression = parseQuery( |
107 | expr_parser, expr_str.data(), expr_str.data() + expr_str.size(), "default expression" , 0); |
108 | } |
109 | |
110 | res.add(column); |
111 | } |
112 | } |
113 | |
114 | return res; |
115 | } |
116 | |
117 | } |
118 | |