| 1 | #include <Storages/StorageFactory.h> |
| 2 | #include <Storages/StorageMergeTree.h> |
| 3 | #include <Storages/StorageReplicatedMergeTree.h> |
| 4 | #include <Storages/MergeTree/MergeTreeIndices.h> |
| 5 | #include <Storages/MergeTree/MergeTreeIndexMinMax.h> |
| 6 | #include <Storages/MergeTree/MergeTreeIndexSet.h> |
| 7 | |
| 8 | #include <Common/typeid_cast.h> |
| 9 | #include <Common/OptimizedRegularExpression.h> |
| 10 | |
| 11 | #include <Parsers/ASTFunction.h> |
| 12 | #include <Parsers/ASTIdentifier.h> |
| 13 | #include <Parsers/ASTExpressionList.h> |
| 14 | #include <Parsers/ASTCreateQuery.h> |
| 15 | #include <Parsers/ASTSetQuery.h> |
| 16 | |
| 17 | #include <AggregateFunctions/AggregateFunctionFactory.h> |
| 18 | #include <AggregateFunctions/parseAggregateFunctionParameters.h> |
| 19 | |
| 20 | |
| 21 | namespace DB |
| 22 | { |
| 23 | |
/// Error codes referenced in this file. They are only declared here (extern).
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int UNKNOWN_ELEMENT_IN_CONFIG;
    extern const int NO_ELEMENTS_IN_CONFIG;
    extern const int UNKNOWN_STORAGE;
    extern const int NO_REPLICA_NAME_GIVEN;
    /// Thrown for aggregate functions that GraphiteMergeTree cannot use; declared explicitly
    /// so that this file does not depend on another header happening to declare it.
    extern const int NOT_IMPLEMENTED;
}
| 33 | |
| 34 | |
| 35 | /** Get the list of column names. |
| 36 | * It can be specified in the tuple: (Clicks, Cost), |
| 37 | * or as one column: Clicks. |
| 38 | */ |
| 39 | static Names extractColumnNames(const ASTPtr & node) |
| 40 | { |
| 41 | const auto * expr_func = node->as<ASTFunction>(); |
| 42 | |
| 43 | if (expr_func && expr_func->name == "tuple" ) |
| 44 | { |
| 45 | const auto & elements = expr_func->children.at(0)->children; |
| 46 | Names res; |
| 47 | res.reserve(elements.size()); |
| 48 | for (const auto & elem : elements) |
| 49 | res.push_back(getIdentifierName(elem)); |
| 50 | |
| 51 | return res; |
| 52 | } |
| 53 | else |
| 54 | { |
| 55 | return { getIdentifierName(node) }; |
| 56 | } |
| 57 | } |
| 58 | |
| 59 | /** Read the settings for Graphite rollup from config. |
| 60 | * Example |
| 61 | * |
| 62 | * <graphite_rollup> |
| 63 | * <path_column_name>Path</path_column_name> |
| 64 | * <pattern> |
| 65 | * <regexp>click_cost</regexp> |
| 66 | * <function>any</function> |
| 67 | * <retention> |
| 68 | * <age>0</age> |
| 69 | * <precision>3600</precision> |
| 70 | * </retention> |
| 71 | * <retention> |
| 72 | * <age>86400</age> |
| 73 | * <precision>60</precision> |
| 74 | * </retention> |
| 75 | * </pattern> |
| 76 | * <default> |
| 77 | * <function>max</function> |
| 78 | * <retention> |
| 79 | * <age>0</age> |
| 80 | * <precision>60</precision> |
| 81 | * </retention> |
| 82 | * <retention> |
| 83 | * <age>3600</age> |
| 84 | * <precision>300</precision> |
| 85 | * </retention> |
| 86 | * <retention> |
| 87 | * <age>86400</age> |
| 88 | * <precision>3600</precision> |
| 89 | * </retention> |
| 90 | * </default> |
| 91 | * </graphite_rollup> |
| 92 | */ |
| 93 | static void appendGraphitePattern( |
| 94 | const Poco::Util::AbstractConfiguration & config, const String & config_element, Graphite::Patterns & patterns) |
| 95 | { |
| 96 | Graphite::Pattern pattern; |
| 97 | |
| 98 | Poco::Util::AbstractConfiguration::Keys keys; |
| 99 | config.keys(config_element, keys); |
| 100 | |
| 101 | for (const auto & key : keys) |
| 102 | { |
| 103 | if (key == "regexp" ) |
| 104 | { |
| 105 | pattern.regexp_str = config.getString(config_element + ".regexp" ); |
| 106 | pattern.regexp = std::make_shared<OptimizedRegularExpression>(pattern.regexp_str); |
| 107 | } |
| 108 | else if (key == "function" ) |
| 109 | { |
| 110 | String aggregate_function_name_with_params = config.getString(config_element + ".function" ); |
| 111 | String aggregate_function_name; |
| 112 | Array params_row; |
| 113 | getAggregateFunctionNameAndParametersArray(aggregate_function_name_with_params, |
| 114 | aggregate_function_name, params_row, "GraphiteMergeTree storage initialization" ); |
| 115 | |
| 116 | /// TODO Not only Float64 |
| 117 | pattern.function = AggregateFunctionFactory::instance().get(aggregate_function_name, {std::make_shared<DataTypeFloat64>()}, |
| 118 | params_row); |
| 119 | } |
| 120 | else if (startsWith(key, "retention" )) |
| 121 | { |
| 122 | pattern.retentions.emplace_back( |
| 123 | Graphite::Retention{ |
| 124 | .age = config.getUInt(config_element + "." + key + ".age" ), |
| 125 | .precision = config.getUInt(config_element + "." + key + ".precision" )}); |
| 126 | } |
| 127 | else |
| 128 | throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); |
| 129 | } |
| 130 | |
| 131 | if (!pattern.function && pattern.retentions.empty()) |
| 132 | throw Exception("At least one of an aggregate function or retention rules is mandatory for rollup patterns in GraphiteMergeTree" , |
| 133 | ErrorCodes::NO_ELEMENTS_IN_CONFIG); |
| 134 | |
| 135 | if (!pattern.function) |
| 136 | { |
| 137 | pattern.type = pattern.TypeRetention; |
| 138 | } |
| 139 | else if (pattern.retentions.empty()) |
| 140 | { |
| 141 | pattern.type = pattern.TypeAggregation; |
| 142 | } |
| 143 | else |
| 144 | { |
| 145 | pattern.type = pattern.TypeAll; |
| 146 | } |
| 147 | |
| 148 | if (pattern.type & pattern.TypeAggregation) /// TypeAggregation or TypeAll |
| 149 | if (pattern.function->allocatesMemoryInArena()) |
| 150 | throw Exception("Aggregate function " + pattern.function->getName() + " isn't supported in GraphiteMergeTree" , |
| 151 | ErrorCodes::NOT_IMPLEMENTED); |
| 152 | |
| 153 | /// retention should be in descending order of age. |
| 154 | if (pattern.type & pattern.TypeRetention) /// TypeRetention or TypeAll |
| 155 | std::sort(pattern.retentions.begin(), pattern.retentions.end(), |
| 156 | [] (const Graphite::Retention & a, const Graphite::Retention & b) { return a.age > b.age; }); |
| 157 | |
| 158 | patterns.emplace_back(pattern); |
| 159 | } |
| 160 | |
| 161 | static void setGraphitePatternsFromConfig(const Context & context, |
| 162 | const String & config_element, Graphite::Params & params) |
| 163 | { |
| 164 | const auto & config = context.getConfigRef(); |
| 165 | |
| 166 | if (!config.has(config_element)) |
| 167 | throw Exception("No '" + config_element + "' element in configuration file" , |
| 168 | ErrorCodes::NO_ELEMENTS_IN_CONFIG); |
| 169 | |
| 170 | params.config_name = config_element; |
| 171 | params.path_column_name = config.getString(config_element + ".path_column_name" , "Path" ); |
| 172 | params.time_column_name = config.getString(config_element + ".time_column_name" , "Time" ); |
| 173 | params.value_column_name = config.getString(config_element + ".value_column_name" , "Value" ); |
| 174 | params.version_column_name = config.getString(config_element + ".version_column_name" , "Timestamp" ); |
| 175 | |
| 176 | Poco::Util::AbstractConfiguration::Keys keys; |
| 177 | config.keys(config_element, keys); |
| 178 | |
| 179 | for (const auto & key : keys) |
| 180 | { |
| 181 | if (startsWith(key, "pattern" )) |
| 182 | { |
| 183 | appendGraphitePattern(config, config_element + "." + key, params.patterns); |
| 184 | } |
| 185 | else if (key == "default" ) |
| 186 | { |
| 187 | /// See below. |
| 188 | } |
| 189 | else if (key == "path_column_name" |
| 190 | || key == "time_column_name" |
| 191 | || key == "value_column_name" |
| 192 | || key == "version_column_name" ) |
| 193 | { |
| 194 | /// See above. |
| 195 | } |
| 196 | else |
| 197 | throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); |
| 198 | } |
| 199 | |
| 200 | if (config.has(config_element + ".default" )) |
| 201 | appendGraphitePattern(config, config_element + "." + ".default" , params.patterns); |
| 202 | } |
| 203 | |
| 204 | |
/** Build the long help text that is appended to error messages about wrong MergeTree engine definitions.
  * If `is_extended_syntax` is true, the text describes the clause-based syntax (PARTITION BY, ORDER BY, ...);
  * otherwise it describes the syntax where everything is passed as positional engine parameters.
  */
