| 1 | #pragma once |
| 2 | |
| 3 | #include <DataStreams/IBlockInputStream.h> |
| 4 | #include <Formats/FormatFactory.h> |
| 5 | #include <Common/ThreadPool.h> |
| 6 | #include <Common/setThreadName.h> |
| 7 | #include <IO/BufferWithOwnMemory.h> |
| 8 | #include <IO/ReadBuffer.h> |
| 9 | #include <Processors/Formats/IRowInputFormat.h> |
| 10 | #include <Processors/Formats/InputStreamFromInputFormat.h> |
| 11 | #include <Interpreters/Context.h> |
| 12 | |
| 13 | namespace DB |
| 14 | { |
| 15 | |
| 16 | /** |
| 17 | * ORDER-PRESERVING parallel parsing of data formats. |
| 18 | * It splits original data into chunks. Then each chunk is parsed by a different thread. |
| 19 | * The number of chunks is equal to the number of parser threads. |
| 20 | * The size of a chunk is equal to the min_chunk_bytes_for_parallel_parsing setting. |
| 21 | * |
| 22 | * This stream has three kinds of threads: one segmentator, multiple parsers, |
| 23 | * and one reader thread -- that is, the one from which readImpl() is called. |
| 24 | * They operate one after another on parts of data called "processing units". |
| 25 | * One unit consists of buffer with raw data from file, filled by segmentator |
| 26 | * thread. This raw data is then parsed by a parser thread to form a number of |
| 27 | * Blocks. These Blocks are returned to the parent stream from readImpl(). |
| 28 | * After being read out, a processing unit is reused, to save on allocating |
| 29 | * memory for the raw buffer. The processing units are organized into a circular |
| 30 | * array to facilitate reuse and to apply backpressure on the segmentator thread |
| 31 | * -- after it runs out of processing units, it has to wait for the reader to |
| 32 | * read out the previous blocks. |
| 33 | * The outline of what the threads do is as follows: |
| 34 | * segmentator thread: |
| 35 | * 1) wait for the next processing unit to become empty |
| 36 | * 2) fill it with a part of input file |
| 37 | * 3) start a parser thread |
| 38 | * 4) repeat until eof |
| 39 | * parser thread: |
| 40 | * 1) parse the given raw buffer without any synchronization |
| 41 | * 2) signal that the given unit is ready to read |
| 42 | * 3) finish |
| 43 | * readImpl(): |
| 44 | * 1) wait for the next processing unit to become ready to read |
| 45 | * 2) take the blocks from the processing unit to return them to the caller |
| 46 | * 3) signal that the processing unit is empty |
| 47 | * 4) repeat until it encounters a unit that is marked as "past_the_end" |
| 48 | * All threads must also check for cancel/eof/exception flags. |
| 49 | */ |
| 50 | class ParallelParsingBlockInputStream : public IBlockInputStream |
| 51 | { |
| 52 | private: |
| 53 | using ReadCallback = std::function<void()>; |
| 54 | |
| 55 | using InputProcessorCreator = std::function<InputFormatPtr( |
| 56 | ReadBuffer & buf, |
| 57 | const Block & , |
| 58 | const RowInputFormatParams & params, |
| 59 | const FormatSettings & settings)>; |
| 60 | public: |
| 61 | struct InputCreatorParams |
| 62 | { |
| 63 | const Block & sample; |
| 64 | const RowInputFormatParams & row_input_format_params; |
| 65 | const FormatSettings &settings; |
| 66 | }; |
| 67 | |
| 68 | struct Params |
| 69 | { |
| 70 | ReadBuffer & read_buffer; |
| 71 | const InputProcessorCreator & input_processor_creator; |
| 72 | const InputCreatorParams & input_creator_params; |
| 73 | FormatFactory::FileSegmentationEngine file_segmentation_engine; |
| 74 | int max_threads; |
| 75 | size_t min_chunk_bytes; |
| 76 | }; |
| 77 | |
| 78 | explicit ParallelParsingBlockInputStream(const Params & params) |
| 79 | : header(params.input_creator_params.sample), |
| 80 | row_input_format_params(params.input_creator_params.row_input_format_params), |
| 81 | format_settings(params.input_creator_params.settings), |
| 82 | input_processor_creator(params.input_processor_creator), |
| 83 | min_chunk_bytes(params.min_chunk_bytes), |
| 84 | original_buffer(params.read_buffer), |
| 85 | // Subtract one thread that we use for segmentation and one for |
| 86 | // reading. After that, must have at least two threads left for |
| 87 | // parsing. See the assertion below. |
| 88 | pool(std::max(2, params.max_threads - 2)), |
| 89 | file_segmentation_engine(params.file_segmentation_engine) |
| 90 | { |
| 91 | // See comment above. |
| 92 | assert(params.max_threads >= 4); |
| 93 | |
| 94 | // One unit for each thread, including segmentator and reader, plus a |
| 95 | // couple more units so that the segmentation thread doesn't spuriously |
| 96 | // bump into reader thread on wraparound. |
| 97 | processing_units.resize(params.max_threads + 2); |
| 98 | |
| 99 | segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); }); |
| 100 | } |
| 101 | |
| 102 | String getName() const override { return "ParallelParsing" ; } |
| 103 | |
| 104 | ~ParallelParsingBlockInputStream() override |
| 105 | { |
| 106 | finishAndWait(); |
| 107 | } |
| 108 | |
| 109 | void cancel(bool kill) override |
| 110 | { |
| 111 | /** |
| 112 | * Can be called multiple times, from different threads. Saturate the |
| 113 | * the kill flag with OR. |
| 114 | */ |
| 115 | if (kill) |
| 116 | is_killed = true; |
| 117 | is_cancelled = true; |
| 118 | |
| 119 | /* |
| 120 | * The format parsers themselves are not being cancelled here, so we'll |
| 121 | * have to wait until they process the current block. Given that the |
| 122 | * chunk size is on the order of megabytes, this should't be too long. |
| 123 | * We can't call IInputFormat->cancel here, because the parser object is |
| 124 | * local to the parser thread, and we don't want to introduce any |
| 125 | * synchronization between parser threads and the other threads to get |
| 126 | * better performance. An ideal solution would be to add a callback to |
| 127 | * IInputFormat that checks whether it was cancelled. |
| 128 | */ |
| 129 | |
| 130 | finishAndWait(); |
| 131 | } |
| 132 | |
| 133 | Block () const override |
| 134 | { |
| 135 | return header; |
| 136 | } |
| 137 | |
| 138 | protected: |
| 139 | //Reader routine |
| 140 | Block readImpl() override; |
| 141 | |
| 142 | const BlockMissingValues & getMissingValues() const override |
| 143 | { |
| 144 | return last_block_missing_values; |
| 145 | } |
| 146 | |
| 147 | private: |
| 148 | const Block ; |
| 149 | const RowInputFormatParams row_input_format_params; |
| 150 | const FormatSettings format_settings; |
| 151 | const InputProcessorCreator input_processor_creator; |
| 152 | |
| 153 | const size_t min_chunk_bytes; |
| 154 | |
| 155 | /* |
| 156 | * This is declared as atomic to avoid UB, because parser threads access it |
| 157 | * without synchronization. |
| 158 | */ |
| 159 | std::atomic<bool> finished{false}; |
| 160 | |
| 161 | BlockMissingValues last_block_missing_values; |
| 162 | |
| 163 | // Original ReadBuffer to read from. |
| 164 | ReadBuffer & original_buffer; |
| 165 | |
| 166 | //Non-atomic because it is used in one thread. |
| 167 | std::optional<size_t> next_block_in_current_unit; |
| 168 | size_t segmentator_ticket_number{0}; |
| 169 | size_t reader_ticket_number{0}; |
| 170 | |
| 171 | std::mutex mutex; |
| 172 | std::condition_variable reader_condvar; |
| 173 | std::condition_variable segmentator_condvar; |
| 174 | |
| 175 | // There are multiple "parsers", that's why we use thread pool. |
| 176 | ThreadPool pool; |
| 177 | // Reading and segmentating the file |
| 178 | ThreadFromGlobalPool segmentator_thread; |
| 179 | |
| 180 | // Function to segment the file. Then "parsers" will parse that segments. |
| 181 | FormatFactory::FileSegmentationEngine file_segmentation_engine; |
| 182 | |
| 183 | enum ProcessingUnitStatus |
| 184 | { |
| 185 | READY_TO_INSERT, |
| 186 | READY_TO_PARSE, |
| 187 | READY_TO_READ |
| 188 | }; |
| 189 | |
| 190 | struct BlockExt |
| 191 | { |
| 192 | std::vector<Block> block; |
| 193 | std::vector<BlockMissingValues> block_missing_values; |
| 194 | }; |
| 195 | |
| 196 | struct ProcessingUnit |
| 197 | { |
| 198 | explicit ProcessingUnit() |
| 199 | : status(ProcessingUnitStatus::READY_TO_INSERT) |
| 200 | { |
| 201 | } |
| 202 | |
| 203 | BlockExt block_ext; |
| 204 | Memory<> segment; |
| 205 | std::atomic<ProcessingUnitStatus> status; |
| 206 | bool is_last{false}; |
| 207 | }; |
| 208 | |
| 209 | std::exception_ptr background_exception = nullptr; |
| 210 | |
| 211 | // We use deque instead of vector, because it does not require a move |
| 212 | // constructor, which is absent for atomics that are inside ProcessingUnit. |
| 213 | std::deque<ProcessingUnit> processing_units; |
| 214 | |
| 215 | |
| 216 | void scheduleParserThreadForUnitWithNumber(size_t unit_number) |
| 217 | { |
| 218 | pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, unit_number)); |
| 219 | } |
| 220 | |
| 221 | void finishAndWait() |
| 222 | { |
| 223 | finished = true; |
| 224 | |
| 225 | { |
| 226 | std::unique_lock<std::mutex> lock(mutex); |
| 227 | segmentator_condvar.notify_all(); |
| 228 | reader_condvar.notify_all(); |
| 229 | } |
| 230 | |
| 231 | if (segmentator_thread.joinable()) |
| 232 | segmentator_thread.join(); |
| 233 | |
| 234 | try |
| 235 | { |
| 236 | pool.wait(); |
| 237 | } |
| 238 | catch (...) |
| 239 | { |
| 240 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 241 | } |
| 242 | } |
| 243 | |
| 244 | void segmentatorThreadFunction(); |
| 245 | void parserThreadFunction(size_t bucket_num); |
| 246 | |
| 247 | // Save/log a background exception, set termination flag, wake up all |
| 248 | // threads. This function is used by segmentator and parsed threads. |
| 249 | // readImpl() is called from the main thread, so the exception handling |
| 250 | // is different. |
| 251 | void onBackgroundException(); |
| 252 | }; |
| 253 | |
| 254 | } |
| 255 | |