1 | #include <sstream> |
2 | #include <DataTypes/DataTypesNumber.h> |
3 | #include <DataTypes/DataTypeDate.h> |
4 | #include <Dictionaries/IDictionarySource.h> |
5 | #include <Dictionaries/DictionaryStructure.h> |
6 | #include <Storages/StorageDictionary.h> |
7 | #include <Storages/StorageFactory.h> |
8 | #include <Interpreters/Context.h> |
9 | #include <Interpreters/evaluateConstantExpression.h> |
10 | #include <Interpreters/ExternalDictionariesLoader.h> |
11 | #include <Parsers/ASTLiteral.h> |
12 | #include <common/logger_useful.h> |
13 | #include <Common/typeid_cast.h> |
14 | |
15 | |
16 | namespace DB |
17 | { |
18 | |
19 | namespace ErrorCodes |
20 | { |
21 | extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; |
22 | extern const int THERE_IS_NO_COLUMN; |
23 | } |
24 | |
25 | |
26 | StorageDictionary::StorageDictionary( |
27 | const String & database_name_, |
28 | const String & table_name_, |
29 | const ColumnsDescription & columns_, |
30 | const Context & context, |
31 | bool attach, |
32 | const String & dictionary_name_) |
33 | : table_name(table_name_), |
34 | database_name(database_name_), |
35 | dictionary_name(dictionary_name_), |
36 | logger(&Poco::Logger::get("StorageDictionary" )) |
37 | { |
38 | setColumns(columns_); |
39 | |
40 | if (!attach) |
41 | { |
42 | const auto & dictionary = context.getExternalDictionariesLoader().getDictionary(dictionary_name); |
43 | const DictionaryStructure & dictionary_structure = dictionary->getStructure(); |
44 | checkNamesAndTypesCompatibleWithDictionary(dictionary_structure); |
45 | } |
46 | } |
47 | |
48 | BlockInputStreams StorageDictionary::read( |
49 | const Names & column_names, |
50 | const SelectQueryInfo & /*query_info*/, |
51 | const Context & context, |
52 | QueryProcessingStage::Enum /*processed_stage*/, |
53 | const size_t max_block_size, |
54 | const unsigned /*threads*/) |
55 | { |
56 | auto dictionary = context.getExternalDictionariesLoader().getDictionary(dictionary_name); |
57 | return BlockInputStreams{dictionary->getBlockInputStream(column_names, max_block_size)}; |
58 | } |
59 | |
60 | NamesAndTypesList StorageDictionary::getNamesAndTypes(const DictionaryStructure & dictionary_structure) |
61 | { |
62 | NamesAndTypesList dictionary_names_and_types; |
63 | |
64 | if (dictionary_structure.id) |
65 | dictionary_names_and_types.emplace_back(dictionary_structure.id->name, std::make_shared<DataTypeUInt64>()); |
66 | if (dictionary_structure.range_min) |
67 | dictionary_names_and_types.emplace_back(dictionary_structure.range_min->name, dictionary_structure.range_min->type); |
68 | if (dictionary_structure.range_max) |
69 | dictionary_names_and_types.emplace_back(dictionary_structure.range_max->name, dictionary_structure.range_max->type); |
70 | if (dictionary_structure.key) |
71 | for (const auto & attribute : *dictionary_structure.key) |
72 | dictionary_names_and_types.emplace_back(attribute.name, attribute.type); |
73 | |
74 | for (const auto & attribute : dictionary_structure.attributes) |
75 | dictionary_names_and_types.emplace_back(attribute.name, attribute.type); |
76 | |
77 | return dictionary_names_and_types; |
78 | } |
79 | |
80 | void StorageDictionary::checkNamesAndTypesCompatibleWithDictionary(const DictionaryStructure & dictionary_structure) const |
81 | { |
82 | auto dictionary_names_and_types = getNamesAndTypes(dictionary_structure); |
83 | std::set<NameAndTypePair> names_and_types_set(dictionary_names_and_types.begin(), dictionary_names_and_types.end()); |
84 | |
85 | for (const auto & column : getColumns().getOrdinary()) |
86 | { |
87 | if (names_and_types_set.find(column) == names_and_types_set.end()) |
88 | { |
89 | std::string message = "Not found column " ; |
90 | message += column.name + " " + column.type->getName(); |
91 | message += " in dictionary " + dictionary_name + ". " ; |
92 | message += "There are only columns " ; |
93 | message += generateNamesAndTypesDescription(dictionary_names_and_types.begin(), dictionary_names_and_types.end()); |
94 | throw Exception(message, ErrorCodes::THERE_IS_NO_COLUMN); |
95 | } |
96 | } |
97 | } |
98 | |
99 | void registerStorageDictionary(StorageFactory & factory) |
100 | { |
101 | factory.registerStorage("Dictionary" , [](const StorageFactory::Arguments & args) |
102 | { |
103 | if (args.engine_args.size() != 1) |
104 | throw Exception("Storage Dictionary requires single parameter: name of dictionary" , |
105 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
106 | |
107 | args.engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args.engine_args[0], args.local_context); |
108 | String dictionary_name = args.engine_args[0]->as<ASTLiteral &>().value.safeGet<String>(); |
109 | |
110 | return StorageDictionary::create( |
111 | args.database_name, args.table_name, args.columns, args.context, args.attach, dictionary_name); |
112 | }); |
113 | } |
114 | |
115 | } |
116 | |