static String getMergeTreeVerboseHelp(bool is_extended_syntax)
{
    using namespace std::string_literals;

    /// Common introduction for both syntaxes.
    String help = R"(

MergeTree is a family of storage engines.

MergeTrees are different in two ways:
- they may be replicated and non-replicated;
- they may do different actions on merge: nothing; sign collapse; sum; apply aggregate functions.

So we have 14 combinations:
MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, GraphiteMergeTree, VersionedCollapsingMergeTree
ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedGraphiteMergeTree, ReplicatedVersionedCollapsingMergeTree

In most of cases, you need MergeTree or ReplicatedMergeTree.

For replicated merge trees, you need to supply a path in ZooKeeper and a replica name as the first two parameters.
Path in ZooKeeper is like '/clickhouse/tables/01/' where /clickhouse/tables/ is a common prefix and 01 is a shard name.
Replica name is like 'mtstat01-1' - it may be the hostname or any suitable string identifying replica.
You may use macro substitutions for these parameters. It's like ReplicatedMergeTree('/clickhouse/tables/{shard}/', '{replica}'...
Look at the <macros> section in server configuration file.
)";

    /// Positional parameters exist only in the old syntax.
    if (!is_extended_syntax)
        help += R"(
Next parameter (which is the first for unreplicated tables and the third for replicated tables) is the name of date column.
Date column must exist in the table and have type Date (not DateTime).
It is used for internal data partitioning and works like some kind of index.

