1#pragma once
2
3#include <DataStreams/IBlockInputStream.h>
4#include <Formats/FormatFactory.h>
5#include <Common/ThreadPool.h>
6#include <Common/setThreadName.h>
7#include <IO/BufferWithOwnMemory.h>
8#include <IO/ReadBuffer.h>
9#include <Processors/Formats/IRowInputFormat.h>
10#include <Processors/Formats/InputStreamFromInputFormat.h>
11#include <Interpreters/Context.h>
12
13namespace DB
14{
15
/**
 * ORDER-PRESERVING parallel parsing of data formats.
 * It splits the original data into chunks, and each chunk is parsed by a
 * separate parser thread. The number of chunks in flight at the same time is
 * limited by the number of processing units, which is derived from the number
 * of threads (see the constructor).
 * The size of a chunk is controlled by the min_chunk_bytes_for_parallel_parsing setting.
 *
 * This stream has three kinds of threads: one segmentator, multiple parsers,
 * and one reader thread -- that is, the one from which readImpl() is called.
 * They operate one after another on parts of data called "processing units".
 * Each unit holds a buffer of raw data from the file, filled by the segmentator
 * thread. This raw data is then parsed by a parser thread into a number of
 * Blocks. These Blocks are returned to the parent stream from readImpl().
 * After being read out, a processing unit is reused, to avoid reallocating
 * memory for the raw buffer. The processing units are organized into a circular
 * array to facilitate reuse and to apply backpressure on the segmentator thread
 * -- after it runs out of processing units, it has to wait for the reader to
 * read out the previous blocks.
 * The outline of what the threads do is as follows:
 * segmentator thread:
 * 1) wait for the next processing unit to become empty
 * 2) fill it with a part of the input file
 * 3) schedule a parser task for that unit on the thread pool
 * 4) repeat until eof
 * parser thread:
 * 1) parse the given raw buffer without any synchronization
 * 2) signal that the given unit is ready to read
 * 3) finish
 * readImpl():
 * 1) wait for the next processing unit to become ready to read
 * 2) take the blocks from the processing unit to return them to the caller
 * 3) signal that the processing unit is empty
 * 4) repeat until it encounters a unit that is marked as "past_the_end"
 * All threads must also check for cancel/eof/exception flags.
 */
50class ParallelParsingBlockInputStream : public IBlockInputStream
51{
52private:
53 using ReadCallback = std::function<void()>;
54
55 using InputProcessorCreator = std::function<InputFormatPtr(
56 ReadBuffer & buf,
57 const Block & header,
58 const RowInputFormatParams & params,
59 const FormatSettings & settings)>;
60public:
61 struct InputCreatorParams
62 {
63 const Block & sample;
64 const RowInputFormatParams & row_input_format_params;
65 const FormatSettings &settings;
66 };
67
68 struct Params
69 {
70 ReadBuffer & read_buffer;
71 const InputProcessorCreator & input_processor_creator;
72 const InputCreatorParams & input_creator_params;
73 FormatFactory::FileSegmentationEngine file_segmentation_engine;
74 int max_threads;
75 size_t min_chunk_bytes;
76 };
77
78 explicit ParallelParsingBlockInputStream(const Params & params)
79 : header(params.input_creator_params.sample),
80 row_input_format_params(params.input_creator_params.row_input_format_params),
81 format_settings(params.input_creator_params.settings),
82 input_processor_creator(params.input_processor_creator),
83 min_chunk_bytes(params.min_chunk_bytes),
84 original_buffer(params.read_buffer),
85 // Subtract one thread that we use for segmentation and one for
86 // reading. After that, must have at least two threads left for
87 // parsing. See the assertion below.
88 pool(std::max(2, params.max_threads - 2)),
89 file_segmentation_engine(params.file_segmentation_engine)
90 {
91 // See comment above.
92 assert(params.max_threads >= 4);
93
94 // One unit for each thread, including segmentator and reader, plus a
95 // couple more units so that the segmentation thread doesn't spuriously
96 // bump into reader thread on wraparound.
97 processing_units.resize(params.max_threads + 2);
98
99 segmentator_thread = ThreadFromGlobalPool([this] { segmentatorThreadFunction(); });
100 }
101
102 String getName() const override { return "ParallelParsing"; }
103
104 ~ParallelParsingBlockInputStream() override
105 {
106 finishAndWait();
107 }
108
109 void cancel(bool kill) override
110 {
111 /**
112 * Can be called multiple times, from different threads. Saturate the
113 * the kill flag with OR.
114 */
115 if (kill)
116 is_killed = true;
117 is_cancelled = true;
118
119 /*
120 * The format parsers themselves are not being cancelled here, so we'll
121 * have to wait until they process the current block. Given that the
122 * chunk size is on the order of megabytes, this should't be too long.
123 * We can't call IInputFormat->cancel here, because the parser object is
124 * local to the parser thread, and we don't want to introduce any
125 * synchronization between parser threads and the other threads to get
126 * better performance. An ideal solution would be to add a callback to
127 * IInputFormat that checks whether it was cancelled.
128 */
129
130 finishAndWait();
131 }
132
133 Block getHeader() const override
134 {
135 return header;
136 }
137
138protected:
139 //Reader routine
140 Block readImpl() override;
141
142 const BlockMissingValues & getMissingValues() const override
143 {
144 return last_block_missing_values;
145 }
146
147private:
148 const Block header;
149 const RowInputFormatParams row_input_format_params;
150 const FormatSettings format_settings;
151 const InputProcessorCreator input_processor_creator;
152
153 const size_t min_chunk_bytes;
154
155 /*
156 * This is declared as atomic to avoid UB, because parser threads access it
157 * without synchronization.
158 */
159 std::atomic<bool> finished{false};
160
161 BlockMissingValues last_block_missing_values;
162
163 // Original ReadBuffer to read from.
164 ReadBuffer & original_buffer;
165
166 //Non-atomic because it is used in one thread.
167 std::optional<size_t> next_block_in_current_unit;
168 size_t segmentator_ticket_number{0};
169 size_t reader_ticket_number{0};
170
171 std::mutex mutex;
172 std::condition_variable reader_condvar;
173 std::condition_variable segmentator_condvar;
174
175 // There are multiple "parsers", that's why we use thread pool.
176 ThreadPool pool;
177 // Reading and segmentating the file
178 ThreadFromGlobalPool segmentator_thread;
179
180 // Function to segment the file. Then "parsers" will parse that segments.
181 FormatFactory::FileSegmentationEngine file_segmentation_engine;
182
183 enum ProcessingUnitStatus
184 {
185 READY_TO_INSERT,
186 READY_TO_PARSE,
187 READY_TO_READ
188 };
189
190 struct BlockExt
191 {
192 std::vector<Block> block;
193 std::vector<BlockMissingValues> block_missing_values;
194 };
195
196 struct ProcessingUnit
197 {
198 explicit ProcessingUnit()
199 : status(ProcessingUnitStatus::READY_TO_INSERT)
200 {
201 }
202
203 BlockExt block_ext;
204 Memory<> segment;
205 std::atomic<ProcessingUnitStatus> status;
206 bool is_last{false};
207 };
208
209 std::exception_ptr background_exception = nullptr;
210
211 // We use deque instead of vector, because it does not require a move
212 // constructor, which is absent for atomics that are inside ProcessingUnit.
213 std::deque<ProcessingUnit> processing_units;
214
215
216 void scheduleParserThreadForUnitWithNumber(size_t unit_number)
217 {
218 pool.scheduleOrThrowOnError(std::bind(&ParallelParsingBlockInputStream::parserThreadFunction, this, unit_number));
219 }
220
221 void finishAndWait()
222 {
223 finished = true;
224
225 {
226 std::unique_lock<std::mutex> lock(mutex);
227 segmentator_condvar.notify_all();
228 reader_condvar.notify_all();
229 }
230
231 if (segmentator_thread.joinable())
232 segmentator_thread.join();
233
234 try
235 {
236 pool.wait();
237 }
238 catch (...)
239 {
240 tryLogCurrentException(__PRETTY_FUNCTION__);
241 }
242 }
243
244 void segmentatorThreadFunction();
245 void parserThreadFunction(size_t bucket_num);
246
247 // Save/log a background exception, set termination flag, wake up all
248 // threads. This function is used by segmentator and parsed threads.
249 // readImpl() is called from the main thread, so the exception handling
250 // is different.
251 void onBackgroundException();
252};
253
254}
255