1#include "ExternalLoader.h"
2
3#include <mutex>
4#include <pcg_random.hpp>
5#include <Common/Config/AbstractConfigurationComparison.h>
6#include <Common/Exception.h>
7#include <Common/StringUtils/StringUtils.h>
8#include <Common/ThreadPool.h>
9#include <Common/randomSeed.h>
10#include <Common/setThreadName.h>
11#include <ext/chrono_io.h>
12#include <ext/scope_guard.h>
13#include <boost/range/adaptor/map.hpp>
14#include <boost/range/algorithm/copy.hpp>
15#include <unordered_set>
16
17
18namespace DB
19{
namespace ErrorCodes
{
    /// Defined in another translation unit; only declared here for use in this file.
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
}
25
26
27namespace
28{
29 template <typename ReturnType>
30 ReturnType convertTo(ExternalLoader::LoadResult result)
31 {
32 if constexpr (std::is_same_v<ReturnType, ExternalLoader::LoadResult>)
33 return result;
34 else
35 {
36 static_assert(std::is_same_v<ReturnType, ExternalLoader::LoadablePtr>);
37 return std::move(result.object);
38 }
39 }
40
41 template <typename ReturnType>
42 ReturnType convertTo(ExternalLoader::LoadResults results)
43 {
44 if constexpr (std::is_same_v<ReturnType, ExternalLoader::LoadResults>)
45 return results;
46 else
47 {
48 static_assert(std::is_same_v<ReturnType, ExternalLoader::Loadables>);
49 ExternalLoader::Loadables objects;
50 objects.reserve(results.size());
51 for (const auto & result : results)
52 {
53 if (auto object = std::move(result.object))
54 objects.push_back(std::move(object));
55 }
56 return objects;
57 }
58 }
59
60 template <typename ReturnType>
61 ReturnType notExists(const String & name)
62 {
63 if constexpr (std::is_same_v<ReturnType, ExternalLoader::LoadResult>)
64 {
65 ExternalLoader::LoadResult res;
66 res.name = name;
67 return res;
68 }
69 else
70 {
71 static_assert(std::is_same_v<ReturnType, ExternalLoader::LoadablePtr>);
72 return nullptr;
73 }
74 }
75
76
77 /// Lock mutex only in async mode
78 /// In other case does nothing
79 struct LoadingGuardForAsyncLoad
80 {
81 std::unique_lock<std::mutex> lock;
82 LoadingGuardForAsyncLoad(bool async, std::mutex & mutex)
83 {
84 if (async)
85 lock = std::unique_lock(mutex);
86 }
87 };
88}
89
90struct ExternalLoader::ObjectConfig
91{
92 Poco::AutoPtr<Poco::Util::AbstractConfiguration> config;
93 String key_in_config;
94 String repository_name;
95 bool from_temp_repository = false;
96 String path;
97};
98
99
100/** Reads configurations from configuration repository and parses it.
101 */
102class ExternalLoader::LoadablesConfigReader : private boost::noncopyable
103{
104public:
105 LoadablesConfigReader(const String & type_name_, Logger * log_)
106 : type_name(type_name_), log(log_)
107 {
108 }
109 ~LoadablesConfigReader() = default;
110
111 using Repository = IExternalLoaderConfigRepository;
112
113 void addConfigRepository(std::unique_ptr<Repository> repository)
114 {
115 std::lock_guard lock{mutex};
116 auto * ptr = repository.get();
117 repositories.emplace(ptr, RepositoryInfo{std::move(repository), {}});
118 need_collect_object_configs = true;
119 }
120
121 void removeConfigRepository(Repository * repository)
122 {
123 std::lock_guard lock{mutex};
124 auto it = repositories.find(repository);
125 if (it == repositories.end())
126 return;
127 repositories.erase(it);
128 need_collect_object_configs = true;
129 }
130
131 void setConfigSettings(const ExternalLoaderConfigSettings & settings_)
132 {
133 std::lock_guard lock{mutex};
134 settings = settings_;
135 }
136
137 using ObjectConfigsPtr = std::shared_ptr<const std::unordered_map<String /* object's name */, ObjectConfig>>;
138
139 /// Reads all repositories.
140 ObjectConfigsPtr read()
141 {
142 std::lock_guard lock(mutex);
143 readRepositories();
144 collectObjectConfigs();
145 return object_configs;
146 }
147
148 /// Reads only a specified repository.
149 /// This functions checks only a specified repository but returns configs from all repositories.
150 ObjectConfigsPtr read(const String & repository_name)
151 {
152 std::lock_guard lock(mutex);
153 readRepositories(repository_name);
154 collectObjectConfigs();
155 return object_configs;
156 }
157
158 /// Reads only a specified path from a specified repository.
159 /// This functions checks only a specified repository but returns configs from all repositories.
160 ObjectConfigsPtr read(const String & repository_name, const String & path)
161 {
162 std::lock_guard lock(mutex);
163 readRepositories(repository_name, path);
164 collectObjectConfigs();
165 return object_configs;
166 }
167
168private:
169 struct FileInfo
170 {
171 Poco::Timestamp last_update_time = 0;
172 std::vector<std::pair<String, ObjectConfig>> objects; // Parsed contents of the file.
173 bool in_use = true; // Whether the `FileInfo` should be destroyed because the correspondent file is deleted.
174 };
175
176 struct RepositoryInfo
177 {
178 std::unique_ptr<Repository> repository;
179 std::unordered_map<String /* path */, FileInfo> files;
180 };
181
182 /// Reads the repositories.
183 /// Checks last modification times of files and read those files which are new or changed.
184 void readRepositories(const std::optional<String> & only_repository_name = {}, const std::optional<String> & only_path = {})
185 {
186 for (auto & [repository, repository_info] : repositories)
187 {
188 if (only_repository_name && (repository->getName() != *only_repository_name))
189 continue;
190
191 for (auto & file_info : repository_info.files | boost::adaptors::map_values)
192 file_info.in_use = false;
193
194 Strings existing_paths;
195 if (only_path)
196 {
197 if (repository->exists(*only_path))
198 existing_paths.push_back(*only_path);
199 }
200 else
201 boost::copy(repository->getAllLoadablesDefinitionNames(), std::back_inserter(existing_paths));
202
203 for (const auto & path : existing_paths)
204 {
205 auto it = repository_info.files.find(path);
206 if (it != repository_info.files.end())
207 {
208 FileInfo & file_info = it->second;
209 if (readFileInfo(file_info, *repository, path))
210 need_collect_object_configs = true;
211 }
212 else
213 {
214 FileInfo file_info;
215 if (readFileInfo(file_info, *repository, path))
216 {
217 repository_info.files.emplace(path, std::move(file_info));
218 need_collect_object_configs = true;
219 }
220 }
221 }
222
223 Strings deleted_paths;
224 for (auto & [path, file_info] : repository_info.files)
225 {
226 if (file_info.in_use)
227 continue;
228
229 if (only_path && (*only_path != path))
230 continue;
231
232 deleted_paths.emplace_back(path);
233 }
234
235 if (!deleted_paths.empty())
236 {
237 for (const String & deleted_path : deleted_paths)
238 repository_info.files.erase(deleted_path);
239 need_collect_object_configs = true;
240 }
241 }
242 }
243
244 /// Reads a file, returns true if the file is new or changed.
245 bool readFileInfo(
246 FileInfo & file_info,
247 IExternalLoaderConfigRepository & repository,
248 const String & path) const
249 {
250 try
251 {
252 if (path.empty() || !repository.exists(path))
253 {
254 LOG_WARNING(log, "Config file '" + path + "' does not exist");
255 return false;
256 }
257
258 auto update_time_from_repository = repository.getUpdateTime(path);
259
260 /// Actually it can't be less, but for sure we check less or equal
261 if (update_time_from_repository <= file_info.last_update_time)
262 {
263 file_info.in_use = true;
264 return false;
265 }
266
267 auto file_contents = repository.load(path);
268
269 /// get all objects' definitions
270 Poco::Util::AbstractConfiguration::Keys keys;
271 file_contents->keys(keys);
272
273 /// for each object defined in repositories
274 std::vector<std::pair<String, ObjectConfig>> object_configs_from_file;
275 for (const auto & key : keys)
276 {
277 if (!startsWith(key, settings.external_config))
278 {
279 if (!startsWith(key, "comment") && !startsWith(key, "include_from"))
280 LOG_WARNING(log, path << ": file contains unknown node '" << key << "', expected '" << settings.external_config << "'");
281 continue;
282 }
283
284 String object_name = file_contents->getString(key + "." + settings.external_name);
285 if (object_name.empty())
286 {
287 LOG_WARNING(log, path << ": node '" << key << "' defines " << type_name << " with an empty name. It's not allowed");
288 continue;
289 }
290
291 String database;
292 if (!settings.external_database.empty())
293 database = file_contents->getString(key + "." + settings.external_database, "");
294 if (!database.empty())
295 object_name = database + "." + object_name;
296
297 object_configs_from_file.emplace_back(object_name, ObjectConfig{file_contents, key, {}, {}, {}});
298 }
299
300 file_info.objects = std::move(object_configs_from_file);
301 file_info.last_update_time = update_time_from_repository;
302 file_info.in_use = true;
303 return true;
304 }
305 catch (...)
306 {
307 tryLogCurrentException(log, "Failed to load config file '" + path + "'");
308 return false;
309 }
310 }
311
312 /// Builds a map of current configurations of objects.
313 void collectObjectConfigs()
314 {
315 if (!need_collect_object_configs)
316 return;
317 need_collect_object_configs = false;
318
319 // Generate new result.
320 auto new_configs = std::make_shared<std::unordered_map<String /* object's name */, ObjectConfig>>();
321
322 for (const auto & [repository, repository_info] : repositories)
323 {
324 for (const auto & [path, file_info] : repository_info.files)
325 {
326 for (const auto & [object_name, object_config] : file_info.objects)
327 {
328 auto already_added_it = new_configs->find(object_name);
329 if (already_added_it == new_configs->end())
330 {
331 auto & new_config = new_configs->emplace(object_name, object_config).first->second;
332 new_config.from_temp_repository = repository->isTemporary();
333 new_config.repository_name = repository->getName();
334 new_config.path = path;
335 }
336 else
337 {
338 const auto & already_added = already_added_it->second;
339 if (!already_added.from_temp_repository && !repository->isTemporary())
340 {
341 LOG_WARNING(
342 log,
343 type_name << " '" << object_name << "' is found "
344 << (((path == already_added.path) && (repository->getName() == already_added.repository_name))
345 ? ("twice in the same file '" + path + "'")
346 : ("both in file '" + already_added.path + "' and '" + path + "'")));
347 }
348 }
349 }
350 }
351 }
352
353 object_configs = new_configs;
354 }
355
356 const String type_name;
357 Logger * log;
358
359 std::mutex mutex;
360 ExternalLoaderConfigSettings settings;
361 std::unordered_map<Repository *, RepositoryInfo> repositories;
362 ObjectConfigsPtr object_configs;
363 bool need_collect_object_configs = false;
364};
365
366
367/** Manages loading and reloading objects. Uses configurations from the class LoadablesConfigReader.
368 * Supports parallel loading.
369 */
370class ExternalLoader::LoadingDispatcher : private boost::noncopyable
371{
372public:
373 /// Called to load or reload an object.
374 using CreateObjectFunction = std::function<LoadablePtr(
375 const String & /* name */, const ObjectConfig & /* config */, const LoadablePtr & /* previous_version */)>;
376
377 LoadingDispatcher(
378 const CreateObjectFunction & create_object_function_,
379 const String & type_name_,
380 Logger * log_)
381 : create_object(create_object_function_)
382 , type_name(type_name_)
383 , log(log_)
384 {
385 }
386
387 ~LoadingDispatcher()
388 {
389 std::unique_lock lock{mutex};
390 infos.clear(); /// We clear this map to tell the threads that we don't want any load results anymore.
391
392 /// Wait for all the threads to finish.
393 while (!loading_threads.empty())
394 {
395 auto it = loading_threads.begin();
396 auto thread = std::move(it->second);
397 loading_threads.erase(it);
398 lock.unlock();
399 event.notify_all();
400 thread.join();
401 lock.lock();
402 }
403 }
404
405 using ObjectConfigsPtr = LoadablesConfigReader::ObjectConfigsPtr;
406
407 /// Sets new configurations for all the objects.
408 void setConfiguration(const ObjectConfigsPtr & new_configs)
409 {
410 std::lock_guard lock{mutex};
411 if (configs == new_configs)
412 return;
413
414 configs = new_configs;
415
416 std::vector<String> removed_names;
417 for (auto & [name, info] : infos)
418 {
419 auto new_config_it = new_configs->find(name);
420 if (new_config_it == new_configs->end())
421 removed_names.emplace_back(name);
422 else
423 {
424 const auto & new_config = new_config_it->second;
425 bool config_is_same = isSameConfiguration(*info.object_config.config, info.object_config.key_in_config, *new_config.config, new_config.key_in_config);
426 info.object_config = new_config;
427 if (!config_is_same)
428 {
429 /// Configuration has been changed.
430 info.object_config = new_config;
431
432 if (info.triedToLoad())
433 {
434 /// The object has been tried to load before, so it is currently in use or was in use
435 /// and we should try to reload it with the new config.
436 startLoading(info, true);
437 }
438 }
439 }
440 }
441
442 /// Insert to the map those objects which added to the new configuration.
443 for (const auto & [name, config] : *new_configs)
444 {
445 if (infos.find(name) == infos.end())
446 {
447 Info & info = infos.emplace(name, Info{name, config}).first->second;
448 if (always_load_everything)
449 startLoading(info);
450 }
451 }
452
453 /// Remove from the map those objects which were removed from the configuration.
454 for (const String & name : removed_names)
455 infos.erase(name);
456
457 /// Maybe we have just added new objects which require to be loaded
458 /// or maybe we have just removed object which were been loaded,
459 /// so we should notify `event` to recheck conditions in load() and loadAll() now.
460 event.notify_all();
461 }
462
463 /// Sets whether all the objects from the configuration should be always loaded (even if they aren't used).
464 void enableAlwaysLoadEverything(bool enable)
465 {
466 std::lock_guard lock{mutex};
467 if (always_load_everything == enable)
468 return;
469
470 always_load_everything = enable;
471
472 if (enable)
473 {
474 /// Start loading all the objects which were not loaded yet.
475 for (auto & [name, info] : infos)
476 if (!info.triedToLoad())
477 startLoading(info);
478 }
479 }
480
481 /// Sets whether the objects should be loaded asynchronously, each loading in a new thread (from the thread pool).
482 void enableAsyncLoading(bool enable)
483 {
484 enable_async_loading = enable;
485 }
486
487 /// Returns the status of the object.
488 /// If the object has not been loaded yet then the function returns Status::NOT_LOADED.
489 /// If the specified name isn't found in the configuration then the function returns Status::NOT_EXIST.
490 Status getCurrentStatus(const String & name) const
491 {
492 std::lock_guard lock{mutex};
493 const Info * info = getInfo(name);
494 if (!info)
495 return Status::NOT_EXIST;
496 return info->status();
497 }
498
499 /// Returns the load result of the object.
500 template <typename ReturnType>
501 ReturnType getCurrentLoadResult(const String & name) const
502 {
503 std::lock_guard lock{mutex};
504 const Info * info = getInfo(name);
505 if (!info)
506 return notExists<ReturnType>(name);
507 return info->getLoadResult<ReturnType>();
508 }
509
510 /// Returns all the load results as a map.
511 /// The function doesn't load anything, it just returns the current load results as is.
512 template <typename ReturnType>
513 ReturnType getCurrentLoadResults(const FilterByNameFunction & filter) const
514 {
515 std::lock_guard lock{mutex};
516 return collectLoadResults<ReturnType>(filter);
517 }
518
519 size_t getNumberOfCurrentlyLoadedObjects() const
520 {
521 std::lock_guard lock{mutex};
522 size_t count = 0;
523 for (const auto & name_and_info : infos)
524 {
525 const auto & info = name_and_info.second;
526 if (info.loaded())
527 ++count;
528 }
529 return count;
530 }
531
532 bool hasCurrentlyLoadedObjects() const
533 {
534 std::lock_guard lock{mutex};
535 for (auto & name_info : infos)
536 if (name_info.second.loaded())
537 return true;
538 return false;
539 }
540
541 Strings getAllTriedToLoadNames() const
542 {
543 Strings names;
544 for (auto & [name, info] : infos)
545 if (info.triedToLoad())
546 names.push_back(name);
547 return names;
548 }
549
550 /// Tries to load a specified object during the timeout.
551 template <typename ReturnType>
552 ReturnType tryLoad(const String & name, Duration timeout)
553 {
554 std::unique_lock lock{mutex};
555 Info * info = loadImpl(name, timeout, false, lock);
556 if (!info)
557 return notExists<ReturnType>(name);
558 return info->getLoadResult<ReturnType>();
559 }
560
561 template <typename ReturnType>
562 ReturnType tryLoad(const FilterByNameFunction & filter, Duration timeout)
563 {
564 std::unique_lock lock{mutex};
565 loadImpl(filter, timeout, false, lock);
566 return collectLoadResults<ReturnType>(filter);
567 }
568
569 /// Tries to load or reload a specified object.
570 template <typename ReturnType>
571 ReturnType tryLoadOrReload(const String & name, Duration timeout)
572 {
573 std::unique_lock lock{mutex};
574 Info * info = loadImpl(name, timeout, true, lock);
575 if (!info)
576 return notExists<ReturnType>(name);
577 return info->getLoadResult<ReturnType>();
578 }
579
580 template <typename ReturnType>
581 ReturnType tryLoadOrReload(const FilterByNameFunction & filter, Duration timeout)
582 {
583 std::unique_lock lock{mutex};
584 loadImpl(filter, timeout, true, lock);
585 return collectLoadResults<ReturnType>(filter);
586 }
587
588 /// Starts reloading all the object which update time is earlier than now.
589 /// The function doesn't touch the objects which were never tried to load.
590 void reloadOutdated()
591 {
592 /// Iterate through all the objects and find loaded ones which should be checked if they need update.
593 std::unordered_map<LoadablePtr, bool> should_update_map;
594 {
595 std::lock_guard lock{mutex};
596 TimePoint now = std::chrono::system_clock::now();
597 for (const auto & name_and_info : infos)
598 {
599 const auto & info = name_and_info.second;
600 if ((now >= info.next_update_time) && !info.is_loading() && info.loaded())
601 should_update_map.emplace(info.object, info.failedToReload());
602 }
603 }
604
605 /// Find out which of the loaded objects were modified.
606 /// We couldn't perform these checks while we were building `should_update_map` because
607 /// the `mutex` should be unlocked while we're calling the function object->isModified()
608 for (auto & [object, should_update_flag] : should_update_map)
609 {
610 try
611 {
612 /// Maybe alredy true, if we have an exception
613 if (!should_update_flag)
614 should_update_flag = object->isModified();
615 }
616 catch (...)
617 {
618 tryLogCurrentException(log, "Could not check if " + type_name + " '" + object->getLoadableName() + "' was modified");
619 /// Cannot check isModified, so update
620 should_update_flag = true;
621 }
622 }
623
624 /// Iterate through all the objects again and either start loading or just set `next_update_time`.
625 {
626 std::lock_guard lock{mutex};
627 TimePoint now = std::chrono::system_clock::now();
628 for (auto & [name, info] : infos)
629 {
630 if ((now >= info.next_update_time) && !info.is_loading())
631 {
632 if (info.loaded())
633 {
634 auto it = should_update_map.find(info.object);
635 if (it == should_update_map.end())
636 continue; /// Object has been just loaded (it wasn't loaded while we were building the map `should_update_map`), so we don't have to reload it right now.
637
638 bool should_update_flag = it->second;
639 if (!should_update_flag)
640 {
641 info.next_update_time = calculateNextUpdateTime(info.object, info.error_count);
642 continue;
643 }
644
645 /// Object was modified or it was failed to reload last time, so it should be reloaded.
646 startLoading(info);
647 }
648 else if (info.failed())
649 {
650 /// Object was never loaded successfully and should be reloaded.
651 startLoading(info);
652 }
653 }
654 }
655 }
656 }
657
658private:
659 struct Info
660 {
661 Info(const String & name_, const ObjectConfig & object_config_) : name(name_), object_config(object_config_) {}
662
663 bool loaded() const { return object != nullptr; }
664 bool failed() const { return !object && exception; }
665 bool loadedOrFailed() const { return loaded() || failed(); }
666 bool triedToLoad() const { return loaded() || failed() || is_loading(); }
667 bool failedToReload() const { return loaded() && exception != nullptr; }
668 bool is_loading() const { return loading_id > state_id; }
669
670 Status status() const
671 {
672 if (object)
673 return is_loading() ? Status::LOADED_AND_RELOADING : Status::LOADED;
674 else if (exception)
675 return is_loading() ? Status::FAILED_AND_RELOADING : Status::FAILED;
676 else
677 return is_loading() ? Status::LOADING : Status::NOT_LOADED;
678 }
679
680 Duration loadingDuration() const
681 {
682 if (is_loading())
683 return std::chrono::duration_cast<Duration>(std::chrono::system_clock::now() - loading_start_time);
684 return std::chrono::duration_cast<Duration>(loading_end_time - loading_start_time);
685 }
686
687 template <typename ReturnType>
688 ReturnType getLoadResult() const
689 {
690 if constexpr (std::is_same_v<ReturnType, LoadResult>)
691 {
692 LoadResult result;
693 result.name = name;
694 result.status = status();
695 result.object = object;
696 result.exception = exception;
697 result.loading_start_time = loading_start_time;
698 result.loading_duration = loadingDuration();
699 result.origin = object_config.path;
700 result.repository_name = object_config.repository_name;
701 return result;
702 }
703 else
704 {
705 static_assert(std::is_same_v<ReturnType, ExternalLoader::LoadablePtr>);
706 return object;
707 }
708 }
709
710 String name;
711 LoadablePtr object;
712 ObjectConfig object_config;
713 TimePoint loading_start_time;
714 TimePoint loading_end_time;
715 size_t state_id = 0; /// Index of the current state of this `info`, this index is incremented every loading.
716 size_t loading_id = 0; /// The value which will be stored in `state_id` after finishing the current loading.
717 size_t error_count = 0; /// Numbers of errors since last successful loading.
718 std::exception_ptr exception; /// Last error occurred.
719 TimePoint next_update_time = TimePoint::max(); /// Time of the next update, `TimePoint::max()` means "never".
720 };
721
722 Info * getInfo(const String & name)
723 {
724 auto it = infos.find(name);
725 if (it == infos.end())
726 return nullptr;
727 return &it->second;
728 }
729
730 const Info * getInfo(const String & name) const
731 {
732 auto it = infos.find(name);
733 if (it == infos.end())
734 return nullptr;
735 return &it->second;
736 }
737
738 template <typename ReturnType>
739 ReturnType collectLoadResults(const FilterByNameFunction & filter) const
740 {
741 ReturnType results;
742 results.reserve(infos.size());
743 for (const auto & [name, info] : infos)
744 {
745 if (filter(name))
746 {
747 auto result = info.template getLoadResult<typename ReturnType::value_type>();
748 if constexpr (std::is_same_v<typename ReturnType::value_type, LoadablePtr>)
749 {
750 if (!result)
751 continue;
752 }
753 results.emplace_back(std::move(result));
754 }
755 }
756 return results;
757 }
758
759 Info * loadImpl(const String & name, Duration timeout, bool forced_to_reload, std::unique_lock<std::mutex> & lock)
760 {
761 std::optional<size_t> min_id;
762 Info * info = nullptr;
763 auto pred = [&]
764 {
765 info = getInfo(name);
766 if (!info)
767 return true; /// stop
768
769 if (!min_id)
770 min_id = getMinIDToFinishLoading(forced_to_reload);
771
772 if (info->state_id >= min_id)
773 return true; /// stop
774
775 if (info->loading_id < min_id)
776 startLoading(*info, forced_to_reload, *min_id);
777 return false; /// wait for the next event
778 };
779
780 if (timeout == WAIT)
781 event.wait(lock, pred);
782 else
783 event.wait_for(lock, timeout, pred);
784
785 return info;
786 }
787
788 void loadImpl(const FilterByNameFunction & filter, Duration timeout, bool forced_to_reload, std::unique_lock<std::mutex> & lock)
789 {
790 std::optional<size_t> min_id;
791 auto pred = [&]
792 {
793 if (!min_id)
794 min_id = getMinIDToFinishLoading(forced_to_reload);
795
796 bool all_ready = true;
797 for (auto & [name, info] : infos)
798 {
799 if (!filter(name))
800 continue;
801
802 if (info.state_id >= min_id)
803 continue;
804
805 all_ready = false;
806 if (info.loading_id < min_id)
807 startLoading(info, forced_to_reload, *min_id);
808 }
809 return all_ready;
810 };
811
812 if (timeout == WAIT)
813 event.wait(lock, pred);
814 else
815 event.wait_for(lock, timeout, pred);
816 }
817
818 /// When state_id >= getMinIDToFinishLoading() the loading is considered as finished.
819 size_t getMinIDToFinishLoading(bool forced_to_reload) const
820 {
821 if (forced_to_reload)
822 {
823 /// We need to force reloading, that's why we return next_id_counter here
824 /// (because info.state_id < next_id_counter for any info).
825 return next_id_counter;
826 }
827
828 /// The loading of an object can cause the loading of another object.
829 /// We use the same "min_id" in this case to allows reloading multiple objects at once
830 /// taking into account their dependencies.
831 auto it = min_id_to_finish_loading_dependencies.find(std::this_thread::get_id());
832 if (it != min_id_to_finish_loading_dependencies.end())
833 return it->second;
834
835 /// We just need the first loading to be finished, that's why we return 1 here
836 /// (because info.state_id >= 1 since the first loading is finished, successfully or not).
837 return 1;
838 }
839
840 void startLoading(Info & info, bool forced_to_reload = false, size_t min_id_to_finish_loading_dependencies_ = 1)
841 {
842 if (info.is_loading())
843 {
844 if (!forced_to_reload)
845 return;
846 cancelLoading(info);
847 }
848
849 /// All loadings have unique loading IDs.
850 size_t loading_id = next_id_counter++;
851 info.loading_id = loading_id;
852 info.loading_start_time = std::chrono::system_clock::now();
853 info.loading_end_time = TimePoint{};
854
855 if (enable_async_loading)
856 {
857 /// Put a job to the thread pool for the loading.
858 auto thread = ThreadFromGlobalPool{&LoadingDispatcher::doLoading, this, info.name, loading_id, forced_to_reload, min_id_to_finish_loading_dependencies_, true};
859 loading_threads.try_emplace(loading_id, std::move(thread));
860 }
861 else
862 {
863 /// Perform the loading immediately.
864 doLoading(info.name, loading_id, forced_to_reload, min_id_to_finish_loading_dependencies_, false);
865 }
866 }
867
868 void cancelLoading(Info & info)
869 {
870 if (!info.is_loading())
871 return;
872
873 /// In fact we cannot actually CANCEL the loading (because it's possibly already being performed in another thread).
874 /// But we can reset the `loading_id` and doLoading() will understand it as a signal to stop loading.
875 info.loading_id = info.state_id;
876 info.loading_end_time = std::chrono::system_clock::now();
877 }
878
879 /// Does the loading, possibly in the separate thread.
880 void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async)
881 {
882 try
883 {
884 /// Prepare for loading.
885 std::optional<Info> info;
886 {
887 LoadingGuardForAsyncLoad lock(async, mutex);
888 info = prepareToLoadSingleObject(name, loading_id, min_id_to_finish_loading_dependencies_, lock);
889 if (!info)
890 return;
891 }
892
893 /// Previous version can be used as the base for new loading, enabling loading only part of data.
894 auto previous_version_as_base_for_loading = info->object;
895 if (forced_to_reload)
896 previous_version_as_base_for_loading = nullptr; /// Need complete reloading, cannot use the previous version.
897
898 /// Loading.
899 auto [new_object, new_exception] = loadSingleObject(name, info->object_config, previous_version_as_base_for_loading);
900 if (!new_object && !new_exception)
901 throw Exception("No object created and no exception raised for " + type_name, ErrorCodes::LOGICAL_ERROR);
902
903 /// Saving the result of the loading.
904 {
905 LoadingGuardForAsyncLoad lock(async, mutex);
906 saveResultOfLoadingSingleObject(name, loading_id, info->object, new_object, new_exception, info->error_count, lock);
907 finishLoadingSingleObject(name, loading_id, lock);
908 }
909 event.notify_all();
910 }
911 catch (...)
912 {
913 LoadingGuardForAsyncLoad lock(async, mutex);
914 finishLoadingSingleObject(name, loading_id, lock);
915 throw;
916 }
917 }
918
919 /// Returns single object info, checks loading_id and name.
920 std::optional<Info> prepareToLoadSingleObject(
921 const String & name, size_t loading_id, size_t min_id_to_finish_loading_dependencies_, const LoadingGuardForAsyncLoad &)
922 {
923 Info * info = getInfo(name);
924 /// We check here if this is exactly the same loading as we planned to perform.
925 /// This check is necessary because the object could be removed or load with another config before this thread even starts.
926 if (!info || !info->is_loading() || (info->loading_id != loading_id))
927 return {};
928
929 min_id_to_finish_loading_dependencies[std::this_thread::get_id()] = min_id_to_finish_loading_dependencies_;
930 return *info;
931 }
932
933 /// Load one object, returns object ptr or exception.
934 std::pair<LoadablePtr, std::exception_ptr>
935 loadSingleObject(const String & name, const ObjectConfig & config, LoadablePtr previous_version)
936 {
937 /// Use `create_function` to perform the actual loading.
938 /// It's much better to do it with `mutex` unlocked because the loading can take a lot of time
939 /// and require access to other objects.
940 LoadablePtr new_object;
941 std::exception_ptr new_exception;
942 try
943 {
944 new_object = create_object(name, config, previous_version);
945 }
946 catch (...)
947 {
948 new_exception = std::current_exception();
949 }
950 return std::make_pair(new_object, new_exception);
951 }
952
953 /// Saves the result of the loading, calculates the time of the next update, and handles errors.
954 void saveResultOfLoadingSingleObject(
955 const String & name,
956 size_t loading_id,
957 LoadablePtr previous_version,
958 LoadablePtr new_object,
959 std::exception_ptr new_exception,
960 size_t error_count,
961 const LoadingGuardForAsyncLoad &)
962 {
963 /// Calculate a new update time.
964 TimePoint next_update_time;
965 try
966 {
967 if (new_exception)
968 ++error_count;
969 else
970 error_count = 0;
971
972 LoadablePtr object = previous_version;
973 if (new_object)
974 object = new_object;
975
976 next_update_time = calculateNextUpdateTime(object, error_count);
977 }
978 catch (...)
979 {
980 tryLogCurrentException(log, "Cannot find out when the " + type_name + " '" + name + "' should be updated");
981 next_update_time = TimePoint::max();
982 }
983
984
985 Info * info = getInfo(name);
986
987 /// We should check if this is still the same loading as we were doing.
988 /// This is necessary because the object could be removed or load with another config while the `mutex` was unlocked.
989 if (!info || !info->is_loading() || (info->loading_id != loading_id))
990 return;
991
992 if (new_exception)
993 {
994 auto next_update_time_description = [next_update_time]
995 {
996 if (next_update_time == TimePoint::max())
997 return String();
998 return ", next update is scheduled at " + ext::to_string(next_update_time);
999 };
1000 if (previous_version)
1001 tryLogException(new_exception, log, "Could not update " + type_name + " '" + name + "'"
1002 ", leaving the previous version" + next_update_time_description());
1003 else
1004 tryLogException(new_exception, log, "Could not load " + type_name + " '" + name + "'" + next_update_time_description());
1005 }
1006
1007 if (new_object)
1008 info->object = new_object;
1009
1010 info->exception = new_exception;
1011 info->error_count = error_count;
1012 info->loading_end_time = std::chrono::system_clock::now();
1013 info->state_id = info->loading_id;
1014 info->next_update_time = next_update_time;
1015 }
1016
1017 /// Removes the references to the loading thread from the maps.
1018 void finishLoadingSingleObject(const String & name, size_t loading_id, const LoadingGuardForAsyncLoad &)
1019 {
1020 Info * info = getInfo(name);
1021 if (info && (info->loading_id == loading_id))
1022 info->loading_id = info->state_id;
1023
1024 min_id_to_finish_loading_dependencies.erase(std::this_thread::get_id());
1025
1026 auto it = loading_threads.find(loading_id);
1027 if (it != loading_threads.end())
1028 {
1029 it->second.detach();
1030 loading_threads.erase(it);
1031 }
1032 }
1033
1034 /// Calculate next update time for loaded_object. Can be called without mutex locking,
1035 /// because single loadable can be loaded in single thread only.
1036 TimePoint calculateNextUpdateTime(const LoadablePtr & loaded_object, size_t error_count) const
1037 {
1038 static constexpr auto never = TimePoint::max();
1039
1040 if (loaded_object)
1041 {
1042 if (!loaded_object->supportUpdates())
1043 return never;
1044
1045 /// do not update loadable objects with zero as lifetime
1046 const auto & lifetime = loaded_object->getLifetime();
1047 if (lifetime.min_sec == 0 && lifetime.max_sec == 0)
1048 return never;
1049
1050 if (!error_count)
1051 {
1052 std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
1053 return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
1054 }
1055 }
1056
1057 return std::chrono::system_clock::now() + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));
1058 }
1059
    /// Factory for loadable objects.
    /// NOTE(review): presumably the ExternalLoader::createObject binding passed in by ExternalLoader's constructor.
    const CreateObjectFunction create_object;
    /// Object type name used in log messages, e.g. "Could not load <type_name> '<name>'".
    const String type_name;
    Logger * log;

    /// NOTE(review): presumably the mutex handed to LoadingGuardForAsyncLoad (locked only in async mode)
    /// and the one guarding the state below; confirm.
    mutable std::mutex mutex;
    std::condition_variable event;
    ObjectConfigsPtr configs;
    /// Per-object state keyed by object name (presumably what getInfo(name) looks up).
    std::unordered_map<String, Info> infos;
    bool always_load_everything = false;
    std::atomic<bool> enable_async_loading = false;
    /// Loading id -> thread performing that loading; finishLoadingSingleObject() detaches and erases the entry.
    std::unordered_map<size_t, ThreadFromGlobalPool> loading_threads;
    /// Keyed by thread id; finishLoadingSingleObject() erases the calling thread's entry.
    std::unordered_map<std::thread::id, size_t> min_id_to_finish_loading_dependencies;
    size_t next_id_counter = 1; /// should always be > 0
    /// Random source for update-time jitter and retry backoff in calculateNextUpdateTime().
    mutable pcg64 rnd_engine{randomSeed()};