If your source data doesn't have a column of type Date, but has a DateTime column, you may add values for Date column while loading,
or you may INSERT your source data to a table of type Log and then transform it with INSERT INTO t SELECT toDate(time) AS date, * FROM ...
If your source data doesn't have any date or time, you may just pass any constant for a date column while loading.

Next parameter is optional sampling expression. Sampling expression is used to implement SAMPLE clause in query for approximate query execution.
If you don't need approximate query execution, simply omit this parameter.
Sample expression must be one of the elements of the primary key tuple. For example, if your primary key is (CounterID, EventDate, intHash64(UserID)), your sampling expression might be intHash64(UserID).

Next parameter is the primary key tuple. It's like (CounterID, EventDate, intHash64(UserID)) - a list of column names or functional expressions in round brackets. If your primary key has just one element, you may omit round brackets.

Careful choice of the primary key is extremely important for processing short-time queries.

Next parameter is index (primary key) granularity. Good value is 8192. You have no reasons to use any other value.
)";

    /// Mode-specific parameters: in the old syntax they are the last ones, in the extended syntax they are the only ones.
    help += R"(
For the Collapsing mode, the )" + (is_extended_syntax ? "only"s : "last"s) + R"( parameter is the name of a sign column - a special column that is used to 'collapse' rows with the same primary key while merging.

For the Summing mode, the optional )" + (is_extended_syntax ? ""s : "last "s) + R"(parameter is a list of columns to sum while merging. This list is passed in round brackets, like (PageViews, Cost).
If this parameter is omitted, the storage will sum all numeric columns except columns participating in the primary key.

For the Replacing mode, the optional )" + (is_extended_syntax ? ""s : "last "s) + R"(parameter is the name of a 'version' column. While merging, for all rows with the same primary key, only one row is selected: the last row, if the version column was not specified, or the last row with the maximum version value, if specified.

For VersionedCollapsing mode, the )" + (is_extended_syntax ? ""s : "last "s) + R"(2 parameters are the name of a sign column and the name of a 'version' column. Version column must be in primary key. While merging, a pair of rows with the same primary key and different sign may collapse.
)";

    if (is_extended_syntax)
        help += R"(
