1 | #include <DataStreams/ParallelParsingBlockInputStream.h> |
2 | |
3 | namespace DB |
4 | { |
5 | |
6 | void ParallelParsingBlockInputStream::segmentatorThreadFunction() |
7 | { |
8 | setThreadName("Segmentator" ); |
9 | try |
10 | { |
11 | while (!finished) |
12 | { |
13 | const auto current_unit_number = segmentator_ticket_number % processing_units.size(); |
14 | auto & unit = processing_units[current_unit_number]; |
15 | |
16 | { |
17 | std::unique_lock<std::mutex> lock(mutex); |
18 | segmentator_condvar.wait(lock, |
19 | [&]{ return unit.status == READY_TO_INSERT || finished; }); |
20 | } |
21 | |
22 | if (finished) |
23 | { |
24 | break; |
25 | } |
26 | |
27 | assert(unit.status == READY_TO_INSERT); |
28 | |
29 | // Segmentating the original input. |
30 | unit.segment.resize(0); |
31 | |
32 | const bool have_more_data = file_segmentation_engine(original_buffer, |
33 | unit.segment, min_chunk_bytes); |
34 | |
35 | unit.is_last = !have_more_data; |
36 | unit.status = READY_TO_PARSE; |
37 | scheduleParserThreadForUnitWithNumber(current_unit_number); |
38 | ++segmentator_ticket_number; |
39 | |
40 | if (!have_more_data) |
41 | { |
42 | break; |
43 | } |
44 | } |
45 | } |
46 | catch (...) |
47 | { |
48 | onBackgroundException(); |
49 | } |
50 | } |
51 | |
52 | void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number) |
53 | { |
54 | try |
55 | { |
56 | setThreadName("ChunkParser" ); |
57 | |
58 | auto & unit = processing_units[current_unit_number]; |
59 | |
60 | /* |
61 | * This is kind of suspicious -- the input_process_creator contract with |
62 | * respect to multithreaded use is not clear, but we hope that it is |
63 | * just a 'normal' factory class that doesn't have any state, and so we |
64 | * can use it from multiple threads simultaneously. |
65 | */ |
66 | ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0); |
67 | auto parser = std::make_unique<InputStreamFromInputFormat>( |
68 | input_processor_creator(read_buffer, header, |
69 | row_input_format_params, format_settings)); |
70 | |
71 | unit.block_ext.block.clear(); |
72 | unit.block_ext.block_missing_values.clear(); |
73 | |
74 | // We don't know how many blocks will be. So we have to read them all |
75 | // until an empty block occured. |
76 | Block block; |
77 | while (!finished && (block = parser->read()) != Block()) |
78 | { |
79 | unit.block_ext.block.emplace_back(block); |
80 | unit.block_ext.block_missing_values.emplace_back(parser->getMissingValues()); |
81 | } |
82 | |
83 | // We suppose we will get at least some blocks for a non-empty buffer, |
84 | // except at the end of file. Also see a matching assert in readImpl(). |
85 | assert(unit.is_last || unit.block_ext.block.size() > 0); |
86 | |
87 | std::unique_lock<std::mutex> lock(mutex); |
88 | unit.status = READY_TO_READ; |
89 | reader_condvar.notify_all(); |
90 | } |
91 | catch (...) |
92 | { |
93 | onBackgroundException(); |
94 | } |
95 | } |
96 | |
97 | void ParallelParsingBlockInputStream::onBackgroundException() |
98 | { |
99 | tryLogCurrentException(__PRETTY_FUNCTION__); |
100 | |
101 | std::unique_lock<std::mutex> lock(mutex); |
102 | if (!background_exception) |
103 | { |
104 | background_exception = std::current_exception(); |
105 | } |
106 | finished = true; |
107 | reader_condvar.notify_all(); |
108 | segmentator_condvar.notify_all(); |
109 | } |
110 | |
111 | Block ParallelParsingBlockInputStream::readImpl() |
112 | { |
113 | if (isCancelledOrThrowIfKilled() || finished) |
114 | { |
115 | /** |
116 | * Check for background exception and rethrow it before we return. |
117 | */ |
118 | std::unique_lock<std::mutex> lock(mutex); |
119 | if (background_exception) |
120 | { |
121 | lock.unlock(); |
122 | cancel(false); |
123 | std::rethrow_exception(background_exception); |
124 | } |
125 | |
126 | return Block{}; |
127 | } |
128 | |
129 | const auto current_unit_number = reader_ticket_number % processing_units.size(); |
130 | auto & unit = processing_units[current_unit_number]; |
131 | |
132 | if (!next_block_in_current_unit.has_value()) |
133 | { |
134 | // We have read out all the Blocks from the previous Processing Unit, |
135 | // wait for the current one to become ready. |
136 | std::unique_lock<std::mutex> lock(mutex); |
137 | reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || finished; }); |
138 | |
139 | if (finished) |
140 | { |
141 | /** |
142 | * Check for background exception and rethrow it before we return. |
143 | */ |
144 | if (background_exception) |
145 | { |
146 | lock.unlock(); |
147 | cancel(false); |
148 | std::rethrow_exception(background_exception); |
149 | } |
150 | |
151 | return Block{}; |
152 | } |
153 | |
154 | assert(unit.status == READY_TO_READ); |
155 | next_block_in_current_unit = 0; |
156 | } |
157 | |
158 | if (unit.block_ext.block.size() == 0) |
159 | { |
160 | /* |
161 | * Can we get zero blocks for an entire segment, when the format parser |
162 | * skips it entire content and does not create any blocks? Probably not, |
163 | * but if we ever do, we should add a loop around the above if, to skip |
164 | * these. Also see a matching assert in the parser thread. |
165 | */ |
166 | assert(unit.is_last); |
167 | finished = true; |
168 | return Block{}; |
169 | } |
170 | |
171 | assert(next_block_in_current_unit.value() < unit.block_ext.block.size()); |
172 | |
173 | Block res = std::move(unit.block_ext.block.at(*next_block_in_current_unit)); |
174 | last_block_missing_values = std::move(unit.block_ext.block_missing_values[*next_block_in_current_unit]); |
175 | |
176 | next_block_in_current_unit.value() += 1; |
177 | |
178 | if (*next_block_in_current_unit == unit.block_ext.block.size()) |
179 | { |
180 | // Finished reading this Processing Unit, move to the next one. |
181 | next_block_in_current_unit.reset(); |
182 | ++reader_ticket_number; |
183 | |
184 | if (unit.is_last) |
185 | { |
186 | // It it was the last unit, we're finished. |
187 | finished = true; |
188 | } |
189 | else |
190 | { |
191 | // Pass the unit back to the segmentator. |
192 | std::unique_lock<std::mutex> lock(mutex); |
193 | unit.status = READY_TO_INSERT; |
194 | segmentator_condvar.notify_all(); |
195 | } |
196 | } |
197 | |
198 | return res; |
199 | } |
200 | |
201 | |
202 | } |
203 | |