1 | #include "ExternalLoader.h" |
2 | |
3 | #include <mutex> |
4 | #include <pcg_random.hpp> |
5 | #include <Common/Config/AbstractConfigurationComparison.h> |
6 | #include <Common/Exception.h> |
7 | #include <Common/StringUtils/StringUtils.h> |
8 | #include <Common/ThreadPool.h> |
9 | #include <Common/randomSeed.h> |
10 | #include <Common/setThreadName.h> |
11 | #include <ext/chrono_io.h> |
12 | #include <ext/scope_guard.h> |
13 | #include <boost/range/adaptor/map.hpp> |
14 | #include <boost/range/algorithm/copy.hpp> |
15 | #include <unordered_set> |
16 | |
17 | |
18 | namespace DB |
19 | { |
20 | namespace ErrorCodes |
21 | { |
22 | extern const int LOGICAL_ERROR; |
23 | extern const int BAD_ARGUMENTS; |
24 | } |
25 | |
26 | |
27 | namespace |
28 | { |
29 | template <typename ReturnType> |
30 | ReturnType convertTo(ExternalLoader::LoadResult result) |
31 | { |
32 | if constexpr (std::is_same_v<ReturnType, ExternalLoader::LoadResult>) |
33 | return result; |
34 | else |
35 | { |
36 | static_assert(std::is_same_v<ReturnType, ExternalLoader::LoadablePtr>); |
37 | return std::move(result.object); |
38 | } |
39 | } |
40 | |
41 | template <typename ReturnType> |
42 | ReturnType convertTo(ExternalLoader::LoadResults results) |
43 | { |
44 | if constexpr (std::is_same_v<ReturnType, ExternalLoader::LoadResults>) |
45 | return results; |
46 | else |
47 | { |
48 | static_assert(std::is_same_v<ReturnType, ExternalLoader::Loadables>); |
49 | ExternalLoader::Loadables objects; |
50 | objects.reserve(results.size()); |
51 | for (const auto & result : results) |
52 | { |
53 | if (auto object = std::move(result.object)) |
54 | objects.push_back(std::move(object)); |
55 | } |
56 | return objects; |
57 | } |
58 | } |
59 | |
60 | template <typename ReturnType> |
61 | ReturnType notExists(const String & name) |
62 | { |
63 | if constexpr (std::is_same_v<ReturnType, ExternalLoader::LoadResult>) |
64 | { |
65 | ExternalLoader::LoadResult res; |
66 | res.name = name; |
67 | return res; |
68 | } |
69 | else |
70 | { |
71 | static_assert(std::is_same_v<ReturnType, ExternalLoader::LoadablePtr>); |
72 | return nullptr; |
73 | } |
74 | } |
75 | |
76 | |
77 | /// Lock mutex only in async mode |
78 | /// In other case does nothing |
79 | struct LoadingGuardForAsyncLoad |
80 | { |
81 | std::unique_lock<std::mutex> lock; |
82 | LoadingGuardForAsyncLoad(bool async, std::mutex & mutex) |
83 | { |
84 | if (async) |
85 | lock = std::unique_lock(mutex); |
86 | } |
87 | }; |
88 | } |
89 | |
90 | struct ExternalLoader::ObjectConfig |
91 | { |
92 | Poco::AutoPtr<Poco::Util::AbstractConfiguration> config; |
93 | String key_in_config; |
94 | String repository_name; |
95 | bool from_temp_repository = false; |
96 | String path; |
97 | }; |
98 | |
99 | |
100 | /** Reads configurations from configuration repository and parses it. |
101 | */ |
102 | class ExternalLoader::LoadablesConfigReader : private boost::noncopyable |
103 | { |
104 | public: |
105 | LoadablesConfigReader(const String & type_name_, Logger * log_) |
106 | : type_name(type_name_), log(log_) |
107 | { |
108 | } |
109 | ~LoadablesConfigReader() = default; |
110 | |
111 | using Repository = IExternalLoaderConfigRepository; |
112 | |
113 | void addConfigRepository(std::unique_ptr<Repository> repository) |
114 | { |
115 | std::lock_guard lock{mutex}; |
116 | auto * ptr = repository.get(); |
117 | repositories.emplace(ptr, RepositoryInfo{std::move(repository), {}}); |
118 | need_collect_object_configs = true; |
119 | } |
120 | |
121 | void removeConfigRepository(Repository * repository) |
122 | { |
123 | std::lock_guard lock{mutex}; |
124 | auto it = repositories.find(repository); |
125 | if (it == repositories.end()) |
126 | return; |
127 | repositories.erase(it); |
128 | need_collect_object_configs = true; |
129 | } |
130 | |
131 | void setConfigSettings(const ExternalLoaderConfigSettings & settings_) |
132 | { |
133 | std::lock_guard lock{mutex}; |
134 | settings = settings_; |
135 | } |
136 | |
137 | using ObjectConfigsPtr = std::shared_ptr<const std::unordered_map<String /* object's name */, ObjectConfig>>; |
138 | |
139 | /// Reads all repositories. |
140 | ObjectConfigsPtr read() |
141 | { |
142 | std::lock_guard lock(mutex); |
143 | readRepositories(); |
144 | collectObjectConfigs(); |
145 | return object_configs; |
146 | } |
147 | |
148 | /// Reads only a specified repository. |
149 | /// This functions checks only a specified repository but returns configs from all repositories. |
150 | ObjectConfigsPtr read(const String & repository_name) |
151 | { |
152 | std::lock_guard lock(mutex); |
153 | readRepositories(repository_name); |
154 | collectObjectConfigs(); |
155 | return object_configs; |
156 | } |
157 | |
158 | /// Reads only a specified path from a specified repository. |
159 | /// This functions checks only a specified repository but returns configs from all repositories. |
160 | ObjectConfigsPtr read(const String & repository_name, const String & path) |
161 | { |
162 | std::lock_guard lock(mutex); |
163 | readRepositories(repository_name, path); |
164 | collectObjectConfigs(); |
165 | return object_configs; |
166 | } |
167 | |
168 | private: |
169 | struct FileInfo |
170 | { |
171 | Poco::Timestamp last_update_time = 0; |
172 | std::vector<std::pair<String, ObjectConfig>> objects; // Parsed contents of the file. |
173 | bool in_use = true; // Whether the `FileInfo` should be destroyed because the correspondent file is deleted. |
174 | }; |
175 | |
176 | struct RepositoryInfo |
177 | { |
178 | std::unique_ptr<Repository> repository; |
179 | std::unordered_map<String /* path */, FileInfo> files; |
180 | }; |
181 | |
182 | /// Reads the repositories. |
183 | /// Checks last modification times of files and read those files which are new or changed. |
184 | void readRepositories(const std::optional<String> & only_repository_name = {}, const std::optional<String> & only_path = {}) |
185 | { |
186 | for (auto & [repository, repository_info] : repositories) |
187 | { |
188 | if (only_repository_name && (repository->getName() != *only_repository_name)) |
189 | continue; |
190 | |
191 | for (auto & file_info : repository_info.files | boost::adaptors::map_values) |
192 | file_info.in_use = false; |
193 | |
194 | Strings existing_paths; |
195 | if (only_path) |
196 | { |
197 | if (repository->exists(*only_path)) |
198 | existing_paths.push_back(*only_path); |
199 | } |
200 | else |
201 | boost::copy(repository->getAllLoadablesDefinitionNames(), std::back_inserter(existing_paths)); |
202 | |
203 | for (const auto & path : existing_paths) |
204 | { |
205 | auto it = repository_info.files.find(path); |
206 | if (it != repository_info.files.end()) |
207 | { |
208 | FileInfo & file_info = it->second; |
209 | if (readFileInfo(file_info, *repository, path)) |
210 | need_collect_object_configs = true; |
211 | } |
212 | else |
213 | { |
214 | FileInfo file_info; |
215 | if (readFileInfo(file_info, *repository, path)) |
216 | { |
217 | repository_info.files.emplace(path, std::move(file_info)); |
218 | need_collect_object_configs = true; |
219 | } |
220 | } |
221 | } |
222 | |
223 | Strings deleted_paths; |
224 | for (auto & [path, file_info] : repository_info.files) |
225 | { |
226 | if (file_info.in_use) |
227 | continue; |
228 | |
229 | if (only_path && (*only_path != path)) |
230 | continue; |
231 | |
232 | deleted_paths.emplace_back(path); |
233 | } |
234 | |
235 | if (!deleted_paths.empty()) |
236 | { |
237 | for (const String & deleted_path : deleted_paths) |
238 | repository_info.files.erase(deleted_path); |
239 | need_collect_object_configs = true; |
240 | } |
241 | } |
242 | } |
243 | |
244 | /// Reads a file, returns true if the file is new or changed. |
245 | bool readFileInfo( |
246 | FileInfo & file_info, |
247 | IExternalLoaderConfigRepository & repository, |
248 | const String & path) const |
249 | { |
250 | try |
251 | { |
252 | if (path.empty() || !repository.exists(path)) |
253 | { |
254 | LOG_WARNING(log, "Config file '" + path + "' does not exist" ); |
255 | return false; |
256 | } |
257 | |
258 | auto update_time_from_repository = repository.getUpdateTime(path); |
259 | |
260 | /// Actually it can't be less, but for sure we check less or equal |
261 | if (update_time_from_repository <= file_info.last_update_time) |
262 | { |
263 | file_info.in_use = true; |
264 | return false; |
265 | } |
266 | |
267 | auto file_contents = repository.load(path); |
268 | |
269 | /// get all objects' definitions |
270 | Poco::Util::AbstractConfiguration::Keys keys; |
271 | file_contents->keys(keys); |
272 | |
273 | /// for each object defined in repositories |
274 | std::vector<std::pair<String, ObjectConfig>> object_configs_from_file; |
275 | for (const auto & key : keys) |
276 | { |
277 | if (!startsWith(key, settings.external_config)) |
278 | { |
279 | if (!startsWith(key, "comment" ) && !startsWith(key, "include_from" )) |
280 | LOG_WARNING(log, path << ": file contains unknown node '" << key << "', expected '" << settings.external_config << "'" ); |
281 | continue; |
282 | } |
283 | |
284 | String object_name = file_contents->getString(key + "." + settings.external_name); |
285 | if (object_name.empty()) |
286 | { |
287 | LOG_WARNING(log, path << ": node '" << key << "' defines " << type_name << " with an empty name. It's not allowed" ); |
288 | continue; |
289 | } |
290 | |
291 | String database; |
292 | if (!settings.external_database.empty()) |
293 | database = file_contents->getString(key + "." + settings.external_database, "" ); |
294 | if (!database.empty()) |
295 | object_name = database + "." + object_name; |
296 | |
297 | object_configs_from_file.emplace_back(object_name, ObjectConfig{file_contents, key, {}, {}, {}}); |
298 | } |
299 | |
300 | file_info.objects = std::move(object_configs_from_file); |
301 | file_info.last_update_time = update_time_from_repository; |
302 | file_info.in_use = true; |
303 | return true; |
304 | } |
305 | catch (...) |
306 | { |
307 | tryLogCurrentException(log, "Failed to load config file '" + path + "'" ); |
308 | return false; |
309 | } |
310 | } |
311 | |
312 | /// Builds a map of current configurations of objects. |
313 | void collectObjectConfigs() |
314 | { |
315 | if (!need_collect_object_configs) |
316 | return; |
317 | need_collect_object_configs = false; |
318 | |
319 | // Generate new result. |
320 | auto new_configs = std::make_shared<std::unordered_map<String /* object's name */, ObjectConfig>>(); |
321 | |
322 | for (const auto & [repository, repository_info] : repositories) |
323 | { |
324 | for (const auto & [path, file_info] : repository_info.files) |
325 | { |
326 | for (const auto & [object_name, object_config] : file_info.objects) |
327 | { |
328 | auto already_added_it = new_configs->find(object_name); |
329 | if (already_added_it == new_configs->end()) |
330 | { |
331 | auto & new_config = new_configs->emplace(object_name, object_config).first->second; |
332 | new_config.from_temp_repository = repository->isTemporary(); |
333 | new_config.repository_name = repository->getName(); |
334 | new_config.path = path; |
335 | } |
336 | else |
337 | { |
338 | const auto & already_added = already_added_it->second; |
339 | if (!already_added.from_temp_repository && !repository->isTemporary()) |
340 | { |
341 | LOG_WARNING( |
342 | log, |
343 | type_name << " '" << object_name << "' is found " |
344 | << (((path == already_added.path) && (repository->getName() == already_added.repository_name)) |
345 | ? ("twice in the same file '" + path + "'" ) |
346 | : ("both in file '" + already_added.path + "' and '" + path + "'" ))); |
347 | } |
348 | } |
349 | } |
350 | } |
351 | } |
352 | |
353 | object_configs = new_configs; |
354 | } |
355 | |
356 | const String type_name; |
357 | Logger * log; |
358 | |
359 | std::mutex mutex; |
360 | ExternalLoaderConfigSettings settings; |
361 | std::unordered_map<Repository *, RepositoryInfo> repositories; |
362 | ObjectConfigsPtr object_configs; |
363 | bool need_collect_object_configs = false; |
364 | }; |
365 | |
366 | |
367 | /** Manages loading and reloading objects. Uses configurations from the class LoadablesConfigReader. |
368 | * Supports parallel loading. |
369 | */ |
370 | class ExternalLoader::LoadingDispatcher : private boost::noncopyable |
371 | { |
372 | public: |
373 | /// Called to load or reload an object. |
374 | using CreateObjectFunction = std::function<LoadablePtr( |
375 | const String & /* name */, const ObjectConfig & /* config */, const LoadablePtr & /* previous_version */)>; |
376 | |
377 | LoadingDispatcher( |
378 | const CreateObjectFunction & create_object_function_, |
379 | const String & type_name_, |
380 | Logger * log_) |
381 | : create_object(create_object_function_) |
382 | , type_name(type_name_) |
383 | , log(log_) |
384 | { |
385 | } |
386 | |
387 | ~LoadingDispatcher() |
388 | { |
389 | std::unique_lock lock{mutex}; |
390 | infos.clear(); /// We clear this map to tell the threads that we don't want any load results anymore. |
391 | |
392 | /// Wait for all the threads to finish. |
393 | while (!loading_threads.empty()) |
394 | { |
395 | auto it = loading_threads.begin(); |
396 | auto thread = std::move(it->second); |
397 | loading_threads.erase(it); |
398 | lock.unlock(); |
399 | event.notify_all(); |
400 | thread.join(); |
401 | lock.lock(); |
402 | } |
403 | } |
404 | |
405 | using ObjectConfigsPtr = LoadablesConfigReader::ObjectConfigsPtr; |
406 | |
407 | /// Sets new configurations for all the objects. |
408 | void setConfiguration(const ObjectConfigsPtr & new_configs) |
409 | { |
410 | std::lock_guard lock{mutex}; |
411 | if (configs == new_configs) |
412 | return; |
413 | |
414 | configs = new_configs; |
415 | |
416 | std::vector<String> removed_names; |
417 | for (auto & [name, info] : infos) |
418 | { |
419 | auto new_config_it = new_configs->find(name); |
420 | if (new_config_it == new_configs->end()) |
421 | removed_names.emplace_back(name); |
422 | else |
423 | { |
424 | const auto & new_config = new_config_it->second; |
425 | bool config_is_same = isSameConfiguration(*info.object_config.config, info.object_config.key_in_config, *new_config.config, new_config.key_in_config); |
426 | info.object_config = new_config; |
427 | if (!config_is_same) |
428 | { |
429 | /// Configuration has been changed. |
430 | info.object_config = new_config; |
431 | |
432 | if (info.triedToLoad()) |
433 | { |
434 | /// The object has been tried to load before, so it is currently in use or was in use |
435 | /// and we should try to reload it with the new config. |
436 | startLoading(info, true); |
437 | } |
438 | } |
439 | } |
440 | } |
441 | |
442 | /// Insert to the map those objects which added to the new configuration. |
443 | for (const auto & [name, config] : *new_configs) |
444 | { |
445 | if (infos.find(name) == infos.end()) |
446 | { |
447 | Info & info = infos.emplace(name, Info{name, config}).first->second; |
448 | if (always_load_everything) |
449 | startLoading(info); |
450 | } |
451 | } |
452 | |
453 | /// Remove from the map those objects which were removed from the configuration. |
454 | for (const String & name : removed_names) |
455 | infos.erase(name); |
456 | |
457 | /// Maybe we have just added new objects which require to be loaded |
458 | /// or maybe we have just removed object which were been loaded, |
459 | /// so we should notify `event` to recheck conditions in load() and loadAll() now. |
460 | event.notify_all(); |
461 | } |
462 | |
463 | /// Sets whether all the objects from the configuration should be always loaded (even if they aren't used). |
464 | void enableAlwaysLoadEverything(bool enable) |
465 | { |
466 | std::lock_guard lock{mutex}; |
467 | if (always_load_everything == enable) |
468 | return; |
469 | |
470 | always_load_everything = enable; |
471 | |
472 | if (enable) |
473 | { |
474 | /// Start loading all the objects which were not loaded yet. |
475 | for (auto & [name, info] : infos) |
476 | if (!info.triedToLoad()) |
477 | startLoading(info); |
478 | } |
479 | } |
480 | |
481 | /// Sets whether the objects should be loaded asynchronously, each loading in a new thread (from the thread pool). |
482 | void enableAsyncLoading(bool enable) |
483 | { |
484 | enable_async_loading = enable; |
485 | } |
486 | |
487 | /// Returns the status of the object. |
488 | /// If the object has not been loaded yet then the function returns Status::NOT_LOADED. |
489 | /// If the specified name isn't found in the configuration then the function returns Status::NOT_EXIST. |
490 | Status getCurrentStatus(const String & name) const |
491 | { |
492 | std::lock_guard lock{mutex}; |
493 | const Info * info = getInfo(name); |
494 | if (!info) |
495 | return Status::NOT_EXIST; |
496 | return info->status(); |
497 | } |
498 | |
499 | /// Returns the load result of the object. |
500 | template <typename ReturnType> |
501 | ReturnType getCurrentLoadResult(const String & name) const |
502 | { |
503 | std::lock_guard lock{mutex}; |
504 | const Info * info = getInfo(name); |
505 | if (!info) |
506 | return notExists<ReturnType>(name); |
507 | return info->getLoadResult<ReturnType>(); |
508 | } |
509 | |
510 | /// Returns all the load results as a map. |
511 | /// The function doesn't load anything, it just returns the current load results as is. |
512 | template <typename ReturnType> |
513 | ReturnType getCurrentLoadResults(const FilterByNameFunction & filter) const |
514 | { |
515 | std::lock_guard lock{mutex}; |
516 | return collectLoadResults<ReturnType>(filter); |
517 | } |
518 | |
519 | size_t getNumberOfCurrentlyLoadedObjects() const |
520 | { |
521 | std::lock_guard lock{mutex}; |
522 | size_t count = 0; |
523 | for (const auto & name_and_info : infos) |
524 | { |
525 | const auto & info = name_and_info.second; |
526 | if (info.loaded()) |
527 | ++count; |
528 | } |
529 | return count; |
530 | } |
531 | |
532 | bool hasCurrentlyLoadedObjects() const |
533 | { |
534 | std::lock_guard lock{mutex}; |
535 | for (auto & name_info : infos) |
536 | if (name_info.second.loaded()) |
537 | return true; |
538 | return false; |
539 | } |
540 | |
541 | Strings getAllTriedToLoadNames() const |
542 | { |
543 | Strings names; |
544 | for (auto & [name, info] : infos) |
545 | if (info.triedToLoad()) |
546 | names.push_back(name); |
547 | return names; |
548 | } |
549 | |
550 | /// Tries to load a specified object during the timeout. |
551 | template <typename ReturnType> |
552 | ReturnType tryLoad(const String & name, Duration timeout) |
553 | { |
554 | std::unique_lock lock{mutex}; |
555 | Info * info = loadImpl(name, timeout, false, lock); |
556 | if (!info) |
557 | return notExists<ReturnType>(name); |
558 | return info->getLoadResult<ReturnType>(); |
559 | } |
560 | |
561 | template <typename ReturnType> |
562 | ReturnType tryLoad(const FilterByNameFunction & filter, Duration timeout) |
563 | { |
564 | std::unique_lock lock{mutex}; |
565 | loadImpl(filter, timeout, false, lock); |
566 | return collectLoadResults<ReturnType>(filter); |
567 | } |
568 | |
569 | /// Tries to load or reload a specified object. |
570 | template <typename ReturnType> |
571 | ReturnType tryLoadOrReload(const String & name, Duration timeout) |
572 | { |
573 | std::unique_lock lock{mutex}; |
574 | Info * info = loadImpl(name, timeout, true, lock); |
575 | if (!info) |
576 | return notExists<ReturnType>(name); |
577 | return info->getLoadResult<ReturnType>(); |
578 | } |
579 | |
580 | template <typename ReturnType> |
581 | ReturnType tryLoadOrReload(const FilterByNameFunction & filter, Duration timeout) |
582 | { |
583 | std::unique_lock lock{mutex}; |
584 | loadImpl(filter, timeout, true, lock); |
585 | return collectLoadResults<ReturnType>(filter); |
586 | } |
587 | |
588 | /// Starts reloading all the object which update time is earlier than now. |
589 | /// The function doesn't touch the objects which were never tried to load. |
590 | void reloadOutdated() |
591 | { |
592 | /// Iterate through all the objects and find loaded ones which should be checked if they need update. |
593 | std::unordered_map<LoadablePtr, bool> should_update_map; |
594 | { |
595 | std::lock_guard lock{mutex}; |
596 | TimePoint now = std::chrono::system_clock::now(); |
597 | for (const auto & name_and_info : infos) |
598 | { |
599 | const auto & info = name_and_info.second; |
600 | if ((now >= info.next_update_time) && !info.is_loading() && info.loaded()) |
601 | should_update_map.emplace(info.object, info.failedToReload()); |
602 | } |
603 | } |
604 | |
605 | /// Find out which of the loaded objects were modified. |
606 | /// We couldn't perform these checks while we were building `should_update_map` because |
607 | /// the `mutex` should be unlocked while we're calling the function object->isModified() |
608 | for (auto & [object, should_update_flag] : should_update_map) |
609 | { |
610 | try |
611 | { |
612 | /// Maybe alredy true, if we have an exception |
613 | if (!should_update_flag) |
614 | should_update_flag = object->isModified(); |
615 | } |
616 | catch (...) |
617 | { |
618 | tryLogCurrentException(log, "Could not check if " + type_name + " '" + object->getLoadableName() + "' was modified" ); |
619 | /// Cannot check isModified, so update |
620 | should_update_flag = true; |
621 | } |
622 | } |
623 | |
624 | /// Iterate through all the objects again and either start loading or just set `next_update_time`. |
625 | { |
626 | std::lock_guard lock{mutex}; |
627 | TimePoint now = std::chrono::system_clock::now(); |
628 | for (auto & [name, info] : infos) |
629 | { |
630 | if ((now >= info.next_update_time) && !info.is_loading()) |
631 | { |
632 | if (info.loaded()) |
633 | { |
634 | auto it = should_update_map.find(info.object); |
635 | if (it == should_update_map.end()) |
636 | continue; /// Object has been just loaded (it wasn't loaded while we were building the map `should_update_map`), so we don't have to reload it right now. |
637 | |
638 | bool should_update_flag = it->second; |
639 | if (!should_update_flag) |
640 | { |
641 | info.next_update_time = calculateNextUpdateTime(info.object, info.error_count); |
642 | continue; |
643 | } |
644 | |
645 | /// Object was modified or it was failed to reload last time, so it should be reloaded. |
646 | startLoading(info); |
647 | } |
648 | else if (info.failed()) |
649 | { |
650 | /// Object was never loaded successfully and should be reloaded. |
651 | startLoading(info); |
652 | } |
653 | } |
654 | } |
655 | } |
656 | } |
657 | |
658 | private: |
659 | struct Info |
660 | { |
661 | Info(const String & name_, const ObjectConfig & object_config_) : name(name_), object_config(object_config_) {} |
662 | |
663 | bool loaded() const { return object != nullptr; } |
664 | bool failed() const { return !object && exception; } |
665 | bool loadedOrFailed() const { return loaded() || failed(); } |
666 | bool triedToLoad() const { return loaded() || failed() || is_loading(); } |
667 | bool failedToReload() const { return loaded() && exception != nullptr; } |
668 | bool is_loading() const { return loading_id > state_id; } |
669 | |
670 | Status status() const |
671 | { |
672 | if (object) |
673 | return is_loading() ? Status::LOADED_AND_RELOADING : Status::LOADED; |
674 | else if (exception) |
675 | return is_loading() ? Status::FAILED_AND_RELOADING : Status::FAILED; |
676 | else |
677 | return is_loading() ? Status::LOADING : Status::NOT_LOADED; |
678 | } |
679 | |
680 | Duration loadingDuration() const |
681 | { |
682 | if (is_loading()) |
683 | return std::chrono::duration_cast<Duration>(std::chrono::system_clock::now() - loading_start_time); |
684 | return std::chrono::duration_cast<Duration>(loading_end_time - loading_start_time); |
685 | } |
686 | |
687 | template <typename ReturnType> |
688 | ReturnType getLoadResult() const |
689 | { |
690 | if constexpr (std::is_same_v<ReturnType, LoadResult>) |
691 | { |
692 | LoadResult result; |
693 | result.name = name; |
694 | result.status = status(); |
695 | result.object = object; |
696 | result.exception = exception; |
697 | result.loading_start_time = loading_start_time; |
698 | result.loading_duration = loadingDuration(); |
699 | result.origin = object_config.path; |
700 | result.repository_name = object_config.repository_name; |
701 | return result; |
702 | } |
703 | else |
704 | { |
705 | static_assert(std::is_same_v<ReturnType, ExternalLoader::LoadablePtr>); |
706 | return object; |
707 | } |
708 | } |
709 | |
710 | String name; |
711 | LoadablePtr object; |
712 | ObjectConfig object_config; |
713 | TimePoint loading_start_time; |
714 | TimePoint loading_end_time; |
715 | size_t state_id = 0; /// Index of the current state of this `info`, this index is incremented every loading. |
716 | size_t loading_id = 0; /// The value which will be stored in `state_id` after finishing the current loading. |
717 | size_t error_count = 0; /// Numbers of errors since last successful loading. |
718 | std::exception_ptr exception; /// Last error occurred. |
719 | TimePoint next_update_time = TimePoint::max(); /// Time of the next update, `TimePoint::max()` means "never". |
720 | }; |
721 | |
722 | Info * getInfo(const String & name) |
723 | { |
724 | auto it = infos.find(name); |
725 | if (it == infos.end()) |
726 | return nullptr; |
727 | return &it->second; |
728 | } |
729 | |
730 | const Info * getInfo(const String & name) const |
731 | { |
732 | auto it = infos.find(name); |
733 | if (it == infos.end()) |
734 | return nullptr; |
735 | return &it->second; |
736 | } |
737 | |
738 | template <typename ReturnType> |
739 | ReturnType collectLoadResults(const FilterByNameFunction & filter) const |
740 | { |
741 | ReturnType results; |
742 | results.reserve(infos.size()); |
743 | for (const auto & [name, info] : infos) |
744 | { |
745 | if (filter(name)) |
746 | { |
747 | auto result = info.template getLoadResult<typename ReturnType::value_type>(); |
748 | if constexpr (std::is_same_v<typename ReturnType::value_type, LoadablePtr>) |
749 | { |
750 | if (!result) |
751 | continue; |
752 | } |
753 | results.emplace_back(std::move(result)); |
754 | } |
755 | } |
756 | return results; |
757 | } |
758 | |
759 | Info * loadImpl(const String & name, Duration timeout, bool forced_to_reload, std::unique_lock<std::mutex> & lock) |
760 | { |
761 | std::optional<size_t> min_id; |
762 | Info * info = nullptr; |
763 | auto pred = [&] |
764 | { |
765 | info = getInfo(name); |
766 | if (!info) |
767 | return true; /// stop |
768 | |
769 | if (!min_id) |
770 | min_id = getMinIDToFinishLoading(forced_to_reload); |
771 | |
772 | if (info->state_id >= min_id) |
773 | return true; /// stop |
774 | |
775 | if (info->loading_id < min_id) |
776 | startLoading(*info, forced_to_reload, *min_id); |
777 | return false; /// wait for the next event |
778 | }; |
779 | |
780 | if (timeout == WAIT) |
781 | event.wait(lock, pred); |
782 | else |
783 | event.wait_for(lock, timeout, pred); |
784 | |
785 | return info; |
786 | } |
787 | |
788 | void loadImpl(const FilterByNameFunction & filter, Duration timeout, bool forced_to_reload, std::unique_lock<std::mutex> & lock) |
789 | { |
790 | std::optional<size_t> min_id; |
791 | auto pred = [&] |
792 | { |
793 | if (!min_id) |
794 | min_id = getMinIDToFinishLoading(forced_to_reload); |
795 | |
796 | bool all_ready = true; |
797 | for (auto & [name, info] : infos) |
798 | { |
799 | if (!filter(name)) |
800 | continue; |
801 | |
802 | if (info.state_id >= min_id) |
803 | continue; |
804 | |
805 | all_ready = false; |
806 | if (info.loading_id < min_id) |
807 | startLoading(info, forced_to_reload, *min_id); |
808 | } |
809 | return all_ready; |
810 | }; |
811 | |
812 | if (timeout == WAIT) |
813 | event.wait(lock, pred); |
814 | else |
815 | event.wait_for(lock, timeout, pred); |
816 | } |
817 | |
818 | /// When state_id >= getMinIDToFinishLoading() the loading is considered as finished. |
819 | size_t getMinIDToFinishLoading(bool forced_to_reload) const |
820 | { |
821 | if (forced_to_reload) |
822 | { |
823 | /// We need to force reloading, that's why we return next_id_counter here |
824 | /// (because info.state_id < next_id_counter for any info). |
825 | return next_id_counter; |
826 | } |
827 | |
828 | /// The loading of an object can cause the loading of another object. |
829 | /// We use the same "min_id" in this case to allows reloading multiple objects at once |
830 | /// taking into account their dependencies. |
831 | auto it = min_id_to_finish_loading_dependencies.find(std::this_thread::get_id()); |
832 | if (it != min_id_to_finish_loading_dependencies.end()) |
833 | return it->second; |
834 | |
835 | /// We just need the first loading to be finished, that's why we return 1 here |
836 | /// (because info.state_id >= 1 since the first loading is finished, successfully or not). |
837 | return 1; |
838 | } |
839 | |
840 | void startLoading(Info & info, bool forced_to_reload = false, size_t min_id_to_finish_loading_dependencies_ = 1) |
841 | { |
842 | if (info.is_loading()) |
843 | { |
844 | if (!forced_to_reload) |
845 | return; |
846 | cancelLoading(info); |
847 | } |
848 | |
849 | /// All loadings have unique loading IDs. |
850 | size_t loading_id = next_id_counter++; |
851 | info.loading_id = loading_id; |
852 | info.loading_start_time = std::chrono::system_clock::now(); |
853 | info.loading_end_time = TimePoint{}; |
854 | |
855 | if (enable_async_loading) |
856 | { |
857 | /// Put a job to the thread pool for the loading. |
858 | auto thread = ThreadFromGlobalPool{&LoadingDispatcher::doLoading, this, info.name, loading_id, forced_to_reload, min_id_to_finish_loading_dependencies_, true}; |
859 | loading_threads.try_emplace(loading_id, std::move(thread)); |
860 | } |
861 | else |
862 | { |
863 | /// Perform the loading immediately. |
864 | doLoading(info.name, loading_id, forced_to_reload, min_id_to_finish_loading_dependencies_, false); |
865 | } |
866 | } |
867 | |
868 | void cancelLoading(Info & info) |
869 | { |
870 | if (!info.is_loading()) |
871 | return; |
872 | |
873 | /// In fact we cannot actually CANCEL the loading (because it's possibly already being performed in another thread). |
874 | /// But we can reset the `loading_id` and doLoading() will understand it as a signal to stop loading. |
875 | info.loading_id = info.state_id; |
876 | info.loading_end_time = std::chrono::system_clock::now(); |
877 | } |
878 | |
879 | /// Does the loading, possibly in the separate thread. |
880 | void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async) |
881 | { |
882 | try |
883 | { |
884 | /// Prepare for loading. |
885 | std::optional<Info> info; |
886 | { |
887 | LoadingGuardForAsyncLoad lock(async, mutex); |
888 | info = prepareToLoadSingleObject(name, loading_id, min_id_to_finish_loading_dependencies_, lock); |
889 | if (!info) |
890 | return; |
891 | } |
892 | |
893 | /// Previous version can be used as the base for new loading, enabling loading only part of data. |
894 | auto previous_version_as_base_for_loading = info->object; |
895 | if (forced_to_reload) |
896 | previous_version_as_base_for_loading = nullptr; /// Need complete reloading, cannot use the previous version. |
897 | |
898 | /// Loading. |
899 | auto [new_object, new_exception] = loadSingleObject(name, info->object_config, previous_version_as_base_for_loading); |
900 | if (!new_object && !new_exception) |
901 | throw Exception("No object created and no exception raised for " + type_name, ErrorCodes::LOGICAL_ERROR); |
902 | |
903 | /// Saving the result of the loading. |
904 | { |
905 | LoadingGuardForAsyncLoad lock(async, mutex); |
906 | saveResultOfLoadingSingleObject(name, loading_id, info->object, new_object, new_exception, info->error_count, lock); |
907 | finishLoadingSingleObject(name, loading_id, lock); |
908 | } |
909 | event.notify_all(); |
910 | } |
911 | catch (...) |
912 | { |
913 | LoadingGuardForAsyncLoad lock(async, mutex); |
914 | finishLoadingSingleObject(name, loading_id, lock); |
915 | throw; |
916 | } |
917 | } |
918 | |
919 | /// Returns single object info, checks loading_id and name. |
920 | std::optional<Info> prepareToLoadSingleObject( |
921 | const String & name, size_t loading_id, size_t min_id_to_finish_loading_dependencies_, const LoadingGuardForAsyncLoad &) |
922 | { |
923 | Info * info = getInfo(name); |
924 | /// We check here if this is exactly the same loading as we planned to perform. |
925 | /// This check is necessary because the object could be removed or load with another config before this thread even starts. |
926 | if (!info || !info->is_loading() || (info->loading_id != loading_id)) |
927 | return {}; |
928 | |
929 | min_id_to_finish_loading_dependencies[std::this_thread::get_id()] = min_id_to_finish_loading_dependencies_; |
930 | return *info; |
931 | } |
932 | |
933 | /// Load one object, returns object ptr or exception. |
934 | std::pair<LoadablePtr, std::exception_ptr> |
935 | loadSingleObject(const String & name, const ObjectConfig & config, LoadablePtr previous_version) |
936 | { |
937 | /// Use `create_function` to perform the actual loading. |
938 | /// It's much better to do it with `mutex` unlocked because the loading can take a lot of time |
939 | /// and require access to other objects. |
940 | LoadablePtr new_object; |
941 | std::exception_ptr new_exception; |
942 | try |
943 | { |
944 | new_object = create_object(name, config, previous_version); |
945 | } |
946 | catch (...) |
947 | { |
948 | new_exception = std::current_exception(); |
949 | } |
950 | return std::make_pair(new_object, new_exception); |
951 | } |
952 | |
953 | /// Saves the result of the loading, calculates the time of the next update, and handles errors. |
954 | void saveResultOfLoadingSingleObject( |
955 | const String & name, |
956 | size_t loading_id, |
957 | LoadablePtr previous_version, |
958 | LoadablePtr new_object, |
959 | std::exception_ptr new_exception, |
960 | size_t error_count, |
961 | const LoadingGuardForAsyncLoad &) |
962 | { |
963 | /// Calculate a new update time. |
964 | TimePoint next_update_time; |
965 | try |
966 | { |
967 | if (new_exception) |
968 | ++error_count; |
969 | else |
970 | error_count = 0; |
971 | |
972 | LoadablePtr object = previous_version; |
973 | if (new_object) |
974 | object = new_object; |
975 | |
976 | next_update_time = calculateNextUpdateTime(object, error_count); |
977 | } |
978 | catch (...) |
979 | { |
980 | tryLogCurrentException(log, "Cannot find out when the " + type_name + " '" + name + "' should be updated" ); |
981 | next_update_time = TimePoint::max(); |
982 | } |
983 | |
984 | |
985 | Info * info = getInfo(name); |
986 | |
987 | /// We should check if this is still the same loading as we were doing. |
988 | /// This is necessary because the object could be removed or load with another config while the `mutex` was unlocked. |
989 | if (!info || !info->is_loading() || (info->loading_id != loading_id)) |
990 | return; |
991 | |
992 | if (new_exception) |
993 | { |
994 | auto next_update_time_description = [next_update_time] |
995 | { |
996 | if (next_update_time == TimePoint::max()) |
997 | return String(); |
998 | return ", next update is scheduled at " + ext::to_string(next_update_time); |
999 | }; |
1000 | if (previous_version) |
1001 | tryLogException(new_exception, log, "Could not update " + type_name + " '" + name + "'" |
1002 | ", leaving the previous version" + next_update_time_description()); |
1003 | else |
1004 | tryLogException(new_exception, log, "Could not load " + type_name + " '" + name + "'" + next_update_time_description()); |
1005 | } |
1006 | |
1007 | if (new_object) |
1008 | info->object = new_object; |
1009 | |
1010 | info->exception = new_exception; |
1011 | info->error_count = error_count; |
1012 | info->loading_end_time = std::chrono::system_clock::now(); |
1013 | info->state_id = info->loading_id; |
1014 | info->next_update_time = next_update_time; |
1015 | } |
1016 | |
1017 | /// Removes the references to the loading thread from the maps. |
1018 | void finishLoadingSingleObject(const String & name, size_t loading_id, const LoadingGuardForAsyncLoad &) |
1019 | { |
1020 | Info * info = getInfo(name); |
1021 | if (info && (info->loading_id == loading_id)) |
1022 | info->loading_id = info->state_id; |
1023 | |
1024 | min_id_to_finish_loading_dependencies.erase(std::this_thread::get_id()); |
1025 | |
1026 | auto it = loading_threads.find(loading_id); |
1027 | if (it != loading_threads.end()) |
1028 | { |
1029 | it->second.detach(); |
1030 | loading_threads.erase(it); |
1031 | } |
1032 | } |
1033 | |
1034 | /// Calculate next update time for loaded_object. Can be called without mutex locking, |
1035 | /// because single loadable can be loaded in single thread only. |
1036 | TimePoint calculateNextUpdateTime(const LoadablePtr & loaded_object, size_t error_count) const |
1037 | { |
1038 | static constexpr auto never = TimePoint::max(); |
1039 | |
1040 | if (loaded_object) |
1041 | { |
1042 | if (!loaded_object->supportUpdates()) |
1043 | return never; |
1044 | |
1045 | /// do not update loadable objects with zero as lifetime |
1046 | const auto & lifetime = loaded_object->getLifetime(); |
1047 | if (lifetime.min_sec == 0 && lifetime.max_sec == 0) |
1048 | return never; |
1049 | |
1050 | if (!error_count) |
1051 | { |
1052 | std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec}; |
1053 | return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; |
1054 | } |
1055 | } |
1056 | |
1057 | return std::chrono::system_clock::now() + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count)); |
1058 | } |
1059 | |
1060 | const CreateObjectFunction create_object; |
1061 | const String type_name; |
1062 | Logger * log; |
1063 | |
1064 | mutable std::mutex mutex; |
1065 | std::condition_variable event; |
1066 | ObjectConfigsPtr configs; |
1067 | std::unordered_map<String, Info> infos; |
1068 | bool always_load_everything = false; |
1069 | std::atomic<bool> enable_async_loading = false; |
1070 | std::unordered_map<size_t, ThreadFromGlobalPool> loading_threads; |
1071 | std::unordered_map<std::thread::id, size_t> min_id_to_finish_loading_dependencies; |
1072 | size_t next_id_counter = 1; /// should always be > 0 |
1073 | mutable pcg64 rnd_engine{randomSeed()}; |
1074 | }; |
1075 | |
1076 | |
1077 | class ExternalLoader::PeriodicUpdater : private boost::noncopyable |
1078 | { |
1079 | public: |
1080 | static constexpr UInt64 check_period_sec = 5; |
1081 | |
1082 | PeriodicUpdater(LoadablesConfigReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_) |
1083 | : config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_) |
1084 | { |
1085 | } |
1086 | |
1087 | ~PeriodicUpdater() { enable(false); } |
1088 | |
1089 | void enable(bool enable_) |
1090 | { |
1091 | std::unique_lock lock{mutex}; |
1092 | enabled = enable_; |
1093 | |
1094 | if (enable_) |
1095 | { |
1096 | if (!thread.joinable()) |
1097 | { |
1098 | /// Starts the thread which will do periodic updates. |
1099 | thread = ThreadFromGlobalPool{&PeriodicUpdater::doPeriodicUpdates, this}; |
1100 | } |
1101 | } |
1102 | else |
1103 | { |
1104 | if (thread.joinable()) |
1105 | { |
1106 | /// Wait for the thread to finish. |
1107 | auto temp_thread = std::move(thread); |
1108 | lock.unlock(); |
1109 | event.notify_one(); |
1110 | temp_thread.join(); |
1111 | } |
1112 | } |
1113 | } |
1114 | |
1115 | |
1116 | private: |
1117 | void doPeriodicUpdates() |
1118 | { |
1119 | setThreadName("ExterLdrReload" ); |
1120 | |
1121 | std::unique_lock lock{mutex}; |
1122 | auto pred = [this] { return !enabled; }; |
1123 | while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred)) |
1124 | { |
1125 | lock.unlock(); |
1126 | loading_dispatcher.setConfiguration(config_files_reader.read()); |
1127 | loading_dispatcher.reloadOutdated(); |
1128 | lock.lock(); |
1129 | } |
1130 | } |
1131 | |
1132 | LoadablesConfigReader & config_files_reader; |
1133 | LoadingDispatcher & loading_dispatcher; |
1134 | |
1135 | mutable std::mutex mutex; |
1136 | bool enabled = false; |
1137 | ThreadFromGlobalPool thread; |
1138 | std::condition_variable event; |
1139 | }; |
1140 | |
1141 | |
1142 | ExternalLoader::ExternalLoader(const String & type_name_, Logger * log_) |
1143 | : config_files_reader(std::make_unique<LoadablesConfigReader>(type_name_, log_)) |
1144 | , loading_dispatcher(std::make_unique<LoadingDispatcher>( |
1145 | std::bind(&ExternalLoader::createObject, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3), |
1146 | type_name_, |
1147 | log_)) |
1148 | , periodic_updater(std::make_unique<PeriodicUpdater>(*config_files_reader, *loading_dispatcher)) |
1149 | , type_name(type_name_) |
1150 | , log(log_) |
1151 | { |
1152 | } |
1153 | |
1154 | ExternalLoader::~ExternalLoader() = default; |
1155 | |
1156 | ext::scope_guard ExternalLoader::addConfigRepository(std::unique_ptr<IExternalLoaderConfigRepository> repository) |
1157 | { |
1158 | auto * ptr = repository.get(); |
1159 | String name = ptr->getName(); |
1160 | config_files_reader->addConfigRepository(std::move(repository)); |
1161 | reloadConfig(name); |
1162 | |
1163 | return [this, ptr, name]() |
1164 | { |
1165 | config_files_reader->removeConfigRepository(ptr); |
1166 | reloadConfig(name); |
1167 | }; |
1168 | } |
1169 | |
1170 | void ExternalLoader::setConfigSettings(const ExternalLoaderConfigSettings & settings) |
1171 | { |
1172 | config_files_reader->setConfigSettings(settings); |
1173 | } |
1174 | |
1175 | void ExternalLoader::enableAlwaysLoadEverything(bool enable) |
1176 | { |
1177 | loading_dispatcher->enableAlwaysLoadEverything(enable); |
1178 | } |
1179 | |
1180 | void ExternalLoader::enableAsyncLoading(bool enable) |
1181 | { |
1182 | loading_dispatcher->enableAsyncLoading(enable); |
1183 | } |
1184 | |
1185 | void ExternalLoader::enablePeriodicUpdates(bool enable_) |
1186 | { |
1187 | periodic_updater->enable(enable_); |
1188 | } |
1189 | |
1190 | bool ExternalLoader::hasCurrentlyLoadedObjects() const |
1191 | { |
1192 | return loading_dispatcher->hasCurrentlyLoadedObjects(); |
1193 | } |
1194 | |
1195 | ExternalLoader::Status ExternalLoader::getCurrentStatus(const String & name) const |
1196 | { |
1197 | return loading_dispatcher->getCurrentStatus(name); |
1198 | } |
1199 | |
1200 | template <typename ReturnType, typename> |
1201 | ReturnType ExternalLoader::getCurrentLoadResult(const String & name) const |
1202 | { |
1203 | return loading_dispatcher->getCurrentLoadResult<ReturnType>(name); |
1204 | } |
1205 | |
1206 | template <typename ReturnType, typename> |
1207 | ReturnType ExternalLoader::getCurrentLoadResults(const FilterByNameFunction & filter) const |
1208 | { |
1209 | return loading_dispatcher->getCurrentLoadResults<ReturnType>(filter); |
1210 | } |
1211 | |
1212 | ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects() const |
1213 | { |
1214 | return getCurrentLoadResults<Loadables>(); |
1215 | } |
1216 | |
1217 | ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects(const FilterByNameFunction & filter) const |
1218 | { |
1219 | return getCurrentLoadResults<Loadables>(filter); |
1220 | } |
1221 | |
1222 | size_t ExternalLoader::getNumberOfCurrentlyLoadedObjects() const |
1223 | { |
1224 | return loading_dispatcher->getNumberOfCurrentlyLoadedObjects(); |
1225 | } |
1226 | |
1227 | template <typename ReturnType, typename> |
1228 | ReturnType ExternalLoader::tryLoad(const String & name, Duration timeout) const |
1229 | { |
1230 | return loading_dispatcher->tryLoad<ReturnType>(name, timeout); |
1231 | } |
1232 | |
1233 | template <typename ReturnType, typename> |
1234 | ReturnType ExternalLoader::tryLoad(const FilterByNameFunction & filter, Duration timeout) const |
1235 | { |
1236 | return loading_dispatcher->tryLoad<ReturnType>(filter, timeout); |
1237 | } |
1238 | |
1239 | template <typename ReturnType, typename> |
1240 | ReturnType ExternalLoader::load(const String & name) const |
1241 | { |
1242 | auto result = tryLoad<LoadResult>(name); |
1243 | checkLoaded(result, false); |
1244 | return convertTo<ReturnType>(result); |
1245 | } |
1246 | |
1247 | template <typename ReturnType, typename> |
1248 | ReturnType ExternalLoader::load(const FilterByNameFunction & filter) const |
1249 | { |
1250 | auto results = tryLoad<LoadResults>(filter); |
1251 | checkLoaded(results, false); |
1252 | return convertTo<ReturnType>(results); |
1253 | } |
1254 | |
1255 | template <typename ReturnType, typename> |
1256 | ReturnType ExternalLoader::loadOrReload(const String & name) const |
1257 | { |
1258 | loading_dispatcher->setConfiguration(config_files_reader->read()); |
1259 | auto result = loading_dispatcher->tryLoadOrReload<LoadResult>(name, WAIT); |
1260 | checkLoaded(result, true); |
1261 | return convertTo<ReturnType>(result); |
1262 | } |
1263 | |
1264 | template <typename ReturnType, typename> |
1265 | ReturnType ExternalLoader::loadOrReload(const FilterByNameFunction & filter) const |
1266 | { |
1267 | loading_dispatcher->setConfiguration(config_files_reader->read()); |
1268 | auto results = loading_dispatcher->tryLoadOrReload<LoadResults>(filter, WAIT); |
1269 | checkLoaded(results, true); |
1270 | return convertTo<ReturnType>(results); |
1271 | } |
1272 | |
1273 | template <typename ReturnType, typename> |
1274 | ReturnType ExternalLoader::reloadAllTriedToLoad() const |
1275 | { |
1276 | std::unordered_set<String> names; |
1277 | boost::range::copy(getAllTriedToLoadNames(), std::inserter(names, names.end())); |
1278 | return loadOrReload<ReturnType>([&names](const String & name) { return names.count(name); }); |
1279 | } |
1280 | |
1281 | Strings ExternalLoader::getAllTriedToLoadNames() const |
1282 | { |
1283 | return loading_dispatcher->getAllTriedToLoadNames(); |
1284 | } |
1285 | |
1286 | |
1287 | void ExternalLoader::checkLoaded(const ExternalLoader::LoadResult & result, |
1288 | bool check_no_errors) const |
1289 | { |
1290 | if (result.object && (!check_no_errors || !result.exception)) |
1291 | return; |
1292 | if (result.status == ExternalLoader::Status::LOADING) |
1293 | throw Exception(type_name + " '" + result.name + "' is still loading" , ErrorCodes::BAD_ARGUMENTS); |
1294 | if (result.exception) |
1295 | std::rethrow_exception(result.exception); |
1296 | if (result.status == ExternalLoader::Status::NOT_EXIST) |
1297 | throw Exception(type_name + " '" + result.name + "' not found" , ErrorCodes::BAD_ARGUMENTS); |
1298 | if (result.status == ExternalLoader::Status::NOT_LOADED) |
1299 | throw Exception(type_name + " '" + result.name + "' not tried to load" , ErrorCodes::BAD_ARGUMENTS); |
1300 | } |
1301 | |
1302 | void ExternalLoader::checkLoaded(const ExternalLoader::LoadResults & results, |
1303 | bool check_no_errors) const |
1304 | { |
1305 | std::exception_ptr exception; |
1306 | for (const auto & result : results) |
1307 | { |
1308 | try |
1309 | { |
1310 | checkLoaded(result, check_no_errors); |
1311 | } |
1312 | catch (...) |
1313 | { |
1314 | if (!exception) |
1315 | exception = std::current_exception(); |
1316 | else |
1317 | tryLogCurrentException(log); |
1318 | } |
1319 | } |
1320 | |
1321 | if (exception) |
1322 | std::rethrow_exception(exception); |
1323 | } |
1324 | |
1325 | |
1326 | void ExternalLoader::reloadConfig() const |
1327 | { |
1328 | loading_dispatcher->setConfiguration(config_files_reader->read()); |
1329 | } |
1330 | |
1331 | void ExternalLoader::reloadConfig(const String & repository_name) const |
1332 | { |
1333 | loading_dispatcher->setConfiguration(config_files_reader->read(repository_name)); |
1334 | } |
1335 | |
1336 | void ExternalLoader::reloadConfig(const String & repository_name, const String & path) const |
1337 | { |
1338 | loading_dispatcher->setConfiguration(config_files_reader->read(repository_name, path)); |
1339 | } |
1340 | |
1341 | ExternalLoader::LoadablePtr ExternalLoader::createObject( |
1342 | const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const |
1343 | { |
1344 | if (previous_version) |
1345 | return previous_version->clone(); |
1346 | |
1347 | return create(name, *config.config, config.key_in_config, config.repository_name); |
1348 | } |
1349 | |
1350 | std::vector<std::pair<String, Int8>> ExternalLoader::getStatusEnumAllPossibleValues() |
1351 | { |
1352 | return std::vector<std::pair<String, Int8>>{ |
1353 | {toString(Status::NOT_LOADED), static_cast<Int8>(Status::NOT_LOADED)}, |
1354 | {toString(Status::LOADED), static_cast<Int8>(Status::LOADED)}, |
1355 | {toString(Status::FAILED), static_cast<Int8>(Status::FAILED)}, |
1356 | {toString(Status::LOADING), static_cast<Int8>(Status::LOADING)}, |
1357 | {toString(Status::LOADED_AND_RELOADING), static_cast<Int8>(Status::LOADED_AND_RELOADING)}, |
1358 | {toString(Status::FAILED_AND_RELOADING), static_cast<Int8>(Status::FAILED_AND_RELOADING)}, |
1359 | {toString(Status::NOT_EXIST), static_cast<Int8>(Status::NOT_EXIST)}, |
1360 | }; |
1361 | } |
1362 | |
1363 | |
1364 | String toString(ExternalLoader::Status status) |
1365 | { |
1366 | using Status = ExternalLoader::Status; |
1367 | switch (status) |
1368 | { |
1369 | case Status::NOT_LOADED: return "NOT_LOADED" ; |
1370 | case Status::LOADED: return "LOADED" ; |
1371 | case Status::FAILED: return "FAILED" ; |
1372 | case Status::LOADING: return "LOADING" ; |
1373 | case Status::FAILED_AND_RELOADING: return "FAILED_AND_RELOADING" ; |
1374 | case Status::LOADED_AND_RELOADING: return "LOADED_AND_RELOADING" ; |
1375 | case Status::NOT_EXIST: return "NOT_EXIST" ; |
1376 | } |
1377 | __builtin_unreachable(); |
1378 | } |
1379 | |
1380 | |
1381 | std::ostream & operator<<(std::ostream & out, ExternalLoader::Status status) |
1382 | { |
1383 | return out << toString(status); |
1384 | } |
1385 | |
1386 | |
1387 | template ExternalLoader::LoadablePtr ExternalLoader::getCurrentLoadResult<ExternalLoader::LoadablePtr>(const String &) const; |
1388 | template ExternalLoader::LoadResult ExternalLoader::getCurrentLoadResult<ExternalLoader::LoadResult>(const String &) const; |
1389 | template ExternalLoader::Loadables ExternalLoader::getCurrentLoadResults<ExternalLoader::Loadables>(const FilterByNameFunction &) const; |
1390 | template ExternalLoader::LoadResults ExternalLoader::getCurrentLoadResults<ExternalLoader::LoadResults>(const FilterByNameFunction &) const; |
1391 | |
1392 | template ExternalLoader::LoadablePtr ExternalLoader::tryLoad<ExternalLoader::LoadablePtr>(const String &, Duration) const; |
1393 | template ExternalLoader::LoadResult ExternalLoader::tryLoad<ExternalLoader::LoadResult>(const String &, Duration) const; |
1394 | template ExternalLoader::Loadables ExternalLoader::tryLoad<ExternalLoader::Loadables>(const FilterByNameFunction &, Duration) const; |
1395 | template ExternalLoader::LoadResults ExternalLoader::tryLoad<ExternalLoader::LoadResults>(const FilterByNameFunction &, Duration) const; |
1396 | |
1397 | template ExternalLoader::LoadablePtr ExternalLoader::load<ExternalLoader::LoadablePtr>(const String &) const; |
1398 | template ExternalLoader::LoadResult ExternalLoader::load<ExternalLoader::LoadResult>(const String &) const; |
1399 | template ExternalLoader::Loadables ExternalLoader::load<ExternalLoader::Loadables>(const FilterByNameFunction &) const; |
1400 | template ExternalLoader::LoadResults ExternalLoader::load<ExternalLoader::LoadResults>(const FilterByNameFunction &) const; |
1401 | |
1402 | template ExternalLoader::LoadablePtr ExternalLoader::loadOrReload<ExternalLoader::LoadablePtr>(const String &) const; |
1403 | template ExternalLoader::LoadResult ExternalLoader::loadOrReload<ExternalLoader::LoadResult>(const String &) const; |
1404 | template ExternalLoader::Loadables ExternalLoader::loadOrReload<ExternalLoader::Loadables>(const FilterByNameFunction &) const; |
1405 | template ExternalLoader::LoadResults ExternalLoader::loadOrReload<ExternalLoader::LoadResults>(const FilterByNameFunction &) const; |
1406 | |
1407 | template ExternalLoader::Loadables ExternalLoader::reloadAllTriedToLoad<ExternalLoader::Loadables>() const; |
1408 | template ExternalLoader::LoadResults ExternalLoader::reloadAllTriedToLoad<ExternalLoader::LoadResults>() const; |
1409 | } |
1410 | |