1 | #pragma once |
2 | |
3 | #include <DataStreams/IBlockInputStream.h> |
4 | #include <Formats/FormatFactory.h> |
5 | #include <Common/ThreadPool.h> |
6 | #include <Common/setThreadName.h> |
7 | #include <IO/BufferWithOwnMemory.h> |
8 | #include <IO/ReadBuffer.h> |
9 | #include <Processors/Formats/IRowInputFormat.h> |
10 | #include <Processors/Formats/InputStreamFromInputFormat.h> |
11 | #include <Interpreters/Context.h> |
12 | |
13 | namespace DB |
14 | { |
15 | |
16 | /** |
17 | * ORDER-PRESERVING parallel parsing of data formats. |
18 | * It splits original data into chunks. Then each chunk is parsed by different thread. |
19 | * The number of chunks is equal to the number of parser threads. |
20 | * The size of chunk is equal to min_chunk_bytes_for_parallel_parsing setting. |
21 | * |
22 | * This stream has three kinds of threads: one segmentator, multiple parsers, |
23 | * and one reader thread -- that is, the one from which readImpl() is called. |
24 | * They operate one after another on parts of data called "processing units". |
25 | * One unit consists of buffer with raw data from file, filled by segmentator |
26 | * thread. This raw data is then parsed by a parser thread to form a number of |
27 | * Blocks. These Blocks are returned to the parent stream from readImpl(). |
28 | * After being read out, a processing unit is reused, to save on allocating |
29 | * memory for the raw buffer. The processing units are organized into a circular |
30 | * array to facilitate reuse and to apply backpressure on the segmentator thread |
31 | * -- after it runs out of processing units, it has to wait for the reader to |
32 | * read out the previous blocks. |
33 | * The outline of what the threads do is as follows: |
34 | * segmentator thread: |
35 | * 1) wait for the next processing unit to become empty |
36 | * 2) fill it with a part of input file |
37 | * 3) start a parser thread |
38 | * 4) repeat until eof |
39 | * parser thread: |
40 | * 1) parse the given raw buffer without any synchronization |
41 | * 2) signal that the given unit is ready to read |
42 | * 3) finish |
43 | * readImpl(): |
44 | * 1) wait for the next processing unit to become ready to read |
45 | * 2) take the blocks from the processing unit to return them to the caller |
46 | * 3) signal that the processing unit is empty |
47 | * 4) repeat until it encounters a unit that is marked as "past_the_end" |
48 | * All threads must also check for cancel/eof/exception flags. |
49 | */ |
50 | class ParallelParsingBlockInputStream : public IBlockInputStream |
51 | { |
52 | private: |
53 | using ReadCallback = std::function<void()>; |
54 | |
55 | using InputProcessorCreator = std::function<InputFormatPtr( |
56 | ReadBuffer & buf, |
57 | const Block & , |
58 | const RowInputFormatParams & params, |
59 | const FormatSettings & settings)>; |
60 | public: |
61 | struct InputCreatorParams |
62 | { |
63 | const Block & sample; |
64 | const RowInputFormatParams & row_input_format_params; |
65 | const FormatSettings &settings; |
66 | }; |
67 | |
68 | struct Params |
69 | { |
70 | ReadBuffer & read_buffer; |
71 | const InputProcessorCreator & input_processor_creator; |
72 | const InputCreatorParams & input_creator_params; |
73 | FormatFactory::FileSegmentationEngine file_segmentation_engine; |
74 | int max_threads; |
75 | size_t min_chunk_bytes; |
76 | }; |
77 | |
78 | explicit ParallelParsingBlockInputStream(const Params & params) |
79 | : header(params.input_creator_params.sample), |
80 | row_input_format_params(params.input_creator_params.row_input_format_params), |
81 | format_settings(params.input_creator_params.settings), |
82 | input_processor_creator(params.input_processor_creator), |
83 | min_chunk_bytes(params.min_chunk_bytes), |
84 | original_buffer(params.read_buffer), |
85 | // Subtract one thread that we use for segmentation and one for |
86 | // reading. After that, must have at least two threads left for |
87 | // parsing. See the assertion below. |
88 | pool(std::max(2, params.max_threads - 2)), |
89 | file_segmentation_engine(params.file_segmentation_engine) |
90 | { |
91 | // See comment above. |
92 | assert(params.max_threads >= 4); |
93 | |
94 | // One unit for each thread, including segmentator and reader, plus a |
95 | // couple more units so that the segmentation thread doesn't spuriously |
96 | // bump into reader thread on wraparound. |
97 | processing_units.resize(params.max_threads + 2); |
98 | |
99 | segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); }); |
100 | } |
101 | |
102 | String getName() const override { return "ParallelParsing" ; } |
103 | |
104 | ~ParallelParsingBlockInputStream() override |
105 | { |
106 | finishAndWait(); |
107 | } |
108 | |
109 | void cancel(bool kill) override |
110 | { |
111 | /** |
112 | * Can be called multiple times, from different threads. Saturate the |
113 | * the kill flag with OR. |
114 | */ |
115 | if (kill) |
116 | is_killed = true; |
117 | is_cancelled = true; |
118 | |
119 | /* |
120 | * The format parsers themselves are not being cancelled here, so we'll |
121 | * have to wait until they process the current block. Given that the |
122 | * chunk size is on the order of megabytes, this should't be too long. |
123 | * We can't call IInputFormat->cancel here, because the parser object is |
124 | * local to the parser thread, and we don't want to introduce any |
125 | * synchronization between parser threads and the other threads to get |
126 | * better performance. An ideal solution would be to add a callback to |
127 | * IInputFormat that checks whether it was cancelled. |
128 | */ |
129 | |
130 | finishAndWait(); |
131 | } |
132 | |
133 | Block () const override |
134 | { |
135 | return header; |
136 | } |
137 | |
138 | protected: |
139 | //Reader routine |
140 | Block readImpl() override; |
141 | |
142 | const BlockMissingValues & getMissingValues() const override |
143 | { |
144 | return last_block_missing_values; |
145 | } |
146 | |
147 | private: |
148 | const Block ; |
149 | const RowInputFormatParams row_input_format_params; |
150 | const FormatSettings format_settings; |
151 | const InputProcessorCreator input_processor_creator; |
152 | |
153 | const size_t min_chunk_bytes; |
154 | |
155 | /* |
156 | * This is declared as atomic to avoid UB, because parser threads access it |
157 | * without synchronization. |
158 | */ |
159 | std::atomic<bool> finished{false}; |
160 | |
161 | BlockMissingValues last_block_missing_values; |
162 | |
163 | // Original ReadBuffer to read from. |
164 | ReadBuffer & original_buffer; |
165 | |
166 | //Non-atomic because it is used in one thread. |
167 | std::optional<size_t> next_block_in_current_unit; |
168 | size_t segmentator_ticket_number{0}; |
169 | size_t reader_ticket_number{0}; |
170 | |
171 | std::mutex mutex; |
172 | std::condition_variable reader_condvar; |
173 | std::condition_variable segmentator_condvar; |
174 | |
175 | // There are multiple "parsers", that's why we use thread pool. |
176 | ThreadPool pool; |
177 | // Reading and segmentating the file |
178 | ThreadFromGlobalPool segmentator_thread; |
179 | |
180 | // Function to segment the file. Then "parsers" will parse that segments. |
181 | FormatFactory::FileSegmentationEngine file_segmentation_engine; |
182 | |
183 | enum ProcessingUnitStatus |
184 | { |
185 | READY_TO_INSERT, |
186 | READY_TO_PARSE, |
187 | READY_TO_READ |
188 | }; |
189 | |
190 | struct BlockExt |
191 | { |
192 | std::vector<Block> block; |
193 | std::vector<BlockMissingValues> block_missing_values; |
194 | }; |
195 | |
196 | struct ProcessingUnit |
197 | { |
198 | explicit ProcessingUnit() |
199 | : status(ProcessingUnitStatus::READY_TO_INSERT) |
200 | { |
201 | } |
202 | |
203 | BlockExt block_ext; |
204 | Memory<> segment; |
205 | std::atomic<ProcessingUnitStatus> status; |
206 | bool is_last{false}; |
207 | }; |
208 | |
209 | std::exception_ptr background_exception = nullptr; |
210 | |
211 | // We use deque instead of vector, because it does not require a move |
212 | // constructor, which is absent for atomics that are inside ProcessingUnit. |
213 | std::deque<ProcessingUnit> processing_units; |
214 | |
215 | |
216 | void scheduleParserThreadForUnitWithNumber(size_t unit_number) |
217 | { |
218 | pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, unit_number)); |
219 | } |
220 | |
221 | void finishAndWait() |
222 | { |
223 | finished = true; |
224 | |
225 | { |
226 | std::unique_lock<std::mutex> lock(mutex); |
227 | segmentator_condvar.notify_all(); |
228 | reader_condvar.notify_all(); |
229 | } |
230 | |
231 | if (segmentator_thread.joinable()) |
232 | segmentator_thread.join(); |
233 | |
234 | try |
235 | { |
236 | pool.wait(); |
237 | } |
238 | catch (...) |
239 | { |
240 | tryLogCurrentException(__PRETTY_FUNCTION__); |
241 | } |
242 | } |
243 | |
244 | void segmentatorThreadFunction(); |
245 | void parserThreadFunction(size_t bucket_num); |
246 | |
247 | // Save/log a background exception, set termination flag, wake up all |
248 | // threads. This function is used by segmentator and parsed threads. |
249 | // readImpl() is called from the main thread, so the exception handling |
250 | // is different. |
251 | void onBackgroundException(); |
252 | }; |
253 | |
254 | } |
255 | |