1 | #pragma once |
2 | |
#include <atomic>
#include <condition_variable>
#include <thread>
#include <utility>
#include <boost/noncopyable.hpp>
#include <common/logger_useful.h>
#include <Core/Types.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <Common/Stopwatch.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTInsertQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <IO/WriteHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
27 | |
28 | |
29 | namespace DB |
30 | { |
31 | |
32 | |
/** Allows storing a structured log in a system table.
  *
  * Logging is asynchronous. Records are put into a queue, from which they are read by a separate thread.
  * That thread inserts the accumulated records into the table no more often than the specified period.
  */
38 | |
/** The structure of a log record is a template parameter.
  * The structure may change when the server version is updated.
  * If, on the first write, the existing table has a different structure,
  *  then it gets renamed (put aside) and a new table is created.
  */
44 | /* Example: |
45 | struct LogElement |
46 | { |
47 | /// default constructor must be available |
48 | /// fields |
49 | |
50 | static std::string name(); |
51 | static Block createBlock(); |
52 | void appendToBlock(Block & block) const; |
53 | }; |
54 | */ |
55 | |
56 | |
/// Capacity (in elements) of the queue that sits between add() and the saving thread.
/// The same number is used to reserve the capacity of the accumulation buffer in the SystemLog constructor.
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576

/// Forward declarations: only pointers/references to these types are needed by the declarations below.
class Context;
class QueryLog;
class QueryThreadLog;
class PartLog;
class TextLog;
class TraceLog;
class MetricLog;
66 | |
/// Holder of all system log objects.
/// System logs should be destroyed in the destructor of the last Context and before tables,
/// because SystemLog destruction performs an insert query while flushing data into the underlying tables.
struct SystemLogs
{
    /// Defined out of line; presumably creates the individual logs according to `config` - confirm in the .cpp file.
    SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config);
    ~SystemLogs();

    /// Defined out of line; presumably shuts down every log that has been created - confirm in the .cpp file.
    void shutdown();

    std::shared_ptr<QueryLog> query_log;                /// Used to log queries.
    std::shared_ptr<QueryThreadLog> query_thread_log;   /// Used to log query threads.
    std::shared_ptr<PartLog> part_log;                  /// Used to log operations with parts
    std::shared_ptr<TraceLog> trace_log;                /// Used to log traces from query profiler
    std::shared_ptr<TextLog> text_log;                  /// Used to log all text messages.
    std::shared_ptr<MetricLog> metric_log;              /// Used to log all metrics.

    /// NOTE(review): presumably the name of the database that contains the part_log table - confirm against the definition.
    String part_log_database;
};
85 | |
86 | |
/** Asynchronous writer of log records of type LogElement into a table.
  *
  * Records passed to add() are put into a bounded queue; a dedicated thread (saving_thread)
  *  pulls them from the queue, accumulates them in 'data' and periodically inserts them into the table.
  * The class is non-copyable: it owns the thread, which captures 'this'.
  */
template <typename LogElement>
class SystemLog : private boost::noncopyable
{
public:
    using Self = SystemLog;

    /** Parameter: table name where to write log.
      * If the table does not exist, then it gets created with the specified engine.
      * If it already exists, then its structure is checked to be compatible with the structure of the log record.
      *  If it is compatible, then the existing table will be used.
      *  If not - then the existing table will be renamed to the same name but with suffix '_N' at the end,
      *   where N is the minimal number, starting from 0, for which a table with the corresponding name doesn't exist yet;
      *   and a new table gets created - as if the previous table did not exist.
      *
      * The table itself is checked/created lazily, on flush, not in the constructor (see prepareTable).
      * The constructor starts the saving thread.
      */
    SystemLog(
        Context & context_,
        const String & database_name_,
        const String & table_name_,
        const String & storage_def_,
        size_t flush_interval_milliseconds_);

    /// Calls shutdown().
    ~SystemLog();

    /** Append a record into log.
      * Writing to table will be done asynchronously and in case of failure, record could be lost.
      * The record is also dropped (with an error message in the server log) if the queue is full,
      *  and silently ignored after shutdown().
      */
    void add(const LogElement & element);

