#include <DataStreams/ParallelParsingBlockInputStream.h>

#include <utility>
2
3namespace DB
4{
5
6void ParallelParsingBlockInputStream::segmentatorThreadFunction()
7{
8 setThreadName("Segmentator");
9 try
10 {
11 while (!finished)
12 {
13 const auto current_unit_number = segmentator_ticket_number % processing_units.size();
14 auto & unit = processing_units[current_unit_number];
15
16 {
17 std::unique_lock<std::mutex> lock(mutex);
18 segmentator_condvar.wait(lock,
19 [&]{ return unit.status == READY_TO_INSERT || finished; });
20 }
21
22 if (finished)
23 {
24 break;
25 }
26
27 assert(unit.status == READY_TO_INSERT);
28
29 // Segmentating the original input.
30 unit.segment.resize(0);
31
32 const bool have_more_data = file_segmentation_engine(original_buffer,
33 unit.segment, min_chunk_bytes);
34
35 unit.is_last = !have_more_data;
36 unit.status = READY_TO_PARSE;
37 scheduleParserThreadForUnitWithNumber(current_unit_number);
38 ++segmentator_ticket_number;
39
40 if (!have_more_data)
41 {
42 break;
43 }
44 }
45 }
46 catch (...)
47 {
48 onBackgroundException();
49 }
50}
51
52void ParallelParsingBlockInputStream::parserThreadFunction(size_t current_unit_number)
53{
54 try
55 {
56 setThreadName("ChunkParser");
57
58 auto & unit = processing_units[current_unit_number];
59
60 /*
61 * This is kind of suspicious -- the input_process_creator contract with
62 * respect to multithreaded use is not clear, but we hope that it is
63 * just a 'normal' factory class that doesn't have any state, and so we
64 * can use it from multiple threads simultaneously.
65 */
66 ReadBuffer read_buffer(unit.segment.data(), unit.segment.size(), 0);
67 auto parser = std::make_unique<InputStreamFromInputFormat>(
68 input_processor_creator(read_buffer, header,
69 row_input_format_params, format_settings));
70
71 unit.block_ext.block.clear();
72 unit.block_ext.block_missing_values.clear();
73
74 // We don't know how many blocks will be. So we have to read them all
75 // until an empty block occured.
76 Block block;
77 while (!finished && (block = parser->read()) != Block())
78 {
79 unit.block_ext.block.emplace_back(block);
80 unit.block_ext.block_missing_values.emplace_back(parser->getMissingValues());
81 }
82
83 // We suppose we will get at least some blocks for a non-empty buffer,
84 // except at the end of file. Also see a matching assert in readImpl().
85 assert(unit.is_last || unit.block_ext.block.size() > 0);
86
87 std::unique_lock<std::mutex> lock(mutex);
88 unit.status = READY_TO_READ;
89 reader_condvar.notify_all();
90 }
91 catch (...)
92 {
93 onBackgroundException();
94 }
95}
96
97void ParallelParsingBlockInputStream::onBackgroundException()
98{
99 tryLogCurrentException(__PRETTY_FUNCTION__);
100
101 std::unique_lock<std::mutex> lock(mutex);
102 if (!background_exception)
103 {
104 background_exception = std::current_exception();
105 }
106 finished = true;
107 reader_condvar.notify_all();
108 segmentator_condvar.notify_all();
109}
110
111Block ParallelParsingBlockInputStream::readImpl()
112{
113 if (isCancelledOrThrowIfKilled() || finished)
114 {
115 /**
116 * Check for background exception and rethrow it before we return.
117 */
118 std::unique_lock<std::mutex> lock(mutex);
119 if (background_exception)
120 {
121 lock.unlock();
122 cancel(false);
123 std::rethrow_exception(background_exception);
124 }
125
126 return Block{};
127 }
128
129 const auto current_unit_number = reader_ticket_number % processing_units.size();
130 auto & unit = processing_units[current_unit_number];
131
132 if (!next_block_in_current_unit.has_value())
133 {
134 // We have read out all the Blocks from the previous Processing Unit,
135 // wait for the current one to become ready.
136 std::unique_lock<std::mutex> lock(mutex);
137 reader_condvar.wait(lock, [&](){ return unit.status == READY_TO_READ || finished; });
138
139 if (finished)
140 {
141 /**
142 * Check for background exception and rethrow it before we return.
143 */
144 if (background_exception)
145 {
146 lock.unlock();
147 cancel(false);
148 std::rethrow_exception(background_exception);
149 }
150
151 return Block{};
152 }
153
154 assert(unit.status == READY_TO_READ);
155 next_block_in_current_unit = 0;
156 }
157
158 if (unit.block_ext.block.size() == 0)
159 {
160 /*
161 * Can we get zero blocks for an entire segment, when the format parser
162 * skips it entire content and does not create any blocks? Probably not,
163 * but if we ever do, we should add a loop around the above if, to skip
164 * these. Also see a matching assert in the parser thread.
165 */
166 assert(unit.is_last);
167 finished = true;
168 return Block{};
169 }
170
171 assert(next_block_in_current_unit.value() < unit.block_ext.block.size());
172
173 Block res = std::move(unit.block_ext.block.at(*next_block_in_current_unit));
174 last_block_missing_values = std::move(unit.block_ext.block_missing_values[*next_block_in_current_unit]);
175
176 next_block_in_current_unit.value() += 1;
177
178 if (*next_block_in_current_unit == unit.block_ext.block.size())
179 {
180 // Finished reading this Processing Unit, move to the next one.
181 next_block_in_current_unit.reset();
182 ++reader_ticket_number;
183
184 if (unit.is_last)
185 {
186 // It it was the last unit, we're finished.
187 finished = true;
188 }
189 else
190 {
191 // Pass the unit back to the segmentator.
192 std::unique_lock<std::mutex> lock(mutex);
193 unit.status = READY_TO_INSERT;
194 segmentator_condvar.notify_all();
195 }
196 }
197
198 return res;
199}
200
201
202}
203