1 | #include <Interpreters/SystemLog.h> |
2 | #include <Interpreters/QueryLog.h> |
3 | #include <Interpreters/QueryThreadLog.h> |
4 | #include <Interpreters/PartLog.h> |
5 | #include <Interpreters/TextLog.h> |
6 | #include <Interpreters/TraceLog.h> |
7 | #include <Interpreters/MetricLog.h> |
8 | |
9 | #include <Poco/Util/AbstractConfiguration.h> |
10 | |
11 | |
12 | namespace DB |
13 | { |
14 | |
15 | namespace |
16 | { |
17 | |
18 | constexpr size_t DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS = 7500; |
19 | |
20 | /// Creates a system log with MergeTree engine using parameters from config |
21 | template <typename TSystemLog> |
22 | std::shared_ptr<TSystemLog> createSystemLog( |
23 | Context & context, |
24 | const String & default_database_name, |
25 | const String & default_table_name, |
26 | const Poco::Util::AbstractConfiguration & config, |
27 | const String & config_prefix) |
28 | { |
29 | if (!config.has(config_prefix)) |
30 | return {}; |
31 | |
32 | String database = config.getString(config_prefix + ".database" , default_database_name); |
33 | String table = config.getString(config_prefix + ".table" , default_table_name); |
34 | String partition_by = config.getString(config_prefix + ".partition_by" , "toYYYYMM(event_date)" ); |
35 | String engine = "ENGINE = MergeTree PARTITION BY (" + partition_by + ") ORDER BY (event_date, event_time)" ; |
36 | |
37 | size_t flush_interval_milliseconds = config.getUInt64(config_prefix + ".flush_interval_milliseconds" , DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS); |
38 | |
39 | return std::make_shared<TSystemLog>(context, database, table, engine, flush_interval_milliseconds); |
40 | } |
41 | |
42 | } |
43 | |
44 | |
45 | SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config) |
46 | { |
47 | query_log = createSystemLog<QueryLog>(global_context, "system" , "query_log" , config, "query_log" ); |
48 | query_thread_log = createSystemLog<QueryThreadLog>(global_context, "system" , "query_thread_log" , config, "query_thread_log" ); |
49 | part_log = createSystemLog<PartLog>(global_context, "system" , "part_log" , config, "part_log" ); |
50 | trace_log = createSystemLog<TraceLog>(global_context, "system" , "trace_log" , config, "trace_log" ); |
51 | text_log = createSystemLog<TextLog>(global_context, "system" , "text_log" , config, "text_log" ); |
52 | metric_log = createSystemLog<MetricLog>(global_context, "system" , "metric_log" , config, "metric_log" ); |
53 | |
54 | if (metric_log) |
55 | { |
56 | size_t collect_interval_milliseconds = config.getUInt64("metric_log.collect_interval_milliseconds" ); |
57 | metric_log->startCollectMetric(collect_interval_milliseconds); |
58 | } |
59 | |
60 | part_log_database = config.getString("part_log.database" , "system" ); |
61 | } |
62 | |
63 | |
64 | SystemLogs::~SystemLogs() |
65 | { |
66 | shutdown(); |
67 | } |
68 | |
69 | void SystemLogs::shutdown() |
70 | { |
71 | if (query_log) |
72 | query_log->shutdown(); |
73 | if (query_thread_log) |
74 | query_thread_log->shutdown(); |
75 | if (part_log) |
76 | part_log->shutdown(); |
77 | if (trace_log) |
78 | trace_log->shutdown(); |
79 | if (text_log) |
80 | text_log->shutdown(); |
81 | if (metric_log) |
82 | { |
83 | metric_log->stopCollectMetric(); |
84 | metric_log->shutdown(); |
85 | } |
86 | } |
87 | |
88 | } |
89 | |