1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/csv/reader.h" |
19 | |
20 | #include <cstdint> |
21 | #include <cstring> |
22 | #include <limits> |
23 | #include <memory> |
24 | #include <sstream> |
25 | #include <string> |
26 | #include <unordered_map> |
27 | #include <utility> |
28 | #include <vector> |
29 | |
30 | #include "arrow/buffer.h" |
31 | #include "arrow/csv/chunker.h" |
32 | #include "arrow/csv/column-builder.h" |
33 | #include "arrow/csv/options.h" |
34 | #include "arrow/csv/parser.h" |
35 | #include "arrow/io/readahead.h" |
36 | #include "arrow/status.h" |
37 | #include "arrow/table.h" |
38 | #include "arrow/type.h" |
39 | #include "arrow/util/logging.h" |
40 | #include "arrow/util/macros.h" |
41 | #include "arrow/util/task-group.h" |
42 | #include "arrow/util/thread-pool.h" |
43 | |
44 | namespace arrow { |
45 | |
46 | class MemoryPool; |
47 | |
48 | namespace io { |
49 | |
50 | class InputStream; |
51 | |
52 | } // namespace io |
53 | |
54 | namespace csv { |
55 | |
using internal::GetCpuThreadPool;
using internal::ThreadPool;
using io::internal::ReadaheadBuffer;
using io::internal::ReadaheadSpooler;

// Default padding requested around each readahead block.  The left padding
// lets a block be extended in place with trailing bytes from the previous
// block (see BaseTableReader::ReadNextBlock); the right padding gives the
// parser some zeroed slack past the end of the data.
static constexpr int64_t kDefaultLeftPadding = 2048;  // 2 kB
static constexpr int64_t kDefaultRightPadding = 16;
63 | |
64 | ///////////////////////////////////////////////////////////////////////// |
65 | // Base class for common functionality |
66 | |
67 | class BaseTableReader : public csv::TableReader { |
68 | public: |
69 | BaseTableReader(MemoryPool* pool, const ReadOptions& read_options, |
70 | const ParseOptions& parse_options, |
71 | const ConvertOptions& convert_options) |
72 | : pool_(pool), |
73 | read_options_(read_options), |
74 | parse_options_(parse_options), |
75 | convert_options_(convert_options) {} |
76 | |
77 | protected: |
78 | // Read a next data block, stitch it to trailing data |
79 | Status ReadNextBlock() { |
80 | bool trailing_data = cur_size_ > 0; |
81 | ReadaheadBuffer rh; |
82 | |
83 | if (trailing_data) { |
84 | if (readahead_->GetLeftPadding() < cur_size_) { |
85 | // Growth heuristic to try and ensure sufficient left padding |
86 | // in subsequent reads |
87 | readahead_->SetLeftPadding(cur_size_ * 3 / 2); |
88 | } |
89 | } |
90 | |
91 | RETURN_NOT_OK(readahead_->Read(&rh)); |
92 | if (!rh.buffer) { |
93 | // EOF, let caller finish with existing data |
94 | eof_ = true; |
95 | return Status::OK(); |
96 | } |
97 | |
98 | std::shared_ptr<Buffer> new_block = rh.buffer; |
99 | uint8_t* new_data = rh.buffer->mutable_data() + rh.left_padding; |
100 | int64_t new_size = rh.buffer->size() - rh.left_padding - rh.right_padding; |
101 | DCHECK_GT(new_size, 0); // ensured by ReadaheadSpooler |
102 | |
103 | if (trailing_cr_ && new_data[0] == '\n') { |
104 | // Skip '\r\n' line separator that started at the end of previous block |
105 | ++new_data; |
106 | --new_size; |
107 | } |
108 | trailing_cr_ = (new_data[new_size - 1] == '\r'); |
109 | |
110 | if (trailing_data) { |
111 | // Try to copy trailing data at the beginning of new block |
112 | if (cur_size_ <= rh.left_padding) { |
113 | // Can left-extend new block inside padding area |
114 | new_data -= cur_size_; |
115 | new_size += cur_size_; |
116 | std::memcpy(new_data, cur_data_, cur_size_); |
117 | } else { |
118 | // Need to allocate bigger block and concatenate trailing + present data |
119 | RETURN_NOT_OK( |
120 | AllocateBuffer(pool_, cur_size_ + new_size + rh.right_padding, &new_block)); |
121 | std::memcpy(new_block->mutable_data(), cur_data_, cur_size_); |
122 | std::memcpy(new_block->mutable_data() + cur_size_, new_data, new_size); |
123 | std::memset(new_block->mutable_data() + cur_size_ + new_size, 0, |
124 | rh.right_padding); |
125 | new_data = new_block->mutable_data(); |
126 | new_size = cur_size_ + new_size; |
127 | } |
128 | } |
129 | cur_block_ = new_block; |
130 | cur_data_ = new_data; |
131 | cur_size_ = new_size; |
132 | return Status::OK(); |
133 | } |
134 | |
135 | // Read header and column names from current block, create column builders |
136 | Status () { |
137 | DCHECK_GT(cur_size_, 0); |
138 | if (parse_options_.header_rows == 0) { |
139 | // TODO allow passing names and/or generate column numbers? |
140 | return Status::Invalid("header_rows == 0 needs explicit column names" ); |
141 | } |
142 | |
143 | BlockParser parser(pool_, parse_options_, num_cols_, parse_options_.header_rows); |
144 | |
145 | uint32_t parsed_size = 0; |
146 | RETURN_NOT_OK(parser.Parse(reinterpret_cast<const char*>(cur_data_), |
147 | static_cast<uint32_t>(cur_size_), &parsed_size)); |
148 | if (parser.num_rows() != parse_options_.header_rows) { |
149 | return Status::Invalid( |
150 | "Could not read header rows from CSV file, either " |
151 | "file is too short or header is larger than block size" ); |
152 | } |
153 | if (parser.num_cols() == 0) { |
154 | return Status::Invalid("No columns in CSV file" ); |
155 | } |
156 | num_cols_ = parser.num_cols(); |
157 | DCHECK_GT(num_cols_, 0); |
158 | |
159 | for (int32_t col_index = 0; col_index < num_cols_; ++col_index) { |
160 | auto visit = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status { |
161 | DCHECK_EQ(column_names_.size(), static_cast<uint32_t>(col_index)); |
162 | column_names_.emplace_back(reinterpret_cast<const char*>(data), size); |
163 | return Status::OK(); |
164 | }; |
165 | RETURN_NOT_OK(parser.VisitColumn(col_index, visit)); |
166 | std::shared_ptr<ColumnBuilder> builder; |
167 | // Does the named column have a fixed type? |
168 | auto it = convert_options_.column_types.find(column_names_[col_index]); |
169 | if (it == convert_options_.column_types.end()) { |
170 | RETURN_NOT_OK( |
171 | ColumnBuilder::Make(col_index, convert_options_, task_group_, &builder)); |
172 | } else { |
173 | RETURN_NOT_OK(ColumnBuilder::Make(it->second, col_index, convert_options_, |
174 | task_group_, &builder)); |
175 | } |
176 | column_builders_.push_back(builder); |
177 | } |
178 | |
179 | // Skip parsed header rows |
180 | cur_data_ += parsed_size; |
181 | cur_size_ -= parsed_size; |
182 | return Status::OK(); |
183 | } |
184 | |
185 | // Trigger conversion of parsed block data |
186 | Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) { |
187 | for (auto& builder : column_builders_) { |
188 | builder->Insert(block_index, parser); |
189 | } |
190 | return Status::OK(); |
191 | } |
192 | |
193 | Status MakeTable(std::shared_ptr<Table>* out) { |
194 | DCHECK_GT(num_cols_, 0); |
195 | DCHECK_EQ(column_names_.size(), static_cast<uint32_t>(num_cols_)); |
196 | DCHECK_EQ(column_builders_.size(), static_cast<uint32_t>(num_cols_)); |
197 | |
198 | std::vector<std::shared_ptr<Field>> fields; |
199 | std::vector<std::shared_ptr<Column>> columns; |
200 | |
201 | for (int32_t i = 0; i < num_cols_; ++i) { |
202 | std::shared_ptr<ChunkedArray> array; |
203 | RETURN_NOT_OK(column_builders_[i]->Finish(&array)); |
204 | columns.push_back(std::make_shared<Column>(column_names_[i], array)); |
205 | fields.push_back(columns.back()->field()); |
206 | } |
207 | *out = Table::Make(schema(fields), columns); |
208 | return Status::OK(); |
209 | } |
210 | |
211 | MemoryPool* pool_; |
212 | ReadOptions read_options_; |
213 | ParseOptions parse_options_; |
214 | ConvertOptions convert_options_; |
215 | |
216 | int32_t num_cols_ = -1; |
217 | std::shared_ptr<ReadaheadSpooler> readahead_; |
218 | // Column names |
219 | std::vector<std::string> column_names_; |
220 | std::shared_ptr<internal::TaskGroup> task_group_; |
221 | std::vector<std::shared_ptr<ColumnBuilder>> column_builders_; |
222 | |
223 | // Current block and data pointer |
224 | std::shared_ptr<Buffer> cur_block_; |
225 | const uint8_t* cur_data_ = nullptr; |
226 | int64_t cur_size_ = 0; |
227 | // Index of current block inside data stream |
228 | int64_t cur_block_index_ = 0; |
229 | // Whether there was a trailing CR at the end of last parsed line |
230 | bool trailing_cr_ = false; |
231 | // Whether we reached input stream EOF. There may still be data left to |
232 | // process in current block. |
233 | bool eof_ = false; |
234 | }; |
235 | |
236 | ///////////////////////////////////////////////////////////////////////// |
237 | // Serial TableReader implementation |
238 | |
239 | class SerialTableReader : public BaseTableReader { |
240 | public: |
241 | SerialTableReader(MemoryPool* pool, std::shared_ptr<io::InputStream> input, |
242 | const ReadOptions& read_options, const ParseOptions& parse_options, |
243 | const ConvertOptions& convert_options) |
244 | : BaseTableReader(pool, read_options, parse_options, convert_options) { |
245 | // Since we're converting serially, no need to readahead more than one block |
246 | int32_t block_queue_size = 1; |
247 | readahead_ = std::make_shared<ReadaheadSpooler>( |
248 | pool_, input, read_options_.block_size, block_queue_size, kDefaultLeftPadding, |
249 | kDefaultRightPadding); |
250 | } |
251 | |
252 | Status Read(std::shared_ptr<Table>* out) { |
253 | task_group_ = internal::TaskGroup::MakeSerial(); |
254 | |
255 | // First block |
256 | RETURN_NOT_OK(ReadNextBlock()); |
257 | if (eof_) { |
258 | return Status::Invalid("Empty CSV file" ); |
259 | } |
260 | RETURN_NOT_OK(ProcessHeader()); |
261 | |
262 | static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max(); |
263 | auto parser = |
264 | std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows); |
265 | while (!eof_) { |
266 | // Consume current block |
267 | uint32_t parsed_size = 0; |
268 | RETURN_NOT_OK(parser->Parse(reinterpret_cast<const char*>(cur_data_), |
269 | static_cast<uint32_t>(cur_size_), &parsed_size)); |
270 | if (parser->num_rows() > 0) { |
271 | // Got some data |
272 | RETURN_NOT_OK(ProcessData(parser, cur_block_index_++)); |
273 | cur_data_ += parsed_size; |
274 | cur_size_ -= parsed_size; |
275 | if (!task_group_->ok()) { |
276 | // Conversion error => early exit |
277 | break; |
278 | } |
279 | } else { |
280 | // Need to fetch more data to get at least one row |
281 | RETURN_NOT_OK(ReadNextBlock()); |
282 | } |
283 | } |
284 | if (eof_ && cur_size_ > 0) { |
285 | // Parse remaining data |
286 | uint32_t parsed_size = 0; |
287 | RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast<const char*>(cur_data_), |
288 | static_cast<uint32_t>(cur_size_), &parsed_size)); |
289 | if (parser->num_rows() > 0) { |
290 | RETURN_NOT_OK(ProcessData(parser, cur_block_index_++)); |
291 | } |
292 | } |
293 | |
294 | // Finish conversion, create schema and table |
295 | RETURN_NOT_OK(task_group_->Finish()); |
296 | return MakeTable(out); |
297 | } |
298 | }; |
299 | |
300 | ///////////////////////////////////////////////////////////////////////// |
301 | // Parallel TableReader implementation |
302 | |
// TableReader implementation that chunks the input serially but parses and
// converts each chunk of rows as a task on a thread pool.
class ThreadedTableReader : public BaseTableReader {
 public:
  ThreadedTableReader(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
                      ThreadPool* thread_pool, const ReadOptions& read_options,
                      const ParseOptions& parse_options,
                      const ConvertOptions& convert_options)
      : BaseTableReader(pool, read_options, parse_options, convert_options),
        thread_pool_(thread_pool) {
    // Readahead one block per worker thread
    int32_t block_queue_size = thread_pool->GetCapacity();
    readahead_ = std::make_shared<ReadaheadSpooler>(
        pool_, input, read_options_.block_size, block_queue_size, kDefaultLeftPadding,
        kDefaultRightPadding);
  }

