| 1 | #include <Interpreters/SystemLog.h> |
| 2 | #include <Interpreters/QueryLog.h> |
| 3 | #include <Interpreters/QueryThreadLog.h> |
| 4 | #include <Interpreters/PartLog.h> |
| 5 | #include <Interpreters/TextLog.h> |
| 6 | #include <Interpreters/TraceLog.h> |
| 7 | #include <Interpreters/MetricLog.h> |
| 8 | |
| 9 | #include <Poco/Util/AbstractConfiguration.h> |
| 10 | |
| 11 | |
| 12 | namespace DB |
| 13 | { |
| 14 | |
| 15 | namespace |
| 16 | { |
| 17 | |
| 18 | constexpr size_t DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS = 7500; |
| 19 | |
| 20 | /// Creates a system log with MergeTree engine using parameters from config |
| 21 | template <typename TSystemLog> |
| 22 | std::shared_ptr<TSystemLog> createSystemLog( |
| 23 | Context & context, |
| 24 | const String & default_database_name, |
| 25 | const String & default_table_name, |
| 26 | const Poco::Util::AbstractConfiguration & config, |
| 27 | const String & config_prefix) |
| 28 | { |
| 29 | if (!config.has(config_prefix)) |
| 30 | return {}; |
| 31 | |
| 32 | String database = config.getString(config_prefix + ".database" , default_database_name); |
| 33 | String table = config.getString(config_prefix + ".table" , default_table_name); |
| 34 | String partition_by = config.getString(config_prefix + ".partition_by" , "toYYYYMM(event_date)" ); |
| 35 | String engine = "ENGINE = MergeTree PARTITION BY (" + partition_by + ") ORDER BY (event_date, event_time)" ; |
| 36 | |
| 37 | size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds" , DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS); |
| 38 | |
| 39 | return std::make_shared<TSystemLog>(context, database, table, engine, flush_interval_milliseconds); |
| 40 | } |
| 41 | |
| 42 | } |
| 43 | |
| 44 | |
| 45 | SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config) |
| 46 | { |
| 47 | query_log = createSystemLog<QueryLog>(global_context, "system" , "query_log" , config, "query_log" ); |
| 48 | query_thread_log = createSystemLog<QueryThreadLog>(global_context, "system" , "query_thread_log" , config, "query_thread_log" ); |
| 49 | part_log = createSystemLog<PartLog>(global_context, "system" , "part_log" , config, "part_log" ); |
| 50 | trace_log = createSystemLog<TraceLog>(global_context, "system" , "trace_log" , config, "trace_log" ); |
| 51 | text_log = createSystemLog<TextLog>(global_context, "system" , "text_log" , config, "text_log" ); |
| 52 | metric_log = createSystemLog<MetricLog>(global_context, "system" , "metric_log" , config, "metric_log" ); |
| 53 | |
| 54 | if (metric_log) |
| 55 | { |
| 56 | size_t collect_interval_milliseconds = config.getUInt64("metric_log.collect_interval_milliseconds" ); |
| 57 | metric_log->startCollectMetric(collect_interval_milliseconds); |
| 58 | } |
| 59 | |
| 60 | part_log_database = config.getString("part_log.database" , "system" ); |
| 61 | } |
| 62 | |
| 63 | |
| 64 | SystemLogs::~SystemLogs() |
| 65 | { |
| 66 | shutdown(); |
| 67 | } |
| 68 | |
| 69 | void SystemLogs::shutdown() |
| 70 | { |
| 71 | if (query_log) |
| 72 | query_log->shutdown(); |
| 73 | if (query_thread_log) |
| 74 | query_thread_log->shutdown(); |
| 75 | if (part_log) |
| 76 | part_log->shutdown(); |
| 77 | if (trace_log) |
| 78 | trace_log->shutdown(); |
| 79 | if (text_log) |
| 80 | text_log->shutdown(); |
| 81 | if (metric_log) |
| 82 | { |
| 83 | metric_log->stopCollectMetric(); |
| 84 | metric_log->shutdown(); |
| 85 | } |
| 86 | } |
| 87 | |
| 88 | } |
| 89 | |