1 | #include "ConfigReloader.h" |
2 | |
3 | #include <Poco/Util/Application.h> |
4 | #include <Poco/File.h> |
5 | #include <common/logger_useful.h> |
6 | #include <Common/setThreadName.h> |
7 | #include "ConfigProcessor.h" |
8 | |
9 | |
10 | namespace DB |
11 | { |
12 | |
/// Out-of-class definition of the static constexpr member `reload_interval`.
/// It is odr-used in this file (it is passed to std::this_thread::sleep_for, which takes
/// its argument by reference), which requires a namespace-scope definition before C++17.
/// Since C++17 static constexpr data members are implicitly inline, so this line is
/// redundant there, but harmless.
constexpr decltype(ConfigReloader::reload_interval) ConfigReloader::reload_interval;
14 | |
15 | ConfigReloader::ConfigReloader( |
16 | const std::string & path_, |
17 | const std::string & include_from_path_, |
18 | const std::string & preprocessed_dir_, |
19 | zkutil::ZooKeeperNodeCache && zk_node_cache_, |
20 | const zkutil::EventPtr & zk_changed_event_, |
21 | Updater && updater_, |
22 | bool already_loaded) |
23 | : path(path_), include_from_path(include_from_path_) |
24 | , preprocessed_dir(preprocessed_dir_) |
25 | , zk_node_cache(std::move(zk_node_cache_)) |
26 | , zk_changed_event(zk_changed_event_) |
27 | , updater(std::move(updater_)) |
28 | { |
29 | if (!already_loaded) |
30 | reloadIfNewer(/* force = */ true, /* throw_on_error = */ true, /* fallback_to_preprocessed = */ true); |
31 | } |
32 | |
33 | |
34 | void ConfigReloader::start() |
35 | { |
36 | thread = ThreadFromGlobalPool(&ConfigReloader::run, this); |
37 | } |
38 | |
39 | |
40 | ConfigReloader::~ConfigReloader() |
41 | { |
42 | try |
43 | { |
44 | quit = true; |
45 | zk_changed_event->set(); |
46 | |
47 | if (thread.joinable()) |
48 | thread.join(); |
49 | } |
50 | catch (...) |
51 | { |
52 | DB::tryLogCurrentException(__PRETTY_FUNCTION__); |
53 | } |
54 | } |
55 | |
56 | |
57 | void ConfigReloader::run() |
58 | { |
59 | setThreadName("ConfigReloader" ); |
60 | |
61 | while (true) |
62 | { |
63 | try |
64 | { |
65 | bool zk_changed = zk_changed_event->tryWait(std::chrono::milliseconds(reload_interval).count()); |
66 | if (quit) |
67 | return; |
68 | |
69 | reloadIfNewer(zk_changed, /* throw_on_error = */ false, /* fallback_to_preprocessed = */ false); |
70 | } |
71 | catch (...) |
72 | { |
73 | tryLogCurrentException(log, __PRETTY_FUNCTION__); |
74 | std::this_thread::sleep_for(reload_interval); |
75 | } |
76 | } |
77 | } |
78 | |
79 | void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error, bool fallback_to_preprocessed) |
80 | { |
81 | std::lock_guard lock(reload_mutex); |
82 | |
83 | FilesChangesTracker new_files = getNewFileList(); |
84 | if (force || need_reload_from_zk || new_files.isDifferOrNewerThan(files)) |
85 | { |
86 | ConfigProcessor config_processor(path); |
87 | ConfigProcessor::LoadedConfig loaded_config; |
88 | try |
89 | { |
90 | LOG_DEBUG(log, "Loading config '" << path << "'" ); |
91 | |
92 | loaded_config = config_processor.loadConfig(/* allow_zk_includes = */ true); |
93 | if (loaded_config.has_zk_includes) |
94 | loaded_config = config_processor.loadConfigWithZooKeeperIncludes( |
95 | zk_node_cache, zk_changed_event, fallback_to_preprocessed); |
96 | } |
97 | catch (const Coordination::Exception & e) |
98 | { |
99 | if (Coordination::isHardwareError(e.code)) |
100 | need_reload_from_zk = true; |
101 | |
102 | if (throw_on_error) |
103 | throw; |
104 | |
105 | tryLogCurrentException(log, "ZooKeeper error when loading config from '" + path + "'" ); |
106 | return; |
107 | } |
108 | catch (...) |
109 | { |
110 | if (throw_on_error) |
111 | throw; |
112 | |
113 | tryLogCurrentException(log, "Error loading config from '" + path + "'" ); |
114 | return; |
115 | } |
116 | config_processor.savePreprocessedConfig(loaded_config, preprocessed_dir); |
117 | |
118 | /** We should remember last modification time if and only if config was sucessfully loaded |
119 | * Otherwise a race condition could occur during config files update: |
120 | * File is contain raw (and non-valid) data, therefore config is not applied. |
121 | * When file has been written (and contain valid data), we don't load new data since modification time remains the same. |
122 | */ |
123 | if (!loaded_config.loaded_from_preprocessed) |
124 | { |
125 | files = std::move(new_files); |
126 | need_reload_from_zk = false; |
127 | } |
128 | |
129 | try |
130 | { |
131 | updater(loaded_config.configuration); |
132 | } |
133 | catch (...) |
134 | { |
135 | if (throw_on_error) |
136 | throw; |
137 | tryLogCurrentException(log, "Error updating configuration from '" + path + "' config." ); |
138 | } |
139 | } |
140 | } |
141 | |
142 | struct ConfigReloader::FileWithTimestamp |
143 | { |
144 | std::string path; |
145 | time_t modification_time; |
146 | |
147 | FileWithTimestamp(const std::string & path_, time_t modification_time_) |
148 | : path(path_), modification_time(modification_time_) {} |
149 | |
150 | bool operator < (const FileWithTimestamp & rhs) const |
151 | { |
152 | return path < rhs.path; |
153 | } |
154 | |
155 | static bool isTheSame(const FileWithTimestamp & lhs, const FileWithTimestamp & rhs) |
156 | { |
157 | return (lhs.modification_time == rhs.modification_time) && (lhs.path == rhs.path); |
158 | } |
159 | }; |
160 | |
161 | |
162 | void ConfigReloader::FilesChangesTracker::addIfExists(const std::string & path_to_add) |
163 | { |
164 | if (!path_to_add.empty() && Poco::File(path_to_add).exists()) |
165 | files.emplace(path_to_add, Poco::File(path_to_add).getLastModified().epochTime()); |
166 | } |
167 | |
168 | bool ConfigReloader::FilesChangesTracker::isDifferOrNewerThan(const FilesChangesTracker & rhs) |
169 | { |
170 | return (files.size() != rhs.files.size()) || |
171 | !std::equal(files.begin(), files.end(), rhs.files.begin(), FileWithTimestamp::isTheSame); |
172 | } |
173 | |
174 | ConfigReloader::FilesChangesTracker ConfigReloader::getNewFileList() const |
175 | { |
176 | FilesChangesTracker file_list; |
177 | |
178 | file_list.addIfExists(path); |
179 | file_list.addIfExists(include_from_path); |
180 | |
181 | for (const auto & merge_path : ConfigProcessor::getConfigMergeFiles(path)) |
182 | file_list.addIfExists(merge_path); |
183 | |
184 | return file_list; |
185 | } |
186 | |
187 | } |
188 | |