You can specify a partitioning expression in the PARTITION BY clause. It is optional but highly recommended.
A common partitioning expression is some function of the event date column e.g. PARTITION BY toYYYYMM(EventDate) will partition the table by month.
Rows with different partition expression values are never merged together. That allows manipulating partitions with ALTER commands.
Also it acts as a kind of index.

Sorting key is specified in the ORDER BY clause. It is mandatory for all MergeTree types.
It is like (CounterID, EventDate, intHash64(UserID)) - a list of column names or functional expressions
in round brackets.
If your sorting key has just one element, you may omit round brackets.

By default primary key is equal to the sorting key. You can specify a primary key that is a prefix of the
sorting key in the PRIMARY KEY clause.

Careful choice of the primary key is extremely important for processing short-time queries.

Optional sampling expression can be specified in the SAMPLE BY clause. It is used to implement the SAMPLE clause in a SELECT query for approximate query execution.
Sampling expression must be one of the elements of the primary key tuple. For example, if your primary key is (CounterID, EventDate, intHash64(UserID)), your sampling expression might be intHash64(UserID).

Engine settings can be specified in the SETTINGS clause. Full list is in the source code in the 'dbms/src/Storages/MergeTree/MergeTreeSettings.h' file.
E.g. you can specify the index (primary key) granularity with SETTINGS index_granularity = 8192.

Examples:

MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate) SETTINGS index_granularity = 8192

MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime) SAMPLE BY intHash32(UserID)

MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime) PRIMARY KEY (CounterID, EventDate) SAMPLE BY intHash32(UserID)

CollapsingMergeTree(Sign) PARTITION BY StartDate SAMPLE BY intHash32(UserID) ORDER BY (CounterID, StartDate, intHash32(UserID), VisitID)

SummingMergeTree PARTITION BY toMonday(EventDate) ORDER BY (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo)

SummingMergeTree((Shows, Clicks, Cost, CostCur, ShowsSumPosition, ClicksSumPosition, SessionNum, SessionLen, SessionCost, GoalsNum, SessionDepth)) PARTITION BY toYYYYMM(EventDate) ORDER BY (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo)

ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}') PARTITION BY EventDate ORDER BY (CounterID, EventDate, intHash32(UserID), EventTime) SAMPLE BY intHash32(UserID)
)";
    else
        help += R"(
Examples:

MergeTree(EventDate, (CounterID, EventDate), 8192)

MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192)

CollapsingMergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192, Sign)

SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo), 8192)

SummingMergeTree(EventDate, (OrderID, EventDate, BannerID, PhraseID, ContextType, RegionID, PageID, IsFlat, TypeID, ResourceNo), 8192, (Shows, Clicks, Cost, CostCur, ShowsSumPosition, ClicksSumPosition, SessionNum, SessionLen, SessionCost, GoalsNum, SessionDepth))

ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192)
)";

    help += R"(
