1 | #include "ReplicasStatusHandler.h" |
---|---|
2 | |
3 | #include <Interpreters/Context.h> |
4 | #include <Storages/StorageReplicatedMergeTree.h> |
5 | #include <Common/HTMLForm.h> |
6 | #include <Common/typeid_cast.h> |
7 | #include <Databases/IDatabase.h> |
8 | #include <IO/HTTPCommon.h> |
9 | |
10 | #include <Poco/Net/HTTPServerRequest.h> |
11 | #include <Poco/Net/HTTPServerResponse.h> |
12 | |
13 | |
14 | namespace DB |
15 | { |
16 | |
17 | |
18 | ReplicasStatusHandler::ReplicasStatusHandler(IServer & server) |
19 | : context(server.context()) |
20 | { |
21 | } |
22 | |
23 | |
24 | void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) |
25 | { |
26 | try |
27 | { |
28 | HTMLForm params(request); |
29 | |
30 | /// Even if lag is small, output detailed information about the lag. |
31 | bool verbose = params.get("verbose", "") == "1"; |
32 | |
33 | const MergeTreeSettings & settings = context.getMergeTreeSettings(); |
34 | |
35 | bool ok = true; |
36 | std::stringstream message; |
37 | |
38 | auto databases = context.getDatabases(); |
39 | |
40 | /// Iterate through all the replicated tables. |
41 | for (const auto & db : databases) |
42 | { |
43 | /// Lazy database can not contain replicated tables |
44 | if (db.second->getEngineName() == "Lazy") |
45 | continue; |
46 | |
47 | for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) |
48 | { |
49 | auto & table = iterator->table(); |
50 | StorageReplicatedMergeTree * table_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()); |
51 | |
52 | if (!table_replicated) |
53 | continue; |
54 | |
55 | time_t absolute_delay = 0; |
56 | time_t relative_delay = 0; |
57 | |
58 | table_replicated->getReplicaDelays(absolute_delay, relative_delay); |
59 | |
60 | if ((settings.min_absolute_delay_to_close && absolute_delay >= static_cast<time_t>(settings.min_absolute_delay_to_close)) |
61 | || (settings.min_relative_delay_to_close && relative_delay >= static_cast<time_t>(settings.min_relative_delay_to_close))) |
62 | ok = false; |
63 | |
64 | message << backQuoteIfNeed(db.first) << "."<< backQuoteIfNeed(iterator->name()) |
65 | << ":\tAbsolute delay: "<< absolute_delay << ". Relative delay: "<< relative_delay << ".\n"; |
66 | } |
67 | } |
68 | |
69 | const auto & config = context.getConfigRef(); |
70 | setResponseDefaultHeaders(response, config.getUInt("keep_alive_timeout", 10)); |
71 | |
72 | if (ok && !verbose) |
73 | { |
74 | const char * data = "Ok.\n"; |
75 | response.sendBuffer(data, strlen(data)); |
76 | } |
77 | else |
78 | { |
79 | response.send() << message.rdbuf(); |
80 | } |
81 | } |
82 | catch (...) |
83 | { |
84 | tryLogCurrentException("ReplicasStatusHandler"); |
85 | |
86 | try |
87 | { |
88 | response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); |
89 | |
90 | if (!response.sent()) |
91 | { |
92 | /// We have not sent anything yet and we don't even know if we need to compress response. |
93 | response.send() << getCurrentExceptionMessage(false) << std::endl; |
94 | } |
95 | } |
96 | catch (...) |
97 | { |
98 | LOG_ERROR((&Logger::get("ReplicasStatusHandler")), "Cannot send exception to client"); |
99 | } |
100 | } |
101 | } |
102 | |
103 | |
104 | } |
105 |