1074};
1075
1076
1077class ExternalLoader::PeriodicUpdater : private boost::noncopyable
1078{
1079public:
1080 static constexpr UInt64 check_period_sec = 5;
1081
1082 PeriodicUpdater(LoadablesConfigReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_)
1083 : config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_)
1084 {
1085 }
1086
1087 ~PeriodicUpdater() { enable(false); }
1088
1089 void enable(bool enable_)
1090 {
1091 std::unique_lock lock{mutex};
1092 enabled = enable_;
1093
1094 if (enable_)
1095 {
1096 if (!thread.joinable())
1097 {
1098 /// Starts the thread which will do periodic updates.
1099 thread = ThreadFromGlobalPool{&PeriodicUpdater::doPeriodicUpdates, this};
1100 }
1101 }
1102 else
1103 {
1104 if (thread.joinable())
1105 {
1106 /// Wait for the thread to finish.
1107 auto temp_thread = std::move(thread);
1108 lock.unlock();
1109 event.notify_one();
1110 temp_thread.join();
1111 }
1112 }
1113 }
1114
1115
1116private:
1117 void doPeriodicUpdates()
1118 {
1119 setThreadName("ExterLdrReload");
1120
1121 std::unique_lock lock{mutex};
1122 auto pred = [this] { return !enabled; };
1123 while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred))
1124 {
1125 lock.unlock();
1126 loading_dispatcher.setConfiguration(config_files_reader.read());
1127 loading_dispatcher.reloadOutdated();
1128 lock.lock();
1129 }
1130 }
1131
1132 LoadablesConfigReader & config_files_reader;
1133 LoadingDispatcher & loading_dispatcher;
1134
1135 mutable std::mutex mutex;
1136 bool enabled = false;
1137 ThreadFromGlobalPool thread;
1138 std::condition_variable event;
1139};
1140
1141
1142ExternalLoader::ExternalLoader(const String & type_name_, Logger * log_)
1143 : config_files_reader(std::make_unique<LoadablesConfigReader>(type_name_, log_))
1144 , loading_dispatcher(std::make_unique<LoadingDispatcher>(
1145 std::bind(&ExternalLoader::createObject, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3),
1146 type_name_,
1147 log_))
1148 , periodic_updater(std::make_unique<PeriodicUpdater>(*config_files_reader, *loading_dispatcher))
1149 , type_name(type_name_)
1150 , log(log_)
1151{
1152}
1153
1154ExternalLoader::~ExternalLoader() = default;
1155
1156ext::scope_guard ExternalLoader::addConfigRepository(std::unique_ptr<IExternalLoaderConfigRepository> repository)
1157{
1158 auto * ptr = repository.get();
1159 String name = ptr->getName();
1160 config_files_reader->addConfigRepository(std::move(repository));
1161 reloadConfig(name);
1162
1163 return [this, ptr, name]()
1164 {
1165 config_files_reader->removeConfigRepository(ptr);
1166 reloadConfig(name);
1167 };
1168}
1169
1170void ExternalLoader::setConfigSettings(const ExternalLoaderConfigSettings & settings)
1171{
1172 config_files_reader->setConfigSettings(settings);
1173}
1174
1175void ExternalLoader::enableAlwaysLoadEverything(bool enable)
1176{
1177 loading_dispatcher->enableAlwaysLoadEverything(enable);
1178}
1179
1180void ExternalLoader::enableAsyncLoading(bool enable)
1181{
1182 loading_dispatcher->enableAsyncLoading(enable);
1183}
1184
1185void ExternalLoader::enablePeriodicUpdates(bool enable_)
1186{
1187 periodic_updater->enable(enable_);
1188}
1189
1190bool ExternalLoader::hasCurrentlyLoadedObjects() const
1191{
1192 return loading_dispatcher->hasCurrentlyLoadedObjects();
1193}
1194
1195ExternalLoader::Status ExternalLoader::getCurrentStatus(const String & name) const
1196{
1197 return loading_dispatcher->getCurrentStatus(name);
1198}
1199
1200template <typename ReturnType, typename>
1201ReturnType ExternalLoader::getCurrentLoadResult(const String & name) const
1202{
1203 return loading_dispatcher->getCurrentLoadResult<ReturnType>(name);
1204}
1205
1206template <typename ReturnType, typename>
1207ReturnType ExternalLoader::getCurrentLoadResults(const FilterByNameFunction & filter) const
1208{
1209 return loading_dispatcher->getCurrentLoadResults<ReturnType>(filter);
1210}
1211
1212ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects() const
1213{
1214 return getCurrentLoadResults<Loadables>();
1215}
1216
1217ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects(const FilterByNameFunction & filter) const
1218{
1219 return getCurrentLoadResults<Loadables>(filter);
1220}
1221
1222size_t ExternalLoader::getNumberOfCurrentlyLoadedObjects() const
1223{
1224 return loading_dispatcher->getNumberOfCurrentlyLoadedObjects();
1225}
1226
1227template <typename ReturnType, typename>
1228ReturnType ExternalLoader::tryLoad(const String & name, Duration timeout) const
1229{
1230 return loading_dispatcher->tryLoad<ReturnType>(name, timeout);
1231}
1232
1233template <typename ReturnType, typename>
1234ReturnType ExternalLoader::tryLoad(const FilterByNameFunction & filter, Duration timeout) const
1235{
1236 return loading_dispatcher->tryLoad<ReturnType>(filter, timeout);
1237}
1238
1239template <typename ReturnType, typename>
1240ReturnType ExternalLoader::load(const String & name) const
1241{
1242 auto result = tryLoad<LoadResult>(name);
1243 checkLoaded(result, false);
1244 return convertTo<ReturnType>(result);
1245}
1246
1247template <typename ReturnType, typename>
1248ReturnType ExternalLoader::load(const FilterByNameFunction & filter) const
1249{
1250 auto results = tryLoad<LoadResults>(filter);
1251 checkLoaded(results, false);
1252 return convertTo<ReturnType>(results);
1253}
1254
1255template <typename ReturnType, typename>
1256ReturnType ExternalLoader::loadOrReload(const String & name) const
1257{
1258 loading_dispatcher->setConfiguration(config_files_reader->read());
1259 auto result = loading_dispatcher->tryLoadOrReload<LoadResult>(name, WAIT);
1260 checkLoaded(result, true);
1261 return convertTo<ReturnType>(result);
1262}
1263
1264template <typename ReturnType, typename>
1265ReturnType ExternalLoader::loadOrReload(const FilterByNameFunction & filter) const
1266{
1267 loading_dispatcher->setConfiguration(config_files_reader->read());
1268 auto results = loading_dispatcher->tryLoadOrReload<LoadResults>(filter, WAIT);
1269 checkLoaded(results, true);
1270 return convertTo<ReturnType>(results);
1271}
1272
1273template <typename ReturnType, typename>
1274ReturnType ExternalLoader::reloadAllTriedToLoad() const
1275{
1276 std::unordered_set<String> names;
1277 boost::range::copy(getAllTriedToLoadNames(), std::inserter(names, names.end()));
1278 return loadOrReload<ReturnType>([&names](const String & name) { return names.count(name); });
1279}
1280
1281Strings ExternalLoader::getAllTriedToLoadNames() const
1282{
1283 return loading_dispatcher->getAllTriedToLoadNames();
1284}
1285
1286
1287void ExternalLoader::checkLoaded(const ExternalLoader::LoadResult & result,
1288 bool check_no_errors) const
1289{
1290 if (result.object && (!check_no_errors || !result.exception))
1291 return;
1292 if (result.status == ExternalLoader::Status::LOADING)
1293 throw Exception(type_name + " '" + result.name + "' is still loading", ErrorCodes::BAD_ARGUMENTS);
1294 if (result.exception)
1295 std::rethrow_exception(result.exception);
1296 if (result.status == ExternalLoader::Status::NOT_EXIST)
1297 throw Exception(type_name + " '" + result.name + "' not found", ErrorCodes::BAD_ARGUMENTS);
1298 if (result.status == ExternalLoader::Status::NOT_LOADED)
1299 throw Exception(type_name + " '" + result.name + "' not tried to load", ErrorCodes::BAD_ARGUMENTS);
1300}
1301
1302void ExternalLoader::checkLoaded(const ExternalLoader::LoadResults & results,
1303 bool check_no_errors) const
1304{
1305 std::exception_ptr exception;
1306 for (const auto & result : results)
1307 {
1308 try
1309 {
1310 checkLoaded(result, check_no_errors);
1311 }
1312 catch (...)
1313 {
1314 if (!exception)
1315 exception = std::current_exception();
1316 else
1317 tryLogCurrentException(log);
1318 }
1319 }
1320
1321 if (exception)
1322 std::rethrow_exception(exception);
1323}
1324
1325
1326void ExternalLoader::reloadConfig() const
1327{
1328 loading_dispatcher->setConfiguration(config_files_reader->read());
1329}
1330
1331void ExternalLoader::reloadConfig(const String & repository_name) const
1332{
1333 loading_dispatcher->setConfiguration(config_files_reader->read(repository_name));
1334}
1335
1336void ExternalLoader::reloadConfig(const String & repository_name, const String & path) const
1337{
1338 loading_dispatcher->setConfiguration(config_files_reader->read(repository_name, path));
1339}
1340
1341ExternalLoader::LoadablePtr ExternalLoader::createObject(
1342 const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const
1343{
1344 if (previous_version)
1345 return previous_version->clone();
1346
1347 return create(name, *config.config, config.key_in_config, config.repository_name);
1348}
1349
1350std::vector<std::pair<String, Int8>> ExternalLoader::getStatusEnumAllPossibleValues()
1351{
1352 return std::vector<std::pair<String, Int8>>{
1353 {toString(Status::NOT_LOADED), static_cast<Int8>(Status::NOT_LOADED)},
1354 {toString(Status::LOADED), static_cast<Int8>(Status::LOADED)},
1355 {toString(Status::FAILED), static_cast<Int8>(Status::FAILED)},
1356 {toString(Status::LOADING), static_cast<Int8>(Status::LOADING)},
1357 {toString(Status::LOADED_AND_RELOADING), static_cast<Int8>(Status::LOADED_AND_RELOADING)},
1358 {toString(Status::FAILED_AND_RELOADING), static_cast<Int8>(Status::FAILED_AND_RELOADING)},
1359 {toString(Status::NOT_EXIST), static_cast<Int8>(Status::NOT_EXIST)},
1360 };
1361}
1362
1363
1364String toString(ExternalLoader::Status status)
1365{
1366 using Status = ExternalLoader::Status;
1367 switch (status)
1368 {
1369 case Status::NOT_LOADED: return "NOT_LOADED";
1370 case Status::LOADED: return "LOADED";
1371 case Status::FAILED: return "FAILED";
1372 case Status::LOADING: return "LOADING";
1373 case Status::FAILED_AND_RELOADING: return "FAILED_AND_RELOADING";
1374 case Status::LOADED_AND_RELOADING: return "LOADED_AND_RELOADING";
1375 case Status::NOT_EXIST: return "NOT_EXIST";
1376 }
1377 __builtin_unreachable();
1378}
1379
1380
1381std::ostream & operator<<(std::ostream & out, ExternalLoader::Status status)
1382{
1383 return out << toString(status);
1384}
1385
1386
/// The member templates above are defined in this .cpp file rather than in the header, so every
/// ReturnType used by other translation units must be instantiated explicitly here.
/// NOTE(review): this list presumably mirrors the ReturnType values allowed by the unnamed second
/// template parameter declared in the header; keep the two in sync.

/// getCurrentLoadResult / getCurrentLoadResults
template ExternalLoader::LoadablePtr ExternalLoader::getCurrentLoadResult<ExternalLoader::LoadablePtr>(const String &) const;
template ExternalLoader::LoadResult ExternalLoader::getCurrentLoadResult<ExternalLoader::LoadResult>(const String &) const;
template ExternalLoader::Loadables ExternalLoader::getCurrentLoadResults<ExternalLoader::Loadables>(const FilterByNameFunction &) const;
template ExternalLoader::LoadResults ExternalLoader::getCurrentLoadResults<ExternalLoader::LoadResults>(const FilterByNameFunction &) const;

/// tryLoad
template ExternalLoader::LoadablePtr ExternalLoader::tryLoad<ExternalLoader::LoadablePtr>(const String &, Duration) const;
template ExternalLoader::LoadResult ExternalLoader::tryLoad<ExternalLoader::LoadResult>(const String &, Duration) const;
template ExternalLoader::Loadables ExternalLoader::tryLoad<ExternalLoader::Loadables>(const FilterByNameFunction &, Duration) const;
template ExternalLoader::LoadResults ExternalLoader::tryLoad<ExternalLoader::LoadResults>(const FilterByNameFunction &, Duration) const;

/// load
template ExternalLoader::LoadablePtr ExternalLoader::load<ExternalLoader::LoadablePtr>(const String &) const;
template ExternalLoader::LoadResult ExternalLoader::load<ExternalLoader::LoadResult>(const String &) const;
template ExternalLoader::Loadables ExternalLoader::load<ExternalLoader::Loadables>(const FilterByNameFunction &) const;
template ExternalLoader::LoadResults ExternalLoader::load<ExternalLoader::LoadResults>(const FilterByNameFunction &) const;

/// loadOrReload
template ExternalLoader::LoadablePtr ExternalLoader::loadOrReload<ExternalLoader::LoadablePtr>(const String &) const;
template ExternalLoader::LoadResult ExternalLoader::loadOrReload<ExternalLoader::LoadResult>(const String &) const;
template ExternalLoader::Loadables ExternalLoader::loadOrReload<ExternalLoader::Loadables>(const FilterByNameFunction &) const;
template ExternalLoader::LoadResults ExternalLoader::loadOrReload<ExternalLoader::LoadResults>(const FilterByNameFunction &) const;

/// reloadAllTriedToLoad (only the multi-object return types exist)
template ExternalLoader::Loadables ExternalLoader::reloadAllTriedToLoad<ExternalLoader::Loadables>() const;
template ExternalLoader::LoadResults ExternalLoader::reloadAllTriedToLoad<ExternalLoader::LoadResults>() const;
1409}
1410