1 | #include <Databases/DatabaseWithDictionaries.h> |
2 | #include <Interpreters/ExternalDictionariesLoader.h> |
3 | #include <Interpreters/ExternalLoaderTempConfigRepository.h> |
4 | #include <Interpreters/ExternalLoaderDatabaseConfigRepository.h> |
5 | #include <Dictionaries/getDictionaryConfigurationFromAST.h> |
6 | #include <Interpreters/Context.h> |
7 | #include <Storages/StorageDictionary.h> |
8 | #include <IO/WriteBufferFromFile.h> |
9 | #include <Poco/File.h> |
10 | #include <ext/scope_guard.h> |
11 | |
12 | |
13 | namespace DB |
14 | { |
15 | |
namespace ErrorCodes
{
    /// Error codes referenced in this file; they are only declared here (extern),
    /// so the definitions have to be provided elsewhere in the project.
    extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
    extern const int TABLE_ALREADY_EXISTS;
    extern const int UNKNOWN_TABLE;
    extern const int LOGICAL_ERROR;
    extern const int DICTIONARY_ALREADY_EXISTS;
    /// Thrown by getCreateDictionaryQueryImpl(); declared explicitly so that this file
    /// does not depend on some included header happening to declare it.
    extern const int CANNOT_GET_CREATE_DICTIONARY_QUERY;
}
24 | |
25 | |
26 | void DatabaseWithDictionaries::attachDictionary(const String & dictionary_name, const Context & context) |
27 | { |
28 | String full_name = getDatabaseName() + "." + dictionary_name; |
29 | { |
30 | std::lock_guard lock(mutex); |
31 | if (!dictionaries.emplace(dictionary_name).second) |
32 | throw Exception("Dictionary " + full_name + " already exists." , ErrorCodes::DICTIONARY_ALREADY_EXISTS); |
33 | } |
34 | |
35 | /// ExternalLoader::reloadConfig() will find out that the dictionary's config has been added |
36 | /// and in case `dictionaries_lazy_load == false` it will load the dictionary. |
37 | const auto & external_loader = context.getExternalDictionariesLoader(); |
38 | external_loader.reloadConfig(getDatabaseName(), full_name); |
39 | } |
40 | |
41 | void DatabaseWithDictionaries::detachDictionary(const String & dictionary_name, const Context & context) |
42 | { |
43 | String full_name = getDatabaseName() + "." + dictionary_name; |
44 | { |
45 | std::lock_guard lock(mutex); |
46 | auto it = dictionaries.find(dictionary_name); |
47 | if (it == dictionaries.end()) |
48 | throw Exception("Dictionary " + full_name + " doesn't exist." , ErrorCodes::UNKNOWN_TABLE); |
49 | dictionaries.erase(it); |
50 | } |
51 | |
52 | /// ExternalLoader::reloadConfig() will find out that the dictionary's config has been removed |
53 | /// and therefore it will unload the dictionary. |
54 | const auto & external_loader = context.getExternalDictionariesLoader(); |
55 | external_loader.reloadConfig(getDatabaseName(), full_name); |
56 | |
57 | } |
58 | |
/// Creates a dictionary named `dictionary_name` in this database from the CREATE query `query`.
///
/// Throws DICTIONARY_ALREADY_EXISTS if the name is already registered in this database or if the
/// external dictionaries loader already knows an object with the same full name, and
/// TABLE_ALREADY_EXISTS if a table with this name exists.
///
/// The definition is first written to "<metadata path>.tmp"; only after the dictionary has been
/// registered (and, when lazy loading is disabled, loaded) is the file renamed to its final name.
void DatabaseWithDictionaries::createDictionary(const Context & context, const String & dictionary_name, const ASTPtr & query)
{
    const auto & settings = context.getSettingsRef();

    /** The code is based on the assumption that all threads share the same order of operations:
      * - create the .sql.tmp file;
      * - add the dictionary to ExternalDictionariesLoader;
      * - load the dictionary in case dictionaries_lazy_load == false;
      * - attach the dictionary;
      * - rename .sql.tmp to .sql.
      */

    /// A race condition would be possible if a dictionary with the same name is simultaneously created using CREATE and using ATTACH.
    /// But there is protection from it - see using DDLGuard in InterpreterCreateQuery.
    if (isDictionaryExist(context, dictionary_name))
        throw Exception("Dictionary " + backQuote(getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists." , ErrorCodes::DICTIONARY_ALREADY_EXISTS);

    /// A dictionary with the same full name could be defined in *.xml config files.
    /// In that case the loader already reports a status other than NOT_EXIST for this name.
    String full_name = getDatabaseName() + "." + dictionary_name;
    const auto & external_loader = context.getExternalDictionariesLoader();
    if (external_loader.getCurrentStatus(full_name) != ExternalLoader::Status::NOT_EXIST)
        throw Exception(
            "Dictionary " + backQuote(getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists." ,
            ErrorCodes::DICTIONARY_ALREADY_EXISTS);

    /// Dictionaries are also looked up by table name (see tryGetTable()), so a table with the same name is a conflict too.
    if (isTableExist(context, dictionary_name))
        throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists." , ErrorCodes::TABLE_ALREADY_EXISTS);


    String dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
    String dictionary_metadata_tmp_path = dictionary_metadata_path + ".tmp" ;
    String statement = getObjectDefinitionFromCreateQuery(query);

    {
        /// The exclusive flags (O_CREAT | O_EXCL) guarantee that the dictionary is not being created right now
        /// in another thread. Otherwise, an exception will be thrown.
        WriteBufferFromFile out(dictionary_metadata_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
        writeString(statement, out);
        out.next();
        /// Syncing the file is optional and controlled by the `fsync_metadata` setting.
        if (settings.fsync_metadata)
            out.sync();
        out.close();
    }

    /// From here on every early exit (exception) must remove the temporary file.
    /// `succeeded` is flipped to true only at the very end of the function.
    /// NOTE(review): if reloadConfig() at the end throws after the rename below has succeeded,
    /// this guard still runs and tries to remove a ".tmp" path that has already been renamed -
    /// confirm how Poco::File::remove() behaves for a missing file in that case.
    bool succeeded = false;
    SCOPE_EXIT({
        if (!succeeded)
            Poco::File(dictionary_metadata_tmp_path).remove();
    });

    /// Add a temporary repository containing the dictionary.
    /// We need this temp repository to try loading the dictionary before actually attaching it to the database.
    /// The returned handle is kept in a local variable; presumably the temporary repository goes away
    /// together with it when this function returns - TODO confirm against addConfigRepository().
    auto temp_repository
        = const_cast<ExternalDictionariesLoader &>(external_loader) /// the change of ExternalDictionariesLoader is temporary
              .addConfigRepository(std::make_unique<ExternalLoaderTempConfigRepository>(
                  getDatabaseName(), dictionary_metadata_tmp_path, getDictionaryConfigurationFromAST(query->as<const ASTCreateQuery &>())));

    /// Lazy loading is the default when the `dictionaries_lazy_load` key is absent from the config.
    bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load" , true);
    if (!lazy_load)
    {
        /// load() is called here to force loading the dictionary, wait until the loading is finished,
        /// and throw an exception if the loading is failed.
        external_loader.load(full_name);
    }

    /// Register the dictionary in the database; the guard below undoes it on failure.
    /// Being declared later, this guard is destroyed (and therefore runs) before the one that removes the temporary file.
    attachDictionary(dictionary_name, context);
    SCOPE_EXIT({
        if (!succeeded)
            detachDictionary(dictionary_name, context);
    });

    /// If it was ATTACH query and file with dictionary metadata already exist
    /// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
    Poco::File(dictionary_metadata_tmp_path).renameTo(dictionary_metadata_path);

    /// ExternalDictionariesLoader doesn't know we renamed the metadata path.
    /// So we have to manually call reloadConfig() here.
    external_loader.reloadConfig(getDatabaseName(), full_name);

    /// Everything's ok.
    succeeded = true;
}
140 | |
141 | void DatabaseWithDictionaries::removeDictionary(const Context & context, const String & dictionary_name) |
142 | { |
143 | detachDictionary(dictionary_name, context); |
144 | |
145 | String dictionary_metadata_path = getObjectMetadataPath(dictionary_name); |
146 | |
147 | try |
148 | { |
149 | Poco::File(dictionary_metadata_path).remove(); |
150 | } |
151 | catch (...) |
152 | { |
153 | /// If remove was not possible for some reason |
154 | attachDictionary(dictionary_name, context); |
155 | throw; |
156 | } |
157 | } |
158 | |
159 | StoragePtr DatabaseWithDictionaries::tryGetTable(const Context & context, const String & table_name) const |
160 | { |
161 | if (auto table_ptr = DatabaseWithOwnTablesBase::tryGetTable(context, table_name)) |
162 | return table_ptr; |
163 | |
164 | if (isDictionaryExist(context, table_name)) |
165 | /// We don't need lock database here, because database doesn't store dictionary itself |
166 | /// just metadata |
167 | return getDictionaryStorage(context, table_name); |
168 | |
169 | return {}; |
170 | } |
171 | |
172 | DatabaseTablesIteratorPtr DatabaseWithDictionaries::getTablesWithDictionaryTablesIterator(const Context & context, const FilterByNameFunction & filter_by_name) |
173 | { |
174 | /// NOTE: it's not atomic |
175 | auto tables_it = getTablesIterator(context, filter_by_name); |
176 | auto dictionaries_it = getDictionariesIterator(context, filter_by_name); |
177 | |
178 | Tables result; |
179 | while (tables_it && tables_it->isValid()) |
180 | { |
181 | result.emplace(tables_it->name(), tables_it->table()); |
182 | tables_it->next(); |
183 | } |
184 | |
185 | while (dictionaries_it && dictionaries_it->isValid()) |
186 | { |
187 | auto table_name = dictionaries_it->name(); |
188 | auto table_ptr = getDictionaryStorage(context, table_name); |
189 | if (table_ptr) |
190 | result.emplace(table_name, table_ptr); |
191 | dictionaries_it->next(); |
192 | } |
193 | |
194 | return std::make_unique<DatabaseTablesSnapshotIterator>(result); |
195 | } |
196 | |
197 | DatabaseDictionariesIteratorPtr DatabaseWithDictionaries::getDictionariesIterator(const Context & /*context*/, const FilterByNameFunction & filter_by_dictionary_name) |
198 | { |
199 | std::lock_guard lock(mutex); |
200 | if (!filter_by_dictionary_name) |
201 | return std::make_unique<DatabaseDictionariesSnapshotIterator>(dictionaries); |
202 | |
203 | Dictionaries filtered_dictionaries; |
204 | for (const auto & dictionary_name : dictionaries) |
205 | if (filter_by_dictionary_name(dictionary_name)) |
206 | filtered_dictionaries.emplace(dictionary_name); |
207 | return std::make_unique<DatabaseDictionariesSnapshotIterator>(std::move(filtered_dictionaries)); |
208 | } |
209 | |
210 | bool DatabaseWithDictionaries::isDictionaryExist(const Context & /*context*/, const String & dictionary_name) const |
211 | { |
212 | std::lock_guard lock(mutex); |
213 | return dictionaries.find(dictionary_name) != dictionaries.end(); |
214 | } |
215 | |
216 | StoragePtr DatabaseWithDictionaries::getDictionaryStorage(const Context & context, const String & table_name) const |
217 | { |
218 | auto dict_name = database_name + "." + table_name; |
219 | const auto & external_loader = context.getExternalDictionariesLoader(); |
220 | auto dict_ptr = external_loader.tryGetDictionary(dict_name); |
221 | if (dict_ptr) |
222 | { |
223 | const DictionaryStructure & dictionary_structure = dict_ptr->getStructure(); |
224 | auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure); |
225 | return StorageDictionary::create(database_name, table_name, ColumnsDescription{columns}, context, true, dict_name); |
226 | } |
227 | return nullptr; |
228 | } |
229 | |
230 | ASTPtr DatabaseWithDictionaries::getCreateDictionaryQueryImpl( |
231 | const Context & context, |
232 | const String & dictionary_name, |
233 | bool throw_on_error) const |
234 | { |
235 | ASTPtr ast; |
236 | |
237 | auto dictionary_metadata_path = getObjectMetadataPath(dictionary_name); |
238 | ast = getCreateQueryFromMetadata(dictionary_metadata_path, throw_on_error); |
239 | if (!ast && throw_on_error) |
240 | { |
241 | /// Handle system.* tables for which there are no table.sql files. |
242 | bool has_dictionary = isDictionaryExist(context, dictionary_name); |
243 | |
244 | auto msg = has_dictionary ? "There is no CREATE DICTIONARY query for table " : "There is no metadata file for dictionary " ; |
245 | |
246 | throw Exception(msg + backQuote(dictionary_name), ErrorCodes::CANNOT_GET_CREATE_DICTIONARY_QUERY); |
247 | } |
248 | |
249 | return ast; |
250 | } |
251 | |
/// Shuts the database down: first drops this database's handle in the external dictionaries
/// loader (see detachFromExternalDictionariesLoader()), then runs the base class shutdown.
void DatabaseWithDictionaries::shutdown()
{
    /// Done before the base shutdown; presumably so that the loader no longer refers to this
    /// database while it is being shut down - TODO confirm.
    detachFromExternalDictionariesLoader();
    DatabaseOnDisk::shutdown();
}
257 | |
/// The destructor is defaulted, but defined here in the source file rather than in the class body.
DatabaseWithDictionaries::~DatabaseWithDictionaries() = default;
259 | |
260 | void DatabaseWithDictionaries::attachToExternalDictionariesLoader(Context & context) |
261 | { |
262 | database_as_config_repo_for_external_loader = context.getExternalDictionariesLoader().addConfigRepository( |
263 | std::make_unique<ExternalLoaderDatabaseConfigRepository>(*this, context)); |
264 | } |
265 | |
/// Resets the handle stored by attachToExternalDictionariesLoader() to a default-constructed value.
/// Presumably releasing the handle is what removes this database's config repository from the
/// loader - TODO confirm against the type of `database_as_config_repo_for_external_loader`.
void DatabaseWithDictionaries::detachFromExternalDictionariesLoader()
{
    database_as_config_repo_for_external_loader = {};
}
270 | |
271 | } |
272 | |