    /// Flush data in the buffer to disk. Blocks until the saving thread has performed the flush.
    void flush();

    /// Stop the background flush thread before destructor. No more data will be written.
    /// Only the first call has an effect; subsequent calls return immediately.
    void shutdown();

protected:
    Context & context;
    const String database_name;
    const String table_name;
    /// Text of the storage definition; it is parsed with ParserStorage when the table has to be created.
    const String storage_def;
    /// The table found or created by the last call of prepareTable().
    StoragePtr table;
    const size_t flush_interval_milliseconds;
    /// Set by shutdown(); checked by add() and flush() to reject further work.
    std::atomic<bool> is_shutdown{false};

    /// Kind of an item in the queue; also used as the "reason" argument of flushImpl.
    enum class EntryType
    {
        LOG_ELEMENT = 0,    /// An ordinary record pushed by add().
        AUTO_FLUSH,         /// Not pushed into the queue; reason of the periodic flush in the saving thread.
        FORCE_FLUSH,        /// Pushed by flush(): flush now and wake up the waiting caller.
        SHUTDOWN,           /// Pushed by shutdown(): flush and terminate the saving thread.
    };

    /// For the control entries the LogElement part is default-constructed and ignored.
    using QueueItem = std::pair<EntryType, LogElement>;

    /// Queue is bounded. But its size is quite large to not block in all normal cases.
    ConcurrentBoundedQueue<QueueItem> queue {DBMS_SYSTEM_LOG_QUEUE_SIZE};

    /** Data that was pulled from queue. Data is accumulated here before enough time passed.
      * It's possible to implement double-buffering, but we assume that insertion into table is faster
      *  than accumulation of large amount of log records (for example, for query log - processing of large amount of queries).
      */
    std::vector<LogElement> data;

    Logger * log;

    /** In this thread, data is pulled from 'queue' and stored in 'data', and then written into table.
      */
    ThreadFromGlobalPool saving_thread;

    void threadFunction();

    /// Becomes true after the first successful prepareTable();
    /// only used to print the "Will use existing table" message once.
    bool is_prepared = false;

    /** Creates new table if it does not exist.
      * Renames old table if its structure is not suitable.
      * This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
      */
    void prepareTable();

    /// Held by flush() for its whole duration, so that only one forced flush is requested at a time.
    std::mutex flush_mutex;
    /// Used together with flush_condvar to wait for and to signal the completion of a forced flush.
    std::mutex condvar_mutex;
    std::condition_variable flush_condvar;
    /// Set by flush() before the request is queued, reset by flushImpl when the forced flush is done.
    bool force_flushing = false;