  ~ThreadedTableReader() {
    if (task_group_) {
      // In case of error, make sure all pending tasks are finished before
      // we start destroying BaseTableReader members
      ARROW_UNUSED(task_group_->Finish());
    }
  }

  Status Read(std::shared_ptr<Table>* out) {
    task_group_ = internal::TaskGroup::MakeThreaded(thread_pool_);
    static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
    Chunker chunker(parse_options_);

    // Get first block and process header serially
    RETURN_NOT_OK(ReadNextBlock());
    if (eof_) {
      return Status::Invalid("Empty CSV file");
    }
    RETURN_NOT_OK(ProcessHeader());

    // Main loop: chunk the current block into whole rows on this thread,
    // then hand each chunk to a pool task for parsing and conversion.
    while (!eof_ && task_group_->ok()) {
      // Consume current chunk
      uint32_t chunk_size = 0;
      RETURN_NOT_OK(chunker.Process(reinterpret_cast<const char*>(cur_data_),
                                    static_cast<uint32_t>(cur_size_), &chunk_size));
      if (chunk_size > 0) {
        // Got a chunk of rows.  Capture the raw pointer, the owning buffer
        // (to keep the bytes alive while the task runs) and the chunk index
        // (so out-of-order completion still reassembles in order).
        const uint8_t* chunk_data = cur_data_;
        std::shared_ptr<Buffer> chunk_buffer = cur_block_;
        int64_t chunk_index = cur_block_index_;

        // "mutable" allows to modify captured by-copy chunk_buffer
        task_group_->Append([=]() mutable -> Status {
          auto parser = std::make_shared<BlockParser>(pool_, parse_options_, num_cols_,
                                                      max_num_rows);
          uint32_t parsed_size = 0;
          RETURN_NOT_OK(parser->Parse(reinterpret_cast<const char*>(chunk_data),
                                      chunk_size, &parsed_size));
          if (parsed_size != chunk_size) {
            // The chunker guaranteed whole rows, so the parser must consume
            // exactly the chunk; anything else is an internal inconsistency
            DCHECK_EQ(parsed_size, chunk_size);
            return Status::Invalid("Chunker and parser disagree on block size: ",
                                   chunk_size, " vs ", parsed_size);
          }
          RETURN_NOT_OK(ProcessData(parser, chunk_index));
          // Keep chunk buffer alive within closure and release it at the end
          chunk_buffer.reset();
          return Status::OK();
        });
        cur_data_ += chunk_size;
        cur_size_ -= chunk_size;
        cur_block_index_++;
      } else {
        // Need to fetch more data to get at least one row
        RETURN_NOT_OK(ReadNextBlock());
      }
    }

    // Finish all pending parallel tasks
    RETURN_NOT_OK(task_group_->Finish());

    if (eof_ && cur_size_ > 0) {
      // Parse remaining data (serial); the builders must be rebound to a
      // fresh serial task group since the threaded one is finished
      task_group_ = internal::TaskGroup::MakeSerial();
      for (auto& builder : column_builders_) {
        builder->SetTaskGroup(task_group_);
      }
      auto parser =
          std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
      uint32_t parsed_size = 0;
      RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast<const char*>(cur_data_),
                                       static_cast<uint32_t>(cur_size_), &parsed_size));
      if (parser->num_rows() > 0) {
        RETURN_NOT_OK(ProcessData(parser, cur_block_index_++));
      }
      RETURN_NOT_OK(task_group_->Finish());
    }

    // Create schema and table
    return MakeTable(out);
  }

 protected:
  // Not owned; expected to outlive this reader
  ThreadPool* thread_pool_;
};
402 | |
403 | ///////////////////////////////////////////////////////////////////////// |
404 | // TableReader factory function |
405 | |
406 | Status TableReader::Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input, |
407 | const ReadOptions& read_options, |
408 | const ParseOptions& parse_options, |
409 | const ConvertOptions& convert_options, |
410 | std::shared_ptr<TableReader>* out) { |
411 | std::shared_ptr<TableReader> result; |
412 | if (read_options.use_threads) { |
413 | result = std::make_shared<ThreadedTableReader>( |
414 | pool, input, GetCpuThreadPool(), read_options, parse_options, convert_options); |
415 | *out = result; |
416 | return Status::OK(); |
417 | } else { |
418 | result = std::make_shared<SerialTableReader>(pool, input, read_options, parse_options, |
419 | convert_options); |
420 | *out = result; |
421 | return Status::OK(); |
422 | } |
423 | } |
424 | |
425 | } // namespace csv |
426 | } // namespace arrow |
427 | |