1 | #pragma once |
---|---|
2 | |
3 | #include <Storages/StorageDistributed.h> |
4 | #include <Common/ThreadPool.h> |
5 | |
6 | #include <atomic> |
7 | #include <thread> |
8 | #include <mutex> |
9 | #include <condition_variable> |
10 | #include <IO/ReadBufferFromFile.h> |
11 | |
12 | |
13 | namespace DB |
14 | { |
15 | |
16 | /** Details of StorageDistributed. |
17 | * This type is not designed for standalone use. |
18 | */ |
19 | class StorageDistributedDirectoryMonitor |
20 | { |
21 | public: |
22 | StorageDistributedDirectoryMonitor( |
23 | StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_); |
24 | |
25 | ~StorageDistributedDirectoryMonitor(); |
26 | |
27 | static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage); |
28 | |
29 | void updatePath(); |
30 | |
31 | void flushAllData(); |
32 | |
33 | void shutdownAndDropAllData(); |
34 | private: |
35 | void run(); |
36 | bool processFiles(); |
37 | void processFile(const std::string & file_path); |
38 | void processFilesWithBatching(const std::map<UInt64, std::string> & files); |
39 | |
40 | static bool isFileBrokenErrorCode(int code); |
41 | void markAsBroken(const std::string & file_path) const; |
42 | bool maybeMarkAsBroken(const std::string & file_path, const Exception & e) const; |
43 | |
44 | std::string getLoggerName() const; |
45 | |
46 | StorageDistributed & storage; |
47 | const ConnectionPoolPtr pool; |
48 | const std::string name; |
49 | std::string path; |
50 | |
51 | const bool should_batch_inserts = false; |
52 | const size_t min_batched_block_size_rows = 0; |
53 | const size_t min_batched_block_size_bytes = 0; |
54 | String current_batch_file_path; |
55 | |
56 | struct BatchHeader; |
57 | struct Batch; |
58 | |
59 | size_t error_count{}; |
60 | const std::chrono::milliseconds default_sleep_time; |
61 | std::chrono::milliseconds sleep_time; |
62 | const std::chrono::milliseconds max_sleep_time; |
63 | std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()}; |
64 | std::atomic<bool> quit {false}; |
65 | std::mutex mutex; |
66 | std::condition_variable cond; |
67 | Logger * log; |
68 | ActionBlocker & monitor_blocker; |
69 | ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this}; |
70 | |
71 | /// Read insert query and insert settings for backward compatible. |
72 | void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const; |
73 | }; |
74 | |
75 | } |
76 |