1#pragma once
2
3#include <Storages/StorageDistributed.h>
4#include <Common/ThreadPool.h>
5
6#include <atomic>
7#include <thread>
8#include <mutex>
9#include <condition_variable>
10#include <IO/ReadBufferFromFile.h>
11
12
13namespace DB
14{
15
16/** Details of StorageDistributed.
17 * This type is not designed for standalone use.
18 */
19class StorageDistributedDirectoryMonitor
20{
21public:
22 StorageDistributedDirectoryMonitor(
23 StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_);
24
25 ~StorageDistributedDirectoryMonitor();
26
27 static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
28
29 void updatePath();
30
31 void flushAllData();
32
33 void shutdownAndDropAllData();
34private:
35 void run();
36 bool processFiles();
37 void processFile(const std::string & file_path);
38 void processFilesWithBatching(const std::map<UInt64, std::string> & files);
39
40 static bool isFileBrokenErrorCode(int code);
41 void markAsBroken(const std::string & file_path) const;
42 bool maybeMarkAsBroken(const std::string & file_path, const Exception & e) const;
43
44 std::string getLoggerName() const;
45
46 StorageDistributed & storage;
47 const ConnectionPoolPtr pool;
48 const std::string name;
49 std::string path;
50
51 const bool should_batch_inserts = false;
52 const size_t min_batched_block_size_rows = 0;
53 const size_t min_batched_block_size_bytes = 0;
54 String current_batch_file_path;
55
56 struct BatchHeader;
57 struct Batch;
58
59 size_t error_count{};
60 const std::chrono::milliseconds default_sleep_time;
61 std::chrono::milliseconds sleep_time;
62 const std::chrono::milliseconds max_sleep_time;
63 std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()};
64 std::atomic<bool> quit {false};
65 std::mutex mutex;
66 std::condition_variable cond;
67 Logger * log;
68 ActionBlocker & monitor_blocker;
69 ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this};
70
71 /// Read insert query and insert settings for backward compatible.
72 void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const;
73};
74
75}
76