1#pragma once
2
#include <atomic>
#include <condition_variable>
#include <memory>
#include <thread>
#include <utility>
#include <boost/noncopyable.hpp>
#include <common/logger_useful.h>
#include <Core/Types.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <Common/Stopwatch.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTInsertQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <IO/WriteHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
27
28
29namespace DB
30{
31
32
/** Allows storing a structured log in a system table.
  *
  * Logging is asynchronous. Data is put into a queue, from where it is read by a separate thread.
  * That thread inserts the log into a table no more often than the specified period.
  */
38
/** The structure of the log is a template parameter.
  * The structure may change when the server version is updated.
  * If, on first write, the existing table has a different structure,
  *  then it gets renamed (put aside) and a new table is created.
  */
44/* Example:
45 struct LogElement
46 {
47 /// default constructor must be available
48 /// fields
49
50 static std::string name();
51 static Block createBlock();
52 void appendToBlock(Block & block) const;
53 };
54 */
55
56
/// Capacity (in entries) of the queue of every SystemLog; also used to pre-reserve its accumulation buffer.
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576

/// Forward declarations: the context and the concrete system logs referenced by SystemLogs below.
class Context;

class QueryLog;
class QueryThreadLog;
class PartLog;
class TextLog;
class TraceLog;
class MetricLog;
66
67/// System logs should be destroyed in destructor of the last Context and before tables,
68/// because SystemLog destruction makes insert query while flushing data into underlying tables
69struct SystemLogs
70{
71 SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config);
72 ~SystemLogs();
73
74 void shutdown();
75
76 std::shared_ptr<QueryLog> query_log; /// Used to log queries.
77 std::shared_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads.
78 std::shared_ptr<PartLog> part_log; /// Used to log operations with parts
79 std::shared_ptr<TraceLog> trace_log; /// Used to log traces from query profiler
80 std::shared_ptr<TextLog> text_log; /// Used to log all text messages.
81 std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics.
82
83 String part_log_database;
84};
85
86
87template <typename LogElement>
88class SystemLog : private boost::noncopyable
89{
90public:
91 using Self = SystemLog;
92
93 /** Parameter: table name where to write log.
94 * If table is not exists, then it get created with specified engine.
95 * If it already exists, then its structure is checked to be compatible with structure of log record.
96 * If it is compatible, then existing table will be used.
97 * If not - then existing table will be renamed to same name but with suffix '_N' at end,
98 * where N - is a minimal number from 1, for that table with corresponding name doesn't exist yet;
99 * and new table get created - as if previous table was not exist.
100 */
101 SystemLog(
102 Context & context_,
103 const String & database_name_,
104 const String & table_name_,
105 const String & storage_def_,
106 size_t flush_interval_milliseconds_);
107
108 ~SystemLog();
109
110 /** Append a record into log.
111 * Writing to table will be done asynchronously and in case of failure, record could be lost.
112 */
113 void add(const LogElement & element);
114
115 /// Flush data in the buffer to disk
116 void flush();
117
118 /// Stop the background flush thread before destructor. No more data will be written.
119 void shutdown();
120
121protected:
122 Context & context;
123 const String database_name;
124 const String table_name;
125 const String storage_def;
126 StoragePtr table;
127 const size_t flush_interval_milliseconds;
128 std::atomic<bool> is_shutdown{false};
129
130 enum class EntryType
131 {
132 LOG_ELEMENT = 0,
133 AUTO_FLUSH,
134 FORCE_FLUSH,
135 SHUTDOWN,
136 };
137
138 using QueueItem = std::pair<EntryType, LogElement>;
139
140 /// Queue is bounded. But its size is quite large to not block in all normal cases.
141 ConcurrentBoundedQueue<QueueItem> queue {DBMS_SYSTEM_LOG_QUEUE_SIZE};
142
143 /** Data that was pulled from queue. Data is accumulated here before enough time passed.
144 * It's possible to implement double-buffering, but we assume that insertion into table is faster
145 * than accumulation of large amount of log records (for example, for query log - processing of large amount of queries).
146 */
147 std::vector<LogElement> data;
148
149 Logger * log;
150
151 /** In this thread, data is pulled from 'queue' and stored in 'data', and then written into table.
152 */
153 ThreadFromGlobalPool saving_thread;
154
155 void threadFunction();
156
157 /** Creates new table if it does not exist.
158 * Renames old table if its structure is not suitable.
159 * This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
160 */
161 bool is_prepared = false;
162 void prepareTable();
163
164 std::mutex flush_mutex;
165 std::mutex condvar_mutex;
166 std::condition_variable flush_condvar;
167 bool force_flushing = false;
168
169 /// flushImpl can be executed only in saving_thread.
170 void flushImpl(EntryType reason);
171};
172
173
174template <typename LogElement>
175SystemLog<LogElement>::SystemLog(Context & context_,
176 const String & database_name_,
177 const String & table_name_,
178 const String & storage_def_,
179 size_t flush_interval_milliseconds_)
180 : context(context_),
181 database_name(database_name_), table_name(table_name_), storage_def(storage_def_),
182 flush_interval_milliseconds(flush_interval_milliseconds_)
183{
184 log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")");
185
186 data.reserve(DBMS_SYSTEM_LOG_QUEUE_SIZE);
187 saving_thread = ThreadFromGlobalPool([this] { threadFunction(); });
188}
189
190
191template <typename LogElement>
192void SystemLog<LogElement>::add(const LogElement & element)
193{
194 if (is_shutdown)
195 return;
196
197 /// Without try we could block here in case of queue overflow.
198 if (!queue.tryPush({EntryType::LOG_ELEMENT, element}))
199 LOG_ERROR(log, "SystemLog queue is full");
200}
201
202
203template <typename LogElement>
204void SystemLog<LogElement>::flush()
205{
206 if (is_shutdown)
207 return;
208
209 std::lock_guard flush_lock(flush_mutex);
210 force_flushing = true;
211
212 /// Tell thread to execute extra flush.
213 queue.push({EntryType::FORCE_FLUSH, {}});
214
215 /// Wait for flush being finished.
216 std::unique_lock lock(condvar_mutex);
217 while (force_flushing)
218 flush_condvar.wait(lock);
219}
220
221
222template <typename LogElement>
223void SystemLog<LogElement>::shutdown()
224{
225 bool old_val = false;
226 if (!is_shutdown.compare_exchange_strong(old_val, true))
227 return;
228
229 /// Tell thread to shutdown.
230 queue.push({EntryType::SHUTDOWN, {}});
231 saving_thread.join();
232}
233
234
235template <typename LogElement>
236SystemLog<LogElement>::~SystemLog()
237{
238 shutdown();
239}
240
241
242template <typename LogElement>
243void SystemLog<LogElement>::threadFunction()
244{
245 setThreadName("SystemLogFlush");
246
247 Stopwatch time_after_last_write;
248 bool first = true;
249
250 while (true)
251 {
252 try
253 {
254 if (first)
255 {
256 time_after_last_write.restart();
257 first = false;
258 }
259
260 QueueItem element;
261 bool has_element = false;
262
263 /// data.size() is increased only in this function
264 /// TODO: get rid of data and queue duality
265
266 if (data.empty())
267 {
268 queue.pop(element);
269 has_element = true;
270 }
271 else
272 {
273 size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
274 if (milliseconds_elapsed < flush_interval_milliseconds)
275 has_element = queue.tryPop(element, flush_interval_milliseconds - milliseconds_elapsed);
276 }
277
278 if (has_element)
279 {
280 if (element.first == EntryType::SHUTDOWN)
281 {
282 /// NOTE: MergeTree engine can write data even it is already in shutdown state.
283 flushImpl(element.first);
284 break;
285 }
286 else if (element.first == EntryType::FORCE_FLUSH)
287 {
288 flushImpl(element.first);
289 time_after_last_write.restart();
290 continue;
291 }
292 else
293 data.push_back(element.second);
294 }
295
296 size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
297 if (milliseconds_elapsed >= flush_interval_milliseconds)
298 {
299 /// Write data to a table.
300 flushImpl(EntryType::AUTO_FLUSH);
301 time_after_last_write.restart();
302 }
303 }
304 catch (...)
305 {
306 /// In case of exception we lost accumulated data - to avoid locking.
307 data.clear();
308 tryLogCurrentException(__PRETTY_FUNCTION__);
309 }
310 }
311}
312
313
314template <typename LogElement>
315void SystemLog<LogElement>::flushImpl(EntryType reason)
316{
317 try
318 {
319 if ((reason == EntryType::AUTO_FLUSH || reason == EntryType::SHUTDOWN) && data.empty())
320 return;
321
322 LOG_TRACE(log, "Flushing system log");
323
324 /// We check for existence of the table and create it as needed at every flush.
325 /// This is done to allow user to drop the table at any moment (new empty table will be created automatically).
326 /// BTW, flush method is called from single thread.
327 prepareTable();
328
329 Block block = LogElement::createBlock();
330 for (const LogElement & elem : data)
331 elem.appendToBlock(block);
332
333 /// Clear queue early, because insertion to the table could lead to generation of more log entrites
334 /// and pushing them to already full queue will lead to deadlock.
335 data.clear();
336
337 /// We write to table indirectly, using InterpreterInsertQuery.
338 /// This is needed to support DEFAULT-columns in table.
339
340 std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
341 insert->database = database_name;
342 insert->table = table_name;
343 ASTPtr query_ptr(insert.release());
344
345 InterpreterInsertQuery interpreter(query_ptr, context);
346 BlockIO io = interpreter.execute();
347
348 io.out->writePrefix();
349 io.out->write(block);
350 io.out->writeSuffix();
351 }
352 catch (...)
353 {
354 tryLogCurrentException(__PRETTY_FUNCTION__);
355 /// In case of exception, also clean accumulated data - to avoid locking.
356 data.clear();
357 }
358 if (reason == EntryType::FORCE_FLUSH)
359 {
360 std::lock_guard lock(condvar_mutex);
361 force_flushing = false;
362 flush_condvar.notify_one();
363 }
364}
365
366
367template <typename LogElement>
368void SystemLog<LogElement>::prepareTable()
369{
370 String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);
371
372 table = context.tryGetTable(database_name, table_name);
373
374 if (table)
375 {
376 const Block expected = LogElement::createBlock();
377 const Block actual = table->getSampleBlockNonMaterialized();
378
379 if (!blocksHaveEqualStructure(actual, expected))
380 {
381 /// Rename the existing table.
382 int suffix = 0;
383 while (context.isTableExist(database_name, table_name + "_" + toString(suffix)))
384 ++suffix;
385
386 auto rename = std::make_shared<ASTRenameQuery>();
387
388 ASTRenameQuery::Table from;
389 from.database = database_name;
390 from.table = table_name;
391
392 ASTRenameQuery::Table to;
393 to.database = database_name;
394 to.table = table_name + "_" + toString(suffix);
395
396 ASTRenameQuery::Element elem;
397 elem.from = from;
398 elem.to = to;
399
400 rename->elements.emplace_back(elem);
401
402 LOG_DEBUG(log, "Existing table " << description << " for system log has obsolete or different structure."
403 " Renaming it to " << backQuoteIfNeed(to.table));
404
405 InterpreterRenameQuery(rename, context).execute();
406
407 /// The required table will be created.
408 table = nullptr;
409 }
410 else if (!is_prepared)
411 LOG_DEBUG(log, "Will use existing table " << description << " for " + LogElement::name());
412 }
413
414 if (!table)
415 {
416 /// Create the table.
417 LOG_DEBUG(log, "Creating new table " << description << " for " + LogElement::name());
418
419 auto create = std::make_shared<ASTCreateQuery>();
420
421 create->database = database_name;
422 create->table = table_name;
423
424 Block sample = LogElement::createBlock();
425
426 auto new_columns_list = std::make_shared<ASTColumns>();
427 new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(sample.getNamesAndTypesList()));
428 create->set(create->columns_list, new_columns_list);
429
430 ParserStorage storage_parser;
431 ASTPtr storage_ast = parseQuery(
432 storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
433 "Storage to create table for " + LogElement::name(), 0);
434 create->set(create->storage, storage_ast);
435
436 InterpreterCreateQuery interpreter(create, context);
437 interpreter.setInternal(true);
438 interpreter.execute();
439
440 table = context.getTable(database_name, table_name);
441 }
442
443 is_prepared = true;
444}
445
446}
447