1#include "ConfigReloader.h"
2
3#include <Poco/Util/Application.h>
4#include <Poco/File.h>
5#include <common/logger_useful.h>
6#include <Common/setThreadName.h>
7#include "ConfigProcessor.h"
8
9
10namespace DB
11{
12
13constexpr decltype(ConfigReloader::reload_interval) ConfigReloader::reload_interval;
14
15ConfigReloader::ConfigReloader(
16 const std::string & path_,
17 const std::string & include_from_path_,
18 const std::string & preprocessed_dir_,
19 zkutil::ZooKeeperNodeCache && zk_node_cache_,
20 const zkutil::EventPtr & zk_changed_event_,
21 Updater && updater_,
22 bool already_loaded)
23 : path(path_), include_from_path(include_from_path_)
24 , preprocessed_dir(preprocessed_dir_)
25 , zk_node_cache(std::move(zk_node_cache_))
26 , zk_changed_event(zk_changed_event_)
27 , updater(std::move(updater_))
28{
29 if (!already_loaded)
30 reloadIfNewer(/* force = */ true, /* throw_on_error = */ true, /* fallback_to_preprocessed = */ true);
31}
32
33
34void ConfigReloader::start()
35{
36 thread = ThreadFromGlobalPool(&ConfigReloader::run, this);
37}
38
39
40ConfigReloader::~ConfigReloader()
41{
42 try
43 {
44 quit = true;
45 zk_changed_event->set();
46
47 if (thread.joinable())
48 thread.join();
49 }
50 catch (...)
51 {
52 DB::tryLogCurrentException(__PRETTY_FUNCTION__);
53 }
54}
55
56
57void ConfigReloader::run()
58{
59 setThreadName("ConfigReloader");
60
61 while (true)
62 {
63 try
64 {
65 bool zk_changed = zk_changed_event->tryWait(std::chrono::milliseconds(reload_interval).count());
66 if (quit)
67 return;
68
69 reloadIfNewer(zk_changed, /* throw_on_error = */ false, /* fallback_to_preprocessed = */ false);
70 }
71 catch (...)
72 {
73 tryLogCurrentException(log, __PRETTY_FUNCTION__);
74 std::this_thread::sleep_for(reload_interval);
75 }
76 }
77}
78
79void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed)
80{
81 std::lock_guard lock(reload_mutex);
82
83 FilesChangesTracker new_files = getNewFileList();
84 if (force || need_reload_from_zk || new_files.isDifferOrNewerThan(files))
85 {
86 ConfigProcessor config_processor(path);
87 ConfigProcessor::LoadedConfig loaded_config;
88 try
89 {
90 LOG_DEBUG(log, "Loading config '" << path << "'");
91
92 loaded_config = config_processor.loadConfig(/* allow_zk_includes = */ true);
93 if (loaded_config.has_zk_includes)
94 loaded_config = config_processor.loadConfigWithZooKeeperIncludes(
95 zk_node_cache, zk_changed_event, fallback_to_preprocessed);
96 }
97 catch (const Coordination::Exception & e)
98 {
99 if (Coordination::isHardwareError(e.code))
100 need_reload_from_zk = true;
101
102 if (throw_on_error)
103 throw;
104
105 tryLogCurrentException(log, "ZooKeeper error when loading config from '" + path + "'");
106 return;
107 }
108 catch (...)
109 {
110 if (throw_on_error)
111 throw;
112
113 tryLogCurrentException(log, "Error loading config from '" + path + "'");
114 return;
115 }
116 config_processor.savePreprocessedConfig(loaded_config, preprocessed_dir);
117
118 /** We should remember last modification time if and only if config was sucessfully loaded
119 * Otherwise a race condition could occur during config files update:
120 * File is contain raw (and non-valid) data, therefore config is not applied.
121 * When file has been written (and contain valid data), we don't load new data since modification time remains the same.
122 */
123 if (!loaded_config.loaded_from_preprocessed)
124 {
125 files = std::move(new_files);
126 need_reload_from_zk = false;
127 }
128
129 try
130 {
131 updater(loaded_config.configuration);
132 }
133 catch (...)
134 {
135 if (throw_on_error)
136 throw;
137 tryLogCurrentException(log, "Error updating configuration from '" + path + "' config.");
138 }
139 }
140}
141
142struct ConfigReloader::FileWithTimestamp
143{
144 std::string path;
145 time_t modification_time;
146
147 FileWithTimestamp(const std::string & path_, time_t modification_time_)
148 : path(path_), modification_time(modification_time_) {}
149
150 bool operator < (const FileWithTimestamp & rhs) const
151 {
152 return path < rhs.path;
153 }
154
155 static bool isTheSame(const FileWithTimestamp & lhs, const FileWithTimestamp & rhs)
156 {
157 return (lhs.modification_time == rhs.modification_time) && (lhs.path == rhs.path);
158 }
159};
160
161
162void ConfigReloader::FilesChangesTracker::addIfExists(const std::string & path_to_add)
163{
164 if (!path_to_add.empty() && Poco::File(path_to_add).exists())
165 files.emplace(path_to_add, Poco::File(path_to_add).getLastModified().epochTime());
166}
167
168bool ConfigReloader::FilesChangesTracker::isDifferOrNewerThan(const FilesChangesTracker & rhs)
169{
170 return (files.size() != rhs.files.size()) ||
171 !std::equal(files.begin(), files.end(), rhs.files.begin(), FileWithTimestamp::isTheSame);
172}
173
174ConfigReloader::FilesChangesTracker ConfigReloader::getNewFileList() const
175{
176 FilesChangesTracker file_list;
177
178 file_list.addIfExists(path);
179 file_list.addIfExists(include_from_path);
180
181 for (const auto & merge_path : ConfigProcessor::getConfigMergeFiles(path))
182 file_list.addIfExists(merge_path);
183
184 return file_list;
185}
186
187}
188