For further info please read the documentation: https://clickhouse.yandex/
)";

    return help;
}
| 324 | |
| 325 | |
| 326 | static StoragePtr create(const StorageFactory::Arguments & args) |
| 327 | { |
| 328 | /** [Replicated][|Summing|Collapsing|Aggregating|Replacing|Graphite]MergeTree (2 * 7 combinations) engines |
| 329 | * The argument for the engine should be: |
| 330 | * - (for Replicated) The path to the table in ZooKeeper |
| 331 | * - (for Replicated) Replica name in ZooKeeper |
| 332 | * - the name of the column with the date; |
| 333 | * - (optional) expression for sampling |
| 334 | * (the query with `SAMPLE x` will select rows that have a lower value in this column than `x * UINT32_MAX`); |
| 335 | * - an expression for sorting (either a scalar expression or a tuple of several); |
| 336 | * - index_granularity; |
| 337 | * - (for Collapsing) the name of Int8 column that contains `sign` type with the change of "visit" (taking values 1 and -1). |
| 338 | * For example: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign). |
| 339 | * - (for Summing, optional) a tuple of columns to be summed. If not specified, all numeric columns that are not included in the primary key are used. |
| 340 | * - (for Replacing, optional) the column name of one of the UInt types, which stands for "version" |
| 341 | * For example: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign). |
| 342 | * - (for Graphite) the parameter name in config file with settings of thinning rules. |
| 343 | * |
| 344 | * MergeTree(date, [sample_key], primary_key, index_granularity) |
| 345 | * CollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign) |
| 346 | * SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum]) |
| 347 | * AggregatingMergeTree(date, [sample_key], primary_key, index_granularity) |
| 348 | * ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column]) |
| 349 | * GraphiteMergeTree(date, [sample_key], primary_key, index_granularity, 'config_element') |
| 350 | * |
| 351 | * Alternatively, you can specify: |
| 352 | * - Partitioning expression in the PARTITION BY clause; |
| 353 | * - Sorting key in the ORDER BY clause; |
| 354 | * - Primary key (if it is different from the sorting key) in the PRIMARY KEY clause; |
| 355 | * - Sampling expression in the SAMPLE BY clause; |
| 356 | * - Additional MergeTreeSettings in the SETTINGS clause; |
| 357 | */ |
| 358 | |
| 359 | bool is_extended_storage_def = |
| 360 | args.storage_def->partition_by || args.storage_def->primary_key || args.storage_def->order_by |
| 361 | || args.storage_def->sample_by || (args.query.columns_list->indices && !args.query.columns_list->indices->children.empty()) || args.storage_def->settings; |
| 362 | |
| 363 | String name_part = args.engine_name.substr(0, args.engine_name.size() - strlen("MergeTree" )); |
| 364 | |
| 365 | bool replicated = startsWith(name_part, "Replicated" ); |
| 366 | if (replicated) |
| 367 | name_part = name_part.substr(strlen("Replicated" )); |
| 368 | |
| 369 | MergeTreeData::MergingParams merging_params; |
| 370 | merging_params.mode = MergeTreeData::MergingParams::Ordinary; |
| 371 | |
| 372 | if (name_part == "Collapsing" ) |
| 373 | merging_params.mode = MergeTreeData::MergingParams::Collapsing; |
| 374 | else if (name_part == "Summing" ) |
| 375 | merging_params.mode = MergeTreeData::MergingParams::Summing; |
| 376 | else if (name_part == "Aggregating" ) |
| 377 | merging_params.mode = MergeTreeData::MergingParams::Aggregating; |
| 378 | else if (name_part == "Replacing" ) |
| 379 | merging_params.mode = MergeTreeData::MergingParams::Replacing; |
| 380 | else if (name_part == "Graphite" ) |
| 381 | merging_params.mode = MergeTreeData::MergingParams::Graphite; |
| 382 | else if (name_part == "VersionedCollapsing" ) |
| 383 | merging_params.mode = MergeTreeData::MergingParams::VersionedCollapsing; |
| 384 | else if (!name_part.empty()) |
| 385 | throw Exception( |
| 386 | "Unknown storage " + args.engine_name + getMergeTreeVerboseHelp(is_extended_storage_def), |
| 387 | ErrorCodes::UNKNOWN_STORAGE); |
| 388 | |
| 389 | /// NOTE Quite complicated. |
| 390 | |
| 391 | size_t min_num_params = 0; |
| 392 | size_t max_num_params = 0; |
| 393 | String needed_params; |
| 394 | |
| 395 | auto add_mandatory_param = [&](const char * desc) |
| 396 | { |
| 397 | ++min_num_params; |
| 398 | ++max_num_params; |
| 399 | needed_params += needed_params.empty() ? "\n" : ",\n" ; |
| 400 | needed_params += desc; |
| 401 | }; |
| 402 | auto add_optional_param = [&](const char * desc) |
| 403 | { |
| 404 | ++max_num_params; |
| 405 | needed_params += needed_params.empty() ? "\n" : ",\n[" ; |
| 406 | needed_params += desc; |
| 407 | needed_params += "]" ; |
| 408 | }; |
| 409 | |
| 410 | if (replicated) |
| 411 | { |
| 412 | add_mandatory_param("path in ZooKeeper" ); |
| 413 | add_mandatory_param("replica name" ); |
| 414 | } |
| 415 | |
| 416 | if (!is_extended_storage_def) |
| 417 | { |
| 418 | add_mandatory_param("name of column with date" ); |
| 419 | add_optional_param("sampling element of primary key" ); |
| 420 | add_mandatory_param("primary key expression" ); |
| 421 | add_mandatory_param("index granularity" ); |
| 422 | } |
| 423 | |
| 424 | switch (merging_params.mode) |
| 425 | { |
| 426 | default: |
| 427 | break; |
| 428 | case MergeTreeData::MergingParams::Summing: |
| 429 | add_optional_param("list of columns to sum" ); |
| 430 | break; |
| 431 | case MergeTreeData::MergingParams::Replacing: |
| 432 | add_optional_param("version" ); |
| 433 | break; |
| 434 | case MergeTreeData::MergingParams::Collapsing: |
| 435 | add_mandatory_param("sign column" ); |
| 436 | break; |
| 437 | case MergeTreeData::MergingParams::Graphite: |
| 438 | add_mandatory_param("'config_element_for_graphite_schema'" ); |
| 439 | break; |
| 440 | case MergeTreeData::MergingParams::VersionedCollapsing: |
| 441 | { |
| 442 | add_mandatory_param("sign column" ); |
| 443 | add_mandatory_param("version" ); |
| 444 | break; |
| 445 | } |
| 446 | } |
| 447 | |
| 448 | ASTs & engine_args = args.engine_args; |
| 449 | |
| 450 | if (engine_args.size() < min_num_params || engine_args.size() > max_num_params) |
| 451 | { |
| 452 | String msg; |
| 453 | if (is_extended_storage_def) |
| 454 | msg += "With extended storage definition syntax storage " + args.engine_name + " requires " ; |
| 455 | else |
| 456 | msg += "Storage " + args.engine_name + " requires " ; |
| 457 | |
| 458 | if (max_num_params) |
| 459 | { |
| 460 | if (min_num_params == max_num_params) |
| 461 | msg += toString(min_num_params) + " parameters: " ; |
| 462 | else |
| 463 | msg += toString(min_num_params) + " to " + toString(max_num_params) + " parameters: " ; |
| 464 | msg += needed_params; |
| 465 | } |
| 466 | else |
| 467 | msg += "no parameters" ; |
| 468 | |
| 469 | msg += getMergeTreeVerboseHelp(is_extended_storage_def); |
| 470 | |
| 471 | throw Exception(msg, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
| 472 | } |
| 473 | |
| 474 | /// For Replicated. |
| 475 | String zookeeper_path; |
| 476 | String replica_name; |
| 477 | |
| 478 | if (replicated) |
| 479 | { |
| 480 | const auto * ast = engine_args[0]->as<ASTLiteral>(); |
| 481 | if (ast && ast->value.getType() == Field::Types::String) |
| 482 | zookeeper_path = safeGet<String>(ast->value); |
| 483 | else |
| 484 | throw Exception( |
| 485 | "Path in ZooKeeper must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def), |
| 486 | ErrorCodes::BAD_ARGUMENTS); |
| 487 | |
| 488 | ast = engine_args[1]->as<ASTLiteral>(); |
| 489 | if (ast && ast->value.getType() == Field::Types::String) |
| 490 | replica_name = safeGet<String>(ast->value); |
| 491 | else |
| 492 | throw Exception( |
| 493 | "Replica name must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def), |
| 494 | ErrorCodes::BAD_ARGUMENTS); |
| 495 | |
| 496 | if (replica_name.empty()) |
| 497 | throw Exception( |
| 498 | "No replica name in config" + getMergeTreeVerboseHelp(is_extended_storage_def), |
| 499 | ErrorCodes::NO_REPLICA_NAME_GIVEN); |
| 500 | |
| 501 | engine_args.erase(engine_args.begin(), engine_args.begin() + 2); |
| 502 | } |
| 503 | |
| 504 | if (merging_params.mode == MergeTreeData::MergingParams::Collapsing) |
| 505 | { |
| 506 | if (!tryGetIdentifierNameInto(engine_args.back(), merging_params.sign_column)) |
| 507 | throw Exception( |
| 508 | "Sign column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def), |
| 509 | ErrorCodes::BAD_ARGUMENTS); |
| 510 | |
| 511 | engine_args.pop_back(); |
| 512 | } |
| 513 | else if (merging_params.mode == MergeTreeData::MergingParams::Replacing) |
| 514 | { |
| 515 | /// If the last element is not index_granularity or replica_name (a literal), then this is the name of the version column. |
| 516 | if (!engine_args.empty() && !engine_args.back()->as<ASTLiteral>()) |
| 517 | { |
| 518 | if (!tryGetIdentifierNameInto(engine_args.back(), merging_params.version_column)) |
| 519 | throw Exception( |
| 520 | "Version column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def), |
| 521 | ErrorCodes::BAD_ARGUMENTS); |
| 522 | |
| 523 | engine_args.pop_back(); |
| 524 | } |
| 525 | } |
| 526 | else if (merging_params.mode == MergeTreeData::MergingParams::Summing) |
| 527 | { |
| 528 | /// If the last element is not index_granularity or replica_name (a literal), then this is a list of summable columns. |
| 529 | if (!engine_args.empty() && !engine_args.back()->as<ASTLiteral>()) |
| 530 | { |
| 531 | merging_params.columns_to_sum = extractColumnNames(engine_args.back()); |
| 532 | engine_args.pop_back(); |
| 533 | } |
| 534 | } |
| 535 | else if (merging_params.mode == MergeTreeData::MergingParams::Graphite) |
| 536 | { |
| 537 | String graphite_config_name; |
| 538 | String error_msg = "Last parameter of GraphiteMergeTree must be name (in single quotes) of element in configuration file with Graphite options" ; |
| 539 | error_msg += getMergeTreeVerboseHelp(is_extended_storage_def); |
| 540 | |
| 541 | if (const auto * ast = engine_args.back()->as<ASTLiteral>()) |
| 542 | { |
| 543 | if (ast->value.getType() != Field::Types::String) |
| 544 | throw Exception(error_msg, ErrorCodes::BAD_ARGUMENTS); |
| 545 | |
| 546 | graphite_config_name = ast->value.get<String>(); |
| 547 | } |
| 548 | else |
| 549 | throw Exception(error_msg, ErrorCodes::BAD_ARGUMENTS); |
| 550 | |
| 551 | engine_args.pop_back(); |
| 552 | setGraphitePatternsFromConfig(args.context, graphite_config_name, merging_params.graphite_params); |
| 553 | } |
| 554 | else if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing) |
| 555 | { |
| 556 | if (!tryGetIdentifierNameInto(engine_args.back(), merging_params.version_column)) |
| 557 | throw Exception( |
| 558 | "Version column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def), |
| 559 | ErrorCodes::BAD_ARGUMENTS); |
| 560 | |
| 561 | engine_args.pop_back(); |
| 562 | |
| 563 | if (!tryGetIdentifierNameInto(engine_args.back(), merging_params.sign_column)) |
| 564 | throw Exception( |
| 565 | "Sign column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def), |
| 566 | ErrorCodes::BAD_ARGUMENTS); |
| 567 | |
| 568 | engine_args.pop_back(); |
| 569 | } |
| 570 | |
| 571 | String date_column_name; |
| 572 | ASTPtr partition_by_ast; |
| 573 | ASTPtr order_by_ast; |
| 574 | ASTPtr primary_key_ast; |
| 575 | ASTPtr sample_by_ast; |
| 576 | ASTPtr ttl_table_ast; |
| 577 | ASTPtr settings_ast; |
| 578 | IndicesDescription indices_description; |
| 579 | ConstraintsDescription constraints_description; |
| 580 | |
| 581 | std::unique_ptr<MergeTreeSettings> storage_settings = std::make_unique<MergeTreeSettings>(args.context.getMergeTreeSettings()); |
| 582 | |
| 583 | if (is_extended_storage_def) |
| 584 | { |
| 585 | if (args.storage_def->partition_by) |
| 586 | partition_by_ast = args.storage_def->partition_by->ptr(); |
| 587 | |
| 588 | if (!args.storage_def->order_by) |
| 589 | throw Exception("You must provide an ORDER BY expression in the table definition. " |
| 590 | "If you don't want this table to be sorted, use ORDER BY tuple()" , |
| 591 | ErrorCodes::BAD_ARGUMENTS); |
| 592 | |
| 593 | order_by_ast = args.storage_def->order_by->ptr(); |
| 594 | |
| 595 | if (args.storage_def->primary_key) |
| 596 | primary_key_ast = args.storage_def->primary_key->ptr(); |
| 597 | |
| 598 | if (args.storage_def->sample_by) |
| 599 | sample_by_ast = args.storage_def->sample_by->ptr(); |
| 600 | |
| 601 | if (args.storage_def->ttl_table) |
| 602 | ttl_table_ast = args.storage_def->ttl_table->ptr(); |
| 603 | |
| 604 | |
| 605 | if (args.query.columns_list && args.query.columns_list->indices) |
| 606 | for (const auto & index : args.query.columns_list->indices->children) |
| 607 | indices_description.indices.push_back( |
| 608 | std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone())); |
| 609 | |
| 610 | storage_settings->loadFromQuery(*args.storage_def); |
| 611 | |
| 612 | if (args.storage_def->settings) |
| 613 | settings_ast = args.storage_def->settings->ptr(); |
| 614 | } |
| 615 | else |
| 616 | { |
| 617 | /// If there is an expression for sampling. MergeTree(date, [sample_key], primary_key, index_granularity) |
| 618 | if (engine_args.size() == 4) |
| 619 | { |
| 620 | sample_by_ast = engine_args[1]; |
| 621 | engine_args.erase(engine_args.begin() + 1); |
| 622 | } |
| 623 | |
| 624 | /// Now only three parameters remain - date (or partitioning expression), primary_key, index_granularity. |
| 625 | |
| 626 | if (!tryGetIdentifierNameInto(engine_args[0], date_column_name)) |
| 627 | throw Exception( |
| 628 | "Date column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def), |
| 629 | ErrorCodes::BAD_ARGUMENTS); |
| 630 | |
| 631 | order_by_ast = engine_args[1]; |
| 632 | |
| 633 | const auto * ast = engine_args.back()->as<ASTLiteral>(); |
| 634 | if (ast && ast->value.getType() == Field::Types::UInt64) |
| 635 | storage_settings->index_granularity = safeGet<UInt64>(ast->value); |
| 636 | else |
| 637 | throw Exception( |
| 638 | "Index granularity must be a positive integer" + getMergeTreeVerboseHelp(is_extended_storage_def), |
| 639 | ErrorCodes::BAD_ARGUMENTS); |
| 640 | } |
| 641 | |
| 642 | if (!args.attach && !indices_description.empty() && !args.local_context.getSettingsRef().allow_experimental_data_skipping_indices) |
| 643 | throw Exception("You must set the setting `allow_experimental_data_skipping_indices` to 1 " \ |
| 644 | "before using data skipping indices." , ErrorCodes::BAD_ARGUMENTS); |
| 645 | |
| 646 | StorageInMemoryMetadata metadata{ |
| 647 | .columns = args.columns, |
| 648 | .indices = indices_description, |
| 649 | .constraints = args.constraints, |
| 650 | .partition_by_ast = partition_by_ast, |
| 651 | .order_by_ast = order_by_ast, |
| 652 | .primary_key_ast = primary_key_ast, |
| 653 | .ttl_for_table_ast = ttl_table_ast, |
| 654 | .sample_by_ast = sample_by_ast, |
| 655 | .settings_ast = settings_ast, |
| 656 | }; |
| 657 | if (replicated) |
| 658 | return StorageReplicatedMergeTree::create( |
| 659 | zookeeper_path, replica_name, args.attach, args.database_name, args.table_name, args.relative_data_path, |
| 660 | metadata, args.context, date_column_name, merging_params, std::move(storage_settings), |
| 661 | args.has_force_restore_data_flag); |
| 662 | else |
| 663 | return StorageMergeTree::create( |
| 664 | args.database_name, args.table_name, args.relative_data_path, metadata, args.attach, args.context, |
| 665 | date_column_name, merging_params, std::move(storage_settings), |
| 666 | args.has_force_restore_data_flag); |
| 667 | } |
| 668 | |
| 669 | |
| 670 | void registerStorageMergeTree(StorageFactory & factory) |
| 671 | { |
| 672 | factory.registerStorage("MergeTree" , create); |
| 673 | factory.registerStorage("CollapsingMergeTree" , create); |
| 674 | factory.registerStorage("ReplacingMergeTree" , create); |
| 675 | factory.registerStorage("AggregatingMergeTree" , create); |
| 676 | factory.registerStorage("SummingMergeTree" , create); |
| 677 | factory.registerStorage("GraphiteMergeTree" , create); |
| 678 | factory.registerStorage("VersionedCollapsingMergeTree" , create); |
| 679 | |
| 680 | factory.registerStorage("ReplicatedMergeTree" , create); |
| 681 | factory.registerStorage("ReplicatedCollapsingMergeTree" , create); |
| 682 | factory.registerStorage("ReplicatedReplacingMergeTree" , create); |
| 683 | factory.registerStorage("ReplicatedAggregatingMergeTree" , create); |
| 684 | factory.registerStorage("ReplicatedSummingMergeTree" , create); |
| 685 | factory.registerStorage("ReplicatedGraphiteMergeTree" , create); |
| 686 | factory.registerStorage("ReplicatedVersionedCollapsingMergeTree" , create); |
| 687 | } |
| 688 | |
| 689 | } |
| 690 | |