| 1 | #pragma once | 
|---|---|
| 2 | |
| 3 | #include <Storages/StorageDistributed.h> | 
| 4 | #include <Common/ThreadPool.h> | 
| 5 | |
| 6 | #include <atomic> | 
| 7 | #include <thread> | 
| 8 | #include <mutex> | 
| 9 | #include <condition_variable> | 
| 10 | #include <IO/ReadBufferFromFile.h> | 
| 11 | |
| 12 | |
| 13 | namespace DB | 
| 14 | { | 
| 15 | |
| 16 | /** Details of StorageDistributed. | 
| 17 | * This type is not designed for standalone use. | 
| 18 | */ | 
| 19 | class StorageDistributedDirectoryMonitor | 
| 20 | { | 
| 21 | public: | 
| 22 | StorageDistributedDirectoryMonitor( | 
| 23 | StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_); | 
| 24 | |
| 25 | ~StorageDistributedDirectoryMonitor(); | 
| 26 | |
| 27 | static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage); | 
| 28 | |
| 29 | void updatePath(); | 
| 30 | |
| 31 | void flushAllData(); | 
| 32 | |
| 33 | void shutdownAndDropAllData(); | 
| 34 | private: | 
| 35 | void run(); | 
| 36 | bool processFiles(); | 
| 37 | void processFile(const std::string & file_path); | 
| 38 | void processFilesWithBatching(const std::map<UInt64, std::string> & files); | 
| 39 | |
| 40 | static bool isFileBrokenErrorCode(int code); | 
| 41 | void markAsBroken(const std::string & file_path) const; | 
| 42 | bool maybeMarkAsBroken(const std::string & file_path, const Exception & e) const; | 
| 43 | |
| 44 | std::string getLoggerName() const; | 
| 45 | |
| 46 | StorageDistributed & storage; | 
| 47 | const ConnectionPoolPtr pool; | 
| 48 | const std::string name; | 
| 49 | std::string path; | 
| 50 | |
| 51 | const bool should_batch_inserts = false; | 
| 52 | const size_t min_batched_block_size_rows = 0; | 
| 53 | const size_t min_batched_block_size_bytes = 0; | 
| 54 | String current_batch_file_path; | 
| 55 | |
| 56 | struct BatchHeader; | 
| 57 | struct Batch; | 
| 58 | |
| 59 | size_t error_count{}; | 
| 60 | const std::chrono::milliseconds default_sleep_time; | 
| 61 | std::chrono::milliseconds sleep_time; | 
| 62 | const std::chrono::milliseconds max_sleep_time; | 
| 63 | std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()}; | 
| 64 | std::atomic<bool> quit {false}; | 
| 65 | std::mutex mutex; | 
| 66 | std::condition_variable cond; | 
| 67 | Logger * log; | 
| 68 | ActionBlocker & monitor_blocker; | 
| 69 | ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this}; | 
| 70 | |
| 71 | /// Read insert query and insert settings for backward compatible. | 
| 72 | void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const; | 
| 73 | }; | 
| 74 | |
| 75 | } | 
| 76 | 