    /// flushImpl can be executed only in saving_thread.
    void flushImpl(EntryType reason);
};
172 | |
173 | |
174 | template <typename LogElement> |
175 | SystemLog<LogElement>::SystemLog(Context & context_, |
176 | const String & database_name_, |
177 | const String & table_name_, |
178 | const String & storage_def_, |
179 | size_t flush_interval_milliseconds_) |
180 | : context(context_), |
181 | database_name(database_name_), table_name(table_name_), storage_def(storage_def_), |
182 | flush_interval_milliseconds(flush_interval_milliseconds_) |
183 | { |
184 | log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")" ); |
185 | |
186 | data.reserve(DBMS_SYSTEM_LOG_QUEUE_SIZE); |
187 | saving_thread = ThreadFromGlobalPool([this] { threadFunction(); }); |
188 | } |
189 | |
190 | |
191 | template <typename LogElement> |
192 | void SystemLog<LogElement>::add(const LogElement & element) |
193 | { |
194 | if (is_shutdown) |
195 | return; |
196 | |
197 | /// Without try we could block here in case of queue overflow. |
198 | if (!queue.tryPush({EntryType::LOG_ELEMENT, element})) |
199 | LOG_ERROR(log, "SystemLog queue is full" ); |
200 | } |
201 | |
202 | |
203 | template <typename LogElement> |
204 | void SystemLog<LogElement>::flush() |
205 | { |
206 | if (is_shutdown) |
207 | return; |
208 | |
209 | std::lock_guard flush_lock(flush_mutex); |
210 | force_flushing = true; |
211 | |
212 | /// Tell thread to execute extra flush. |
213 | queue.push({EntryType::FORCE_FLUSH, {}}); |
214 | |
215 | /// Wait for flush being finished. |
216 | std::unique_lock lock(condvar_mutex); |
217 | while (force_flushing) |
218 | flush_condvar.wait(lock); |
219 | } |
220 | |
221 | |
222 | template <typename LogElement> |
223 | void SystemLog<LogElement>::shutdown() |
224 | { |
225 | bool old_val = false; |
226 | if (!is_shutdown.compare_exchange_strong(old_val, true)) |
227 | return; |
228 | |
229 | /// Tell thread to shutdown. |
230 | queue.push({EntryType::SHUTDOWN, {}}); |
231 | saving_thread.join(); |
232 | } |
233 | |
234 | |
235 | template <typename LogElement> |
236 | SystemLog<LogElement>::~SystemLog() |
237 | { |
238 | shutdown(); |
239 | } |
240 | |
241 | |
242 | template <typename LogElement> |
243 | void SystemLog<LogElement>::threadFunction() |
244 | { |
245 | setThreadName("SystemLogFlush" ); |
246 | |
247 | Stopwatch time_after_last_write; |
248 | bool first = true; |
249 | |
250 | while (true) |
251 | { |
252 | try |
253 | { |
254 | if (first) |
255 | { |
256 | time_after_last_write.restart(); |
257 | first = false; |
258 | } |
259 | |
260 | QueueItem element; |
261 | bool has_element = false; |
262 | |
263 | /// data.size() is increased only in this function |
264 | /// TODO: get rid of data and queue duality |
265 | |
266 | if (data.empty()) |
267 | { |
268 | queue.pop(element); |
269 | has_element = true; |
270 | } |
271 | else |
272 | { |
273 | size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000; |
274 | if (milliseconds_elapsed < flush_interval_milliseconds) |
275 | has_element = queue.tryPop(element, flush_interval_milliseconds - milliseconds_elapsed); |
276 | } |
277 | |
278 | if (has_element) |
279 | { |
280 | if (element.first == EntryType::SHUTDOWN) |
281 | { |
282 | /// NOTE: MergeTree engine can write data even it is already in shutdown state. |
283 | flushImpl(element.first); |
284 | break; |
285 | } |
286 | else if (element.first == EntryType::FORCE_FLUSH) |
287 | { |
288 | flushImpl(element.first); |
289 | time_after_last_write.restart(); |
290 | continue; |
291 | } |
292 | else |
293 | data.push_back(element.second); |
294 | } |
295 | |
296 | size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000; |
297 | if (milliseconds_elapsed >= flush_interval_milliseconds) |
298 | { |
299 | /// Write data to a table. |
300 | flushImpl(EntryType::AUTO_FLUSH); |
301 | time_after_last_write.restart(); |
302 | } |
303 | } |
304 | catch (...) |
305 | { |
306 | /// In case of exception we lost accumulated data - to avoid locking. |
307 | data.clear(); |
308 | tryLogCurrentException(__PRETTY_FUNCTION__); |
309 | } |
310 | } |
311 | } |
312 | |
313 | |
314 | template <typename LogElement> |
315 | void SystemLog<LogElement>::flushImpl(EntryType reason) |
316 | { |
317 | try |
318 | { |
319 | if ((reason == EntryType::AUTO_FLUSH || reason == EntryType::SHUTDOWN) && data.empty()) |
320 | return; |
321 | |
322 | LOG_TRACE(log, "Flushing system log" ); |
323 | |
324 | /// We check for existence of the table and create it as needed at every flush. |
325 | /// This is done to allow user to drop the table at any moment (new empty table will be created automatically). |
326 | /// BTW, flush method is called from single thread. |
327 | prepareTable(); |
328 | |
329 | Block block = LogElement::createBlock(); |
330 | for (const LogElement & elem : data) |
331 | elem.appendToBlock(block); |
332 | |
333 | /// Clear queue early, because insertion to the table could lead to generation of more log entrites |
334 | /// and pushing them to already full queue will lead to deadlock. |
335 | data.clear(); |
336 | |
337 | /// We write to table indirectly, using InterpreterInsertQuery. |
338 | /// This is needed to support DEFAULT-columns in table. |
339 | |
340 | std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>(); |
341 | insert->database = database_name; |
342 | insert->table = table_name; |
343 | ASTPtr query_ptr(insert.release()); |
344 | |
345 | InterpreterInsertQuery interpreter(query_ptr, context); |
346 | BlockIO io = interpreter.execute(); |
347 | |
348 | io.out->writePrefix(); |
349 | io.out->write(block); |
350 | io.out->writeSuffix(); |
351 | } |
352 | catch (...) |
353 | { |
354 | tryLogCurrentException(__PRETTY_FUNCTION__); |
355 | /// In case of exception, also clean accumulated data - to avoid locking. |
356 | data.clear(); |
357 | } |
358 | if (reason == EntryType::FORCE_FLUSH) |
359 | { |
360 | std::lock_guard lock(condvar_mutex); |
361 | force_flushing = false; |
362 | flush_condvar.notify_one(); |
363 | } |
364 | } |
365 | |
366 | |
367 | template <typename LogElement> |
368 | void SystemLog<LogElement>::prepareTable() |
369 | { |
370 | String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name); |
371 | |
372 | table = context.tryGetTable(database_name, table_name); |
373 | |
374 | if (table) |
375 | { |
376 | const Block expected = LogElement::createBlock(); |
377 | const Block actual = table->getSampleBlockNonMaterialized(); |
378 | |
379 | if (!blocksHaveEqualStructure(actual, expected)) |
380 | { |
381 | /// Rename the existing table. |
382 | int suffix = 0; |
383 | while (context.isTableExist(database_name, table_name + "_" + toString(suffix))) |
384 | ++suffix; |
385 | |
386 | auto rename = std::make_shared<ASTRenameQuery>(); |
387 | |
388 | ASTRenameQuery::Table from; |
389 | from.database = database_name; |
390 | from.table = table_name; |
391 | |
392 | ASTRenameQuery::Table to; |
393 | to.database = database_name; |
394 | to.table = table_name + "_" + toString(suffix); |
395 | |
396 | ASTRenameQuery::Element elem; |
397 | elem.from = from; |
398 | elem.to = to; |
399 | |
400 | rename->elements.emplace_back(elem); |
401 | |
402 | LOG_DEBUG(log, "Existing table " << description << " for system log has obsolete or different structure." |
403 | " Renaming it to " << backQuoteIfNeed(to.table)); |
404 | |
405 | InterpreterRenameQuery(rename, context).execute(); |
406 | |
407 | /// The required table will be created. |
408 | table = nullptr; |
409 | } |
410 | else if (!is_prepared) |
411 | LOG_DEBUG(log, "Will use existing table " << description << " for " + LogElement::name()); |
412 | } |
413 | |
414 | if (!table) |
415 | { |
416 | /// Create the table. |
417 | LOG_DEBUG(log, "Creating new table " << description << " for " + LogElement::name()); |
418 | |
419 | auto create = std::make_shared<ASTCreateQuery>(); |
420 | |
421 | create->database = database_name; |
422 | create->table = table_name; |
423 | |
424 | Block sample = LogElement::createBlock(); |
425 | |
426 | auto new_columns_list = std::make_shared<ASTColumns>(); |
427 | new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(sample.getNamesAndTypesList())); |
428 | create->set(create->columns_list, new_columns_list); |
429 | |
430 | ParserStorage storage_parser; |
431 | ASTPtr storage_ast = parseQuery( |
432 | storage_parser, storage_def.data(), storage_def.data() + storage_def.size(), |
433 | "Storage to create table for " + LogElement::name(), 0); |
434 | create->set(create->storage, storage_ast); |
435 | |
436 | InterpreterCreateQuery interpreter(create, context); |
437 | interpreter.setInternal(true); |
438 | interpreter.execute(); |
439 | |
440 | table = context.getTable(database_name, table_name); |
441 | } |
442 | |
443 | is_prepared = true; |
444 | } |
445 | |
446 | } |
447 | |