| 1 | #pragma once |
|---|---|
| 2 | |
| 3 | #include <Storages/StorageDistributed.h> |
| 4 | #include <Common/ThreadPool.h> |
| 5 | |
| 6 | #include <atomic> |
| 7 | #include <thread> |
| 8 | #include <mutex> |
| 9 | #include <condition_variable> |
| 10 | #include <IO/ReadBufferFromFile.h> |
| 11 | |
| 12 | |
| 13 | namespace DB |
| 14 | { |
| 15 | |
| 16 | /** Details of StorageDistributed. |
| 17 | * This type is not designed for standalone use. |
| 18 | */ |
| 19 | class StorageDistributedDirectoryMonitor |
| 20 | { |
| 21 | public: |
| 22 | StorageDistributedDirectoryMonitor( |
| 23 | StorageDistributed & storage_, std::string name_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_); |
| 24 | |
| 25 | ~StorageDistributedDirectoryMonitor(); |
| 26 | |
| 27 | static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage); |
| 28 | |
| 29 | void updatePath(); |
| 30 | |
| 31 | void flushAllData(); |
| 32 | |
| 33 | void shutdownAndDropAllData(); |
| 34 | private: |
| 35 | void run(); |
| 36 | bool processFiles(); |
| 37 | void processFile(const std::string & file_path); |
| 38 | void processFilesWithBatching(const std::map<UInt64, std::string> & files); |
| 39 | |
| 40 | static bool isFileBrokenErrorCode(int code); |
| 41 | void markAsBroken(const std::string & file_path) const; |
| 42 | bool maybeMarkAsBroken(const std::string & file_path, const Exception & e) const; |
| 43 | |
| 44 | std::string getLoggerName() const; |
| 45 | |
| 46 | StorageDistributed & storage; |
| 47 | const ConnectionPoolPtr pool; |
| 48 | const std::string name; |
| 49 | std::string path; |
| 50 | |
| 51 | const bool should_batch_inserts = false; |
| 52 | const size_t min_batched_block_size_rows = 0; |
| 53 | const size_t min_batched_block_size_bytes = 0; |
| 54 | String current_batch_file_path; |
| 55 | |
| 56 | struct BatchHeader; |
| 57 | struct Batch; |
| 58 | |
| 59 | size_t error_count{}; |
| 60 | const std::chrono::milliseconds default_sleep_time; |
| 61 | std::chrono::milliseconds sleep_time; |
| 62 | const std::chrono::milliseconds max_sleep_time; |
| 63 | std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()}; |
| 64 | std::atomic<bool> quit {false}; |
| 65 | std::mutex mutex; |
| 66 | std::condition_variable cond; |
| 67 | Logger * log; |
| 68 | ActionBlocker & monitor_blocker; |
| 69 | ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this}; |
| 70 | |
| 71 | /// Read insert query and insert settings for backward compatible. |
| 72 | void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const; |
| 73 | }; |
| 74 | |
| 75 | } |
| 76 |