1#pragma once
2
3#include <set>
4#include <map>
5#include <list>
6#include <mutex>
7#include <thread>
8#include <atomic>
9#include <boost/noncopyable.hpp>
10#include <Poco/Event.h>
11#include <Core/Types.h>
12#include <common/logger_useful.h>
13#include <Core/BackgroundSchedulePool.h>
14#include <Storages/CheckResults.h>
15
16namespace DB
17{
18
19class StorageReplicatedMergeTree;
20
21
22/** Checks the integrity of the parts requested for validation.
23 *
24 * Identifies the extra parts and removes them from the working set.
 * Finds the missing parts and adds them for download from replicas.
26 * Checks the integrity of the data and, in the event of a violation,
27 * removes a part from the working set and adds it for download from replicas.
28 */
29class ReplicatedMergeTreePartCheckThread
30{
31public:
32 ReplicatedMergeTreePartCheckThread(StorageReplicatedMergeTree & storage_);
33 ~ReplicatedMergeTreePartCheckThread();
34
35 /// Processing of the queue to be checked is done in the background thread, which you must first start.
36 void start();
37 void stop();
38
39 /// Don't create more than one instance of this object simultaneously.
40 struct TemporarilyStop : private boost::noncopyable
41 {
42 ReplicatedMergeTreePartCheckThread * parent;
43
44 TemporarilyStop(ReplicatedMergeTreePartCheckThread * parent_) : parent(parent_)
45 {
46 parent->stop();
47 }
48
49 TemporarilyStop(TemporarilyStop && old) : parent(old.parent)
50 {
51 old.parent = nullptr;
52 }
53
54 ~TemporarilyStop()
55 {
56 if (parent)
57 parent->start();
58 }
59 };
60
61 TemporarilyStop temporarilyStop() { return TemporarilyStop(this); }
62
63 /// Add a part (for which there are suspicions that it is missing, damaged or not needed) in the queue for check.
64 /// delay_to_check_seconds - check no sooner than the specified number of seconds.
65 void enqueuePart(const String & name, time_t delay_to_check_seconds = 0);
66
67 /// Get the number of parts in the queue for check.
68 size_t size() const;
69
70 /// Check part by name
71 CheckResult checkPart(const String & part_name);
72
73private:
74 void run();
75
76 void searchForMissingPart(const String & part_name);
77
78 StorageReplicatedMergeTree & storage;
79 String log_name;
80 Logger * log;
81
82 using StringSet = std::set<String>;
83 using PartToCheck = std::pair<String, time_t>; /// The name of the part and the minimum time to check (or zero, if not important).
84 using PartsToCheckQueue = std::list<PartToCheck>;
85
86 /** Parts for which you want to check one of two:
87 * - If we have the part, check, its data with its checksums, and them with ZooKeeper.
88 * - If we do not have a part, check to see if it (or the part covering it) exists anywhere on another replicas.
89 */
90
91 mutable std::mutex parts_mutex;
92 StringSet parts_set;
93 PartsToCheckQueue parts_queue;
94
95 std::mutex start_stop_mutex;
96 std::atomic<bool> need_stop { false };
97 BackgroundSchedulePool::TaskHolder task;
98};
99
100}
101