| 1 | #pragma once |
| 2 | |
| 3 | #include <thread> |
| 4 | #include <atomic> |
| 5 | #include <condition_variable> |
| 6 | #include <boost/noncopyable.hpp> |
| 7 | #include <common/logger_useful.h> |
| 8 | #include <Core/Types.h> |
| 9 | #include <Common/ConcurrentBoundedQueue.h> |
| 10 | #include <Storages/IStorage.h> |
| 11 | #include <Interpreters/Context.h> |
| 12 | #include <Common/Stopwatch.h> |
| 13 | #include <Parsers/ASTCreateQuery.h> |
| 14 | #include <Parsers/parseQuery.h> |
| 15 | #include <Parsers/ParserCreateQuery.h> |
| 16 | #include <Parsers/ASTRenameQuery.h> |
| 17 | #include <Parsers/formatAST.h> |
| 18 | #include <Parsers/ASTIndexDeclaration.h> |
| 19 | #include <Parsers/ASTInsertQuery.h> |
| 20 | #include <Interpreters/InterpreterCreateQuery.h> |
| 21 | #include <Interpreters/InterpreterRenameQuery.h> |
| 22 | #include <Interpreters/InterpreterInsertQuery.h> |
| 23 | #include <Common/setThreadName.h> |
| 24 | #include <Common/ThreadPool.h> |
| 25 | #include <IO/WriteHelpers.h> |
| 26 | #include <Poco/Util/AbstractConfiguration.h> |
| 27 | |
| 28 | |
| 29 | namespace DB |
| 30 | { |
| 31 | |
| 32 | |
/** Allows storing a structured log in a system table.
  *
  * Logging is asynchronous. Records are put into a queue, from which they are read by a separate thread.
  * That thread inserts the records into a table at most once per specified flush interval (unless a flush is forced).
  */
| 38 | |
/** Structure of a log record (the template parameter).
  * The structure may change when the server version is updated.
  * If, on a write (the check is repeated on every flush), the existing table has a different structure,
  * then it gets renamed (put aside) and a new table is created.
  */
/* Example:
struct LogElement
{
    /// default constructor must be available
    /// fields

    static std::string name();
    static Block createBlock();
    void appendToBlock(Block & block) const;
};
*/
| 55 | |
| 56 | |
/// Capacity of SystemLog::queue; SystemLog also reserves this many records for its accumulation buffer.
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576

/// Forward declarations: the global Context and the concrete system logs held by SystemLogs.
class Context;
class MetricLog;
class PartLog;
class QueryLog;
class QueryThreadLog;
class TextLog;
class TraceLog;
| 66 | |
/// System logs should be destroyed in the destructor of the last Context and before the tables,
/// because SystemLog destruction runs an insert query while flushing data into the underlying tables.
/// Holds one instance of each kind of system log.
struct SystemLogs
{
    /// NOTE(review): presumably creates the logs that are enabled in `config`; the definition is not visible here — confirm.
    SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config);
    ~SystemLogs();

    /// NOTE(review): presumably calls shutdown() on every non-null log below; defined elsewhere — confirm.
    void shutdown();

    std::shared_ptr<QueryLog> query_log;                /// Used to log queries.
    std::shared_ptr<QueryThreadLog> query_thread_log;   /// Used to log query threads.
    std::shared_ptr<PartLog> part_log;                  /// Used to log operations with parts.
    std::shared_ptr<TraceLog> trace_log;                /// Used to log traces from the query profiler.
    std::shared_ptr<TextLog> text_log;                  /// Used to log all text messages.
    std::shared_ptr<MetricLog> metric_log;              /// Used to log all metrics.

    /// NOTE(review): presumably the database that holds part_log's table; not used in this header — confirm.
    String part_log_database;
};
| 85 | |
| 86 | |
| 87 | template <typename LogElement> |
| 88 | class SystemLog : private boost::noncopyable |
| 89 | { |
| 90 | public: |
| 91 | using Self = SystemLog; |
| 92 | |
| 93 | /** Parameter: table name where to write log. |
| 94 | * If table is not exists, then it get created with specified engine. |
| 95 | * If it already exists, then its structure is checked to be compatible with structure of log record. |
| 96 | * If it is compatible, then existing table will be used. |
| 97 | * If not - then existing table will be renamed to same name but with suffix '_N' at end, |
| 98 | * where N - is a minimal number from 1, for that table with corresponding name doesn't exist yet; |
| 99 | * and new table get created - as if previous table was not exist. |
| 100 | */ |
| 101 | SystemLog( |
| 102 | Context & context_, |
| 103 | const String & database_name_, |
| 104 | const String & table_name_, |
| 105 | const String & storage_def_, |
| 106 | size_t flush_interval_milliseconds_); |
| 107 | |
| 108 | ~SystemLog(); |
| 109 | |
| 110 | /** Append a record into log. |
| 111 | * Writing to table will be done asynchronously and in case of failure, record could be lost. |
| 112 | */ |
| 113 | void add(const LogElement & element); |
| 114 | |
| 115 | /// Flush data in the buffer to disk |
| 116 | void flush(); |
| 117 | |
| 118 | /// Stop the background flush thread before destructor. No more data will be written. |
| 119 | void shutdown(); |
| 120 | |
| 121 | protected: |
| 122 | Context & context; |
| 123 | const String database_name; |
| 124 | const String table_name; |
| 125 | const String storage_def; |
| 126 | StoragePtr table; |
| 127 | const size_t flush_interval_milliseconds; |
| 128 | std::atomic<bool> is_shutdown{false}; |
| 129 | |
| 130 | enum class EntryType |
| 131 | { |
| 132 | LOG_ELEMENT = 0, |
| 133 | AUTO_FLUSH, |
| 134 | FORCE_FLUSH, |
| 135 | SHUTDOWN, |
| 136 | }; |
| 137 | |
| 138 | using QueueItem = std::pair<EntryType, LogElement>; |
| 139 | |
| 140 | /// Queue is bounded. But its size is quite large to not block in all normal cases. |
| 141 | ConcurrentBoundedQueue<QueueItem> queue {DBMS_SYSTEM_LOG_QUEUE_SIZE}; |
| 142 | |
| 143 | /** Data that was pulled from queue. Data is accumulated here before enough time passed. |
| 144 | * It's possible to implement double-buffering, but we assume that insertion into table is faster |
| 145 | * than accumulation of large amount of log records (for example, for query log - processing of large amount of queries). |
| 146 | */ |
| 147 | std::vector<LogElement> data; |
| 148 | |
| 149 | Logger * log; |
| 150 | |
| 151 | /** In this thread, data is pulled from 'queue' and stored in 'data', and then written into table. |
| 152 | */ |
| 153 | ThreadFromGlobalPool saving_thread; |
| 154 | |
| 155 | void threadFunction(); |
| 156 | |
| 157 | /** Creates new table if it does not exist. |
| 158 | * Renames old table if its structure is not suitable. |
| 159 | * This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created. |
| 160 | */ |
| 161 | bool is_prepared = false; |
| 162 | void prepareTable(); |
| 163 | |
| 164 | std::mutex flush_mutex; |
| 165 | std::mutex condvar_mutex; |
| 166 | std::condition_variable flush_condvar; |
| 167 | bool force_flushing = false; |
| 168 | |
| 169 | /// flushImpl can be executed only in saving_thread. |
| 170 | void flushImpl(EntryType reason); |
| 171 | }; |
| 172 | |
| 173 | |
| 174 | template <typename LogElement> |
| 175 | SystemLog<LogElement>::SystemLog(Context & context_, |
| 176 | const String & database_name_, |
| 177 | const String & table_name_, |
| 178 | const String & storage_def_, |
| 179 | size_t flush_interval_milliseconds_) |
| 180 | : context(context_), |
| 181 | database_name(database_name_), table_name(table_name_), storage_def(storage_def_), |
| 182 | flush_interval_milliseconds(flush_interval_milliseconds_) |
| 183 | { |
| 184 | log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")" ); |
| 185 | |
| 186 | data.reserve(DBMS_SYSTEM_LOG_QUEUE_SIZE); |
| 187 | saving_thread = ThreadFromGlobalPool([this] { threadFunction(); }); |
| 188 | } |
| 189 | |
| 190 | |
| 191 | template <typename LogElement> |
| 192 | void SystemLog<LogElement>::add(const LogElement & element) |
| 193 | { |
| 194 | if (is_shutdown) |
| 195 | return; |
| 196 | |
| 197 | /// Without try we could block here in case of queue overflow. |
| 198 | if (!queue.tryPush({EntryType::LOG_ELEMENT, element})) |
| 199 | LOG_ERROR(log, "SystemLog queue is full" ); |
| 200 | } |
| 201 | |
| 202 | |
| 203 | template <typename LogElement> |
| 204 | void SystemLog<LogElement>::flush() |
| 205 | { |
| 206 | if (is_shutdown) |
| 207 | return; |
| 208 | |
| 209 | std::lock_guard flush_lock(flush_mutex); |
| 210 | force_flushing = true; |
| 211 | |
| 212 | /// Tell thread to execute extra flush. |
| 213 | queue.push({EntryType::FORCE_FLUSH, {}}); |
| 214 | |
| 215 | /// Wait for flush being finished. |
| 216 | std::unique_lock lock(condvar_mutex); |
| 217 | while (force_flushing) |
| 218 | flush_condvar.wait(lock); |
| 219 | } |
| 220 | |
| 221 | |
| 222 | template <typename LogElement> |
| 223 | void SystemLog<LogElement>::shutdown() |
| 224 | { |
| 225 | bool old_val = false; |
| 226 | if (!is_shutdown.compare_exchange_strong(old_val, true)) |
| 227 | return; |
| 228 | |
| 229 | /// Tell thread to shutdown. |
| 230 | queue.push({EntryType::SHUTDOWN, {}}); |
| 231 | saving_thread.join(); |
| 232 | } |
| 233 | |
| 234 | |
| 235 | template <typename LogElement> |
| 236 | SystemLog<LogElement>::~SystemLog() |
| 237 | { |
| 238 | shutdown(); |
| 239 | } |
| 240 | |
| 241 | |
| 242 | template <typename LogElement> |
| 243 | void SystemLog<LogElement>::threadFunction() |
| 244 | { |
| 245 | setThreadName("SystemLogFlush" ); |
| 246 | |
| 247 | Stopwatch time_after_last_write; |
| 248 | bool first = true; |
| 249 | |
| 250 | while (true) |
| 251 | { |
| 252 | try |
| 253 | { |
| 254 | if (first) |
| 255 | { |
| 256 | time_after_last_write.restart(); |
| 257 | first = false; |
| 258 | } |
| 259 | |
| 260 | QueueItem element; |
| 261 | bool has_element = false; |
| 262 | |
| 263 | /// data.size() is increased only in this function |
| 264 | /// TODO: get rid of data and queue duality |
| 265 | |
| 266 | if (data.empty()) |
| 267 | { |
| 268 | queue.pop(element); |
| 269 | has_element = true; |
| 270 | } |
| 271 | else |
| 272 | { |
| 273 | size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000; |
| 274 | if (milliseconds_elapsed < flush_interval_milliseconds) |
| 275 | has_element = queue.tryPop(element, flush_interval_milliseconds - milliseconds_elapsed); |
| 276 | } |
| 277 | |
| 278 | if (has_element) |
| 279 | { |
| 280 | if (element.first == EntryType::SHUTDOWN) |
| 281 | { |
| 282 | /// NOTE: MergeTree engine can write data even it is already in shutdown state. |
| 283 | flushImpl(element.first); |
| 284 | break; |
| 285 | } |
| 286 | else if (element.first == EntryType::FORCE_FLUSH) |
| 287 | { |
| 288 | flushImpl(element.first); |
| 289 | time_after_last_write.restart(); |
| 290 | continue; |
| 291 | } |
| 292 | else |
| 293 | data.push_back(element.second); |
| 294 | } |
| 295 | |
| 296 | size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000; |
| 297 | if (milliseconds_elapsed >= flush_interval_milliseconds) |
| 298 | { |
| 299 | /// Write data to a table. |
| 300 | flushImpl(EntryType::AUTO_FLUSH); |
| 301 | time_after_last_write.restart(); |
| 302 | } |
| 303 | } |
| 304 | catch (...) |
| 305 | { |
| 306 | /// In case of exception we lost accumulated data - to avoid locking. |
| 307 | data.clear(); |
| 308 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 309 | } |
| 310 | } |
| 311 | } |
| 312 | |
| 313 | |
| 314 | template <typename LogElement> |
| 315 | void SystemLog<LogElement>::flushImpl(EntryType reason) |
| 316 | { |
| 317 | try |
| 318 | { |
| 319 | if ((reason == EntryType::AUTO_FLUSH || reason == EntryType::SHUTDOWN) && data.empty()) |
| 320 | return; |
| 321 | |
| 322 | LOG_TRACE(log, "Flushing system log" ); |
| 323 | |
| 324 | /// We check for existence of the table and create it as needed at every flush. |
| 325 | /// This is done to allow user to drop the table at any moment (new empty table will be created automatically). |
| 326 | /// BTW, flush method is called from single thread. |
| 327 | prepareTable(); |
| 328 | |
| 329 | Block block = LogElement::createBlock(); |
| 330 | for (const LogElement & elem : data) |
| 331 | elem.appendToBlock(block); |
| 332 | |
| 333 | /// Clear queue early, because insertion to the table could lead to generation of more log entrites |
| 334 | /// and pushing them to already full queue will lead to deadlock. |
| 335 | data.clear(); |
| 336 | |
| 337 | /// We write to table indirectly, using InterpreterInsertQuery. |
| 338 | /// This is needed to support DEFAULT-columns in table. |
| 339 | |
| 340 | std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>(); |
| 341 | insert->database = database_name; |
| 342 | insert->table = table_name; |
| 343 | ASTPtr query_ptr(insert.release()); |
| 344 | |
| 345 | InterpreterInsertQuery interpreter(query_ptr, context); |
| 346 | BlockIO io = interpreter.execute(); |
| 347 | |
| 348 | io.out->writePrefix(); |
| 349 | io.out->write(block); |
| 350 | io.out->writeSuffix(); |
| 351 | } |
| 352 | catch (...) |
| 353 | { |
| 354 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 355 | /// In case of exception, also clean accumulated data - to avoid locking. |
| 356 | data.clear(); |
| 357 | } |
| 358 | if (reason == EntryType::FORCE_FLUSH) |
| 359 | { |
| 360 | std::lock_guard lock(condvar_mutex); |
| 361 | force_flushing = false; |
| 362 | flush_condvar.notify_one(); |
| 363 | } |
| 364 | } |
| 365 | |
| 366 | |
| 367 | template <typename LogElement> |
| 368 | void SystemLog<LogElement>::prepareTable() |
| 369 | { |
| 370 | String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name); |
| 371 | |
| 372 | table = context.tryGetTable(database_name, table_name); |
| 373 | |
| 374 | if (table) |
| 375 | { |
| 376 | const Block expected = LogElement::createBlock(); |
| 377 | const Block actual = table->getSampleBlockNonMaterialized(); |
| 378 | |
| 379 | if (!blocksHaveEqualStructure(actual, expected)) |
| 380 | { |
| 381 | /// Rename the existing table. |
| 382 | int suffix = 0; |
| 383 | while (context.isTableExist(database_name, table_name + "_" + toString(suffix))) |
| 384 | ++suffix; |
| 385 | |
| 386 | auto rename = std::make_shared<ASTRenameQuery>(); |
| 387 | |
| 388 | ASTRenameQuery::Table from; |
| 389 | from.database = database_name; |
| 390 | from.table = table_name; |
| 391 | |
| 392 | ASTRenameQuery::Table to; |
| 393 | to.database = database_name; |
| 394 | to.table = table_name + "_" + toString(suffix); |
| 395 | |
| 396 | ASTRenameQuery::Element elem; |
| 397 | elem.from = from; |
| 398 | elem.to = to; |
| 399 | |
| 400 | rename->elements.emplace_back(elem); |
| 401 | |
| 402 | LOG_DEBUG(log, "Existing table " << description << " for system log has obsolete or different structure." |
| 403 | " Renaming it to " << backQuoteIfNeed(to.table)); |
| 404 | |
| 405 | InterpreterRenameQuery(rename, context).execute(); |
| 406 | |
| 407 | /// The required table will be created. |
| 408 | table = nullptr; |
| 409 | } |
| 410 | else if (!is_prepared) |
| 411 | LOG_DEBUG(log, "Will use existing table " << description << " for " + LogElement::name()); |
| 412 | } |
| 413 | |
| 414 | if (!table) |
| 415 | { |
| 416 | /// Create the table. |
| 417 | LOG_DEBUG(log, "Creating new table " << description << " for " + LogElement::name()); |
| 418 | |
| 419 | auto create = std::make_shared<ASTCreateQuery>(); |
| 420 | |
| 421 | create->database = database_name; |
| 422 | create->table = table_name; |
| 423 | |
| 424 | Block sample = LogElement::createBlock(); |
| 425 | |
| 426 | auto new_columns_list = std::make_shared<ASTColumns>(); |
| 427 | new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(sample.getNamesAndTypesList())); |
| 428 | create->set(create->columns_list, new_columns_list); |
| 429 | |
| 430 | ParserStorage storage_parser; |
| 431 | ASTPtr storage_ast = parseQuery( |
| 432 | storage_parser, storage_def.data(), storage_def.data() + storage_def.size(), |
| 433 | "Storage to create table for " + LogElement::name(), 0); |
| 434 | create->set(create->storage, storage_ast); |
| 435 | |
| 436 | InterpreterCreateQuery interpreter(create, context); |
| 437 | interpreter.setInternal(true); |
| 438 | interpreter.execute(); |
| 439 | |
| 440 | table = context.getTable(database_name, table_name); |
| 441 | } |
| 442 | |
| 443 | is_prepared = true; |
| 444 | } |
| 445 | |
| 446 | } |
| 447 | |