1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/csv/parser.h" |
19 | |
20 | #include <algorithm> |
21 | #include <cstdio> |
22 | #include <sstream> |
23 | #include <utility> |
24 | |
25 | #include "arrow/memory_pool.h" |
26 | #include "arrow/status.h" |
27 | #include "arrow/util/logging.h" |
28 | |
29 | namespace arrow { |
30 | namespace csv { |
31 | |
32 | static Status ParseError(const char* message) { |
33 | return Status::Invalid("CSV parse error: " , message); |
34 | } |
35 | |
36 | static Status MismatchingColumns(int32_t expected, int32_t actual) { |
37 | char s[50]; |
38 | snprintf(s, sizeof(s), "Expected %d columns, got %d" , expected, actual); |
39 | return ParseError(s); |
40 | } |
41 | |
// True for bytes below the space character (the C0 control range), which
// includes the '\r' and '\n' line separators tested for by the parser.
static inline bool IsControlChar(uint8_t c) { return !(c >= ' '); }
43 | |
44 | template <bool Quoting, bool Escaping> |
45 | class SpecializedOptions { |
46 | public: |
47 | static constexpr bool quoting = Quoting; |
48 | static constexpr bool escaping = Escaping; |
49 | }; |
50 | |
51 | // A helper class allocating the buffer for parsed values and writing into it |
52 | // without any further resizes, except at the end. |
53 | class BlockParser::PresizedParsedWriter { |
54 | public: |
55 | PresizedParsedWriter(MemoryPool* pool, uint32_t size) |
56 | : parsed_size_(0), parsed_capacity_(size) { |
57 | ARROW_CHECK_OK(AllocateResizableBuffer(pool, parsed_capacity_, &parsed_buffer_)); |
58 | parsed_ = parsed_buffer_->mutable_data(); |
59 | } |
60 | |
61 | void Finish(std::shared_ptr<Buffer>* out_parsed) { |
62 | ARROW_CHECK_OK(parsed_buffer_->Resize(parsed_size_)); |
63 | *out_parsed = parsed_buffer_; |
64 | } |
65 | |
66 | void BeginLine() { saved_parsed_size_ = parsed_size_; } |
67 | |
68 | void PushFieldChar(char c) { |
69 | DCHECK_LT(parsed_size_, parsed_capacity_); |
70 | parsed_[parsed_size_++] = static_cast<uint8_t>(c); |
71 | } |
72 | |
73 | // Rollback the state that was saved in BeginLine() |
74 | void RollbackLine() { parsed_size_ = saved_parsed_size_; } |
75 | |
76 | int64_t size() { return parsed_size_; } |
77 | |
78 | protected: |
79 | std::shared_ptr<ResizableBuffer> parsed_buffer_; |
80 | uint8_t* parsed_; |
81 | int64_t parsed_size_; |
82 | int64_t parsed_capacity_; |
83 | // Checkpointing, for when an incomplete line is encountered at end of block |
84 | int64_t saved_parsed_size_; |
85 | }; |
86 | |
87 | // A helper class handling a growable buffer for values offsets. This class is |
88 | // used when the number of columns is not yet known and we therefore cannot |
89 | // efficiently presize the target area for a given number of rows. |
90 | class BlockParser::ResizableValuesWriter { |
91 | public: |
92 | explicit ResizableValuesWriter(MemoryPool* pool) |
93 | : values_size_(0), values_capacity_(256) { |
94 | ARROW_CHECK_OK(AllocateResizableBuffer(pool, values_capacity_ * sizeof(*values_), |
95 | &values_buffer_)); |
96 | values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data()); |
97 | } |
98 | |
99 | template <typename ParsedWriter> |
100 | void Start(ParsedWriter& parsed_writer) { |
101 | PushValue({static_cast<uint32_t>(parsed_writer.size()) & 0x7fffffffU, false}); |
102 | } |
103 | |
104 | void Finish(std::shared_ptr<Buffer>* out_values) { |
105 | ARROW_CHECK_OK(values_buffer_->Resize(values_size_ * sizeof(*values_))); |
106 | *out_values = values_buffer_; |
107 | } |
108 | |
109 | void BeginLine() { saved_values_size_ = values_size_; } |
110 | |
111 | void StartField(bool quoted) { quoted_ = quoted; } |
112 | |
113 | template <typename ParsedWriter> |
114 | void FinishField(ParsedWriter* parsed_writer) { |
115 | PushValue({static_cast<uint32_t>(parsed_writer->size()) & 0x7fffffffU, quoted_}); |
116 | } |
117 | |
118 | // Rollback the state that was saved in BeginLine() |
119 | void RollbackLine() { values_size_ = saved_values_size_; } |
120 | |
121 | protected: |
122 | void PushValue(ValueDesc v) { |
123 | if (ARROW_PREDICT_FALSE(values_size_ == values_capacity_)) { |
124 | values_capacity_ = values_capacity_ * 2; |
125 | ARROW_CHECK_OK(values_buffer_->Resize(values_capacity_ * sizeof(*values_))); |
126 | values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data()); |
127 | } |
128 | values_[values_size_++] = v; |
129 | } |
130 | |
131 | std::shared_ptr<ResizableBuffer> values_buffer_; |
132 | ValueDesc* values_; |
133 | int64_t values_size_; |
134 | int64_t values_capacity_; |
135 | bool quoted_; |
136 | // Checkpointing, for when an incomplete line is encountered at end of block |
137 | int64_t saved_values_size_; |
138 | }; |
139 | |
140 | // A helper class allocating the buffer for values offsets and writing into it |
141 | // without any further resizes, except at the end. This class is used once the |
142 | // number of columns is known, as it eliminates resizes and generates simpler, |
143 | // faster CSV parsing code. |
144 | class BlockParser::PresizedValuesWriter { |
145 | public: |
146 | PresizedValuesWriter(MemoryPool* pool, int32_t num_rows, int32_t num_cols) |
147 | : values_size_(0), values_capacity_(1 + num_rows * num_cols) { |
148 | ARROW_CHECK_OK(AllocateResizableBuffer(pool, values_capacity_ * sizeof(*values_), |
149 | &values_buffer_)); |
150 | values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data()); |
151 | } |
152 | |
153 | template <typename ParsedWriter> |
154 | void Start(ParsedWriter& parsed_writer) { |
155 | PushValue({static_cast<uint32_t>(parsed_writer.size()) & 0x7fffffffU, false}); |
156 | } |
157 | |
158 | void Finish(std::shared_ptr<Buffer>* out_values) { |
159 | ARROW_CHECK_OK(values_buffer_->Resize(values_size_ * sizeof(*values_))); |
160 | *out_values = values_buffer_; |
161 | } |
162 | |
163 | void BeginLine() { saved_values_size_ = values_size_; } |
164 | |
165 | void StartField(bool quoted) { quoted_ = quoted; } |
166 | |
167 | template <typename ParsedWriter> |
168 | void FinishField(ParsedWriter* parsed_writer) { |
169 | PushValue({static_cast<uint32_t>(parsed_writer->size()) & 0x7fffffffU, quoted_}); |
170 | } |
171 | |
172 | // Rollback the state that was saved in BeginLine() |
173 | void RollbackLine() { values_size_ = saved_values_size_; } |
174 | |
175 | protected: |
176 | void PushValue(ValueDesc v) { |
177 | DCHECK_LT(values_size_, values_capacity_); |
178 | values_[values_size_++] = v; |
179 | } |
180 | |
181 | std::shared_ptr<ResizableBuffer> values_buffer_; |
182 | ValueDesc* values_; |
183 | int64_t values_size_; |
184 | const int64_t values_capacity_; |
185 | bool quoted_; |
186 | // Checkpointing, for when an incomplete line is encountered at end of block |
187 | int64_t saved_values_size_; |
188 | }; |
189 | |
190 | template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter> |
191 | Status BlockParser::ParseLine(ValuesWriter* values_writer, ParsedWriter* parsed_writer, |
192 | const char* data, const char* data_end, bool is_final, |
193 | const char** out_data) { |
194 | int32_t num_cols = 0; |
195 | char c; |
196 | |
197 | DCHECK_GT(data_end, data); |
198 | |
199 | auto FinishField = [&]() { values_writer->FinishField(parsed_writer); }; |
200 | |
201 | values_writer->BeginLine(); |
202 | parsed_writer->BeginLine(); |
203 | |
204 | // The parsing state machine |
205 | |
206 | // Special case empty lines: do we start with a newline separator? |
207 | c = *data; |
208 | if (ARROW_PREDICT_FALSE(IsControlChar(c)) && options_.ignore_empty_lines) { |
209 | if (c == '\r') { |
210 | data++; |
211 | if (data < data_end && *data == '\n') { |
212 | data++; |
213 | } |
214 | goto EmptyLine; |
215 | } |
216 | if (c == '\n') { |
217 | data++; |
218 | goto EmptyLine; |
219 | } |
220 | } |
221 | |
222 | FieldStart: |
223 | // At the start of a field |
224 | // Quoting is only recognized at start of field |
225 | if (SpecializedOptions::quoting && ARROW_PREDICT_FALSE(*data == options_.quote_char)) { |
226 | ++data; |
227 | values_writer->StartField(true /* quoted */); |
228 | goto InQuotedField; |
229 | } else { |
230 | values_writer->StartField(false /* quoted */); |
231 | goto InField; |
232 | } |
233 | |
234 | InField: |
235 | // Inside a non-quoted part of a field |
236 | if (ARROW_PREDICT_FALSE(data == data_end)) { |
237 | goto AbortLine; |
238 | } |
239 | c = *data++; |
240 | if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) { |
241 | if (ARROW_PREDICT_FALSE(data == data_end)) { |
242 | goto AbortLine; |
243 | } |
244 | c = *data++; |
245 | parsed_writer->PushFieldChar(c); |
246 | goto InField; |
247 | } |
248 | if (ARROW_PREDICT_FALSE(c == options_.delimiter)) { |
249 | goto FieldEnd; |
250 | } |
251 | if (ARROW_PREDICT_FALSE(IsControlChar(c))) { |
252 | if (c == '\r') { |
253 | // In the middle of a newline separator? |
254 | if (ARROW_PREDICT_TRUE(data < data_end) && *data == '\n') { |
255 | data++; |
256 | } |
257 | goto LineEnd; |
258 | } |
259 | if (c == '\n') { |
260 | goto LineEnd; |
261 | } |
262 | } |
263 | parsed_writer->PushFieldChar(c); |
264 | goto InField; |
265 | |
266 | InQuotedField: |
267 | // Inside a quoted part of a field |
268 | if (ARROW_PREDICT_FALSE(data == data_end)) { |
269 | goto AbortLine; |
270 | } |
271 | c = *data++; |
272 | if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) { |
273 | if (ARROW_PREDICT_FALSE(data == data_end)) { |
274 | goto AbortLine; |
275 | } |
276 | c = *data++; |
277 | parsed_writer->PushFieldChar(c); |
278 | goto InQuotedField; |
279 | } |
280 | if (ARROW_PREDICT_FALSE(c == options_.quote_char)) { |
281 | if (options_.double_quote && ARROW_PREDICT_TRUE(data < data_end) && |
282 | ARROW_PREDICT_FALSE(*data == options_.quote_char)) { |
283 | // Double-quoting |
284 | ++data; |
285 | } else { |
286 | // End of single-quoting |
287 | goto InField; |
288 | } |
289 | } |
290 | parsed_writer->PushFieldChar(c); |
291 | goto InQuotedField; |
292 | |
293 | FieldEnd: |
294 | // At the end of a field |
295 | FinishField(); |
296 | ++num_cols; |
297 | if (ARROW_PREDICT_FALSE(data == data_end)) { |
298 | goto AbortLine; |
299 | } |
300 | goto FieldStart; |
301 | |
302 | LineEnd: |
303 | // At the end of line |
304 | FinishField(); |
305 | ++num_cols; |
306 | if (ARROW_PREDICT_FALSE(num_cols != num_cols_)) { |
307 | if (num_cols_ == -1) { |
308 | num_cols_ = num_cols; |
309 | } else { |
310 | return MismatchingColumns(num_cols_, num_cols); |
311 | } |
312 | } |
313 | ++num_rows_; |
314 | *out_data = data; |
315 | return Status::OK(); |
316 | |
317 | AbortLine: |
318 | // Not a full line except perhaps if in final block |
319 | if (is_final) { |
320 | FinishField(); |
321 | ++num_cols; |
322 | if (num_cols_ == -1) { |
323 | num_cols_ = num_cols; |
324 | } else if (num_cols != num_cols_) { |
325 | return MismatchingColumns(num_cols_, num_cols); |
326 | } |
327 | ++num_rows_; |
328 | *out_data = data; |
329 | return Status::OK(); |
330 | } |
331 | // Truncated line at end of block, rewind parsed state |
332 | values_writer->RollbackLine(); |
333 | parsed_writer->RollbackLine(); |
334 | return Status::OK(); |
335 | |
336 | EmptyLine: |
337 | *out_data = data; |
338 | return Status::OK(); |
339 | } |
340 | |
341 | template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter> |
342 | Status BlockParser::ParseChunk(ValuesWriter* values_writer, ParsedWriter* parsed_writer, |
343 | const char* data, const char* data_end, bool is_final, |
344 | int32_t rows_in_chunk, const char** out_data, |
345 | bool* finished_parsing) { |
346 | while (data < data_end && rows_in_chunk > 0) { |
347 | const char* line_end = data; |
348 | RETURN_NOT_OK(ParseLine<SpecializedOptions>(values_writer, parsed_writer, data, |
349 | data_end, is_final, &line_end)); |
350 | if (line_end == data) { |
351 | // Cannot parse any further |
352 | *finished_parsing = true; |
353 | break; |
354 | } |
355 | data = line_end; |
356 | // This will pessimize chunk size a bit if there are empty lines, |
357 | // but that shouldn't be important |
358 | --rows_in_chunk; |
359 | } |
360 | // Append new buffers and update size |
361 | std::shared_ptr<Buffer> values_buffer; |
362 | values_writer->Finish(&values_buffer); |
363 | if (values_buffer->size() > 0) { |
364 | values_size_ += static_cast<int32_t>(values_buffer->size() / sizeof(ValueDesc) - 1); |
365 | values_buffers_.push_back(std::move(values_buffer)); |
366 | } |
367 | *out_data = data; |
368 | return Status::OK(); |
369 | } |
370 | |
371 | template <typename SpecializedOptions> |
372 | Status BlockParser::DoParseSpecialized(const char* start, uint32_t size, bool is_final, |
373 | uint32_t* out_size) { |
374 | num_rows_ = 0; |
375 | values_size_ = 0; |
376 | parsed_size_ = 0; |
377 | values_buffers_.clear(); |
378 | parsed_buffer_.reset(); |
379 | parsed_ = nullptr; |
380 | |
381 | const char* data = start; |
382 | const char* data_end = start + size; |
383 | bool finished_parsing = false; |
384 | |
385 | PresizedParsedWriter parsed_writer(pool_, size); |
386 | |
387 | if (num_cols_ == -1) { |
388 | // Can't presize values when the number of columns is not known, first parse |
389 | // a single line |
390 | const int32_t rows_in_chunk = 1; |
391 | ResizableValuesWriter values_writer(pool_); |
392 | values_writer.Start(parsed_writer); |
393 | |
394 | RETURN_NOT_OK(ParseChunk<SpecializedOptions>(&values_writer, &parsed_writer, data, |
395 | data_end, is_final, rows_in_chunk, &data, |
396 | &finished_parsing)); |
397 | if (num_cols_ == -1) { |
398 | return ParseError("Empty CSV file or block: cannot infer number of columns" ); |
399 | } |
400 | } |
401 | while (!finished_parsing && data < data_end && num_rows_ < max_num_rows_) { |
402 | // We know the number of columns, so can presize a values array for |
403 | // a given number of rows |
404 | DCHECK_GE(num_cols_, 0); |
405 | |
406 | int32_t rows_in_chunk; |
407 | if (num_cols_ > 0) { |
408 | rows_in_chunk = std::min(32768 / num_cols_, max_num_rows_ - num_rows_); |
409 | } else { |
410 | rows_in_chunk = std::min(32768, max_num_rows_ - num_rows_); |
411 | } |
412 | |
413 | PresizedValuesWriter values_writer(pool_, rows_in_chunk, num_cols_); |
414 | values_writer.Start(parsed_writer); |
415 | |
416 | RETURN_NOT_OK(ParseChunk<SpecializedOptions>(&values_writer, &parsed_writer, data, |
417 | data_end, is_final, rows_in_chunk, &data, |
418 | &finished_parsing)); |
419 | } |
420 | |
421 | parsed_writer.Finish(&parsed_buffer_); |
422 | parsed_size_ = static_cast<int32_t>(parsed_buffer_->size()); |
423 | parsed_ = parsed_buffer_->data(); |
424 | |
425 | DCHECK_EQ(values_size_, num_rows_ * num_cols_); |
426 | if (num_cols_ == -1) { |
427 | DCHECK_EQ(num_rows_, 0); |
428 | } |
429 | #ifndef NDEBUG |
430 | if (num_rows_ > 0) { |
431 | DCHECK_GT(values_buffers_.size(), 0); |
432 | auto& last_values_buffer = values_buffers_.back(); |
433 | auto last_values = reinterpret_cast<const ValueDesc*>(last_values_buffer->data()); |
434 | auto last_values_size = last_values_buffer->size() / sizeof(ValueDesc); |
435 | auto check_parsed_size = |
436 | static_cast<int32_t>(last_values[last_values_size - 1].offset); |
437 | DCHECK_EQ(parsed_size_, check_parsed_size); |
438 | } else { |
439 | DCHECK_EQ(parsed_size_, 0); |
440 | } |
441 | #endif |
442 | *out_size = static_cast<uint32_t>(data - start); |
443 | return Status::OK(); |
444 | } |
445 | |
446 | Status BlockParser::DoParse(const char* start, uint32_t size, bool is_final, |
447 | uint32_t* out_size) { |
448 | if (options_.quoting) { |
449 | if (options_.escaping) { |
450 | return DoParseSpecialized<SpecializedOptions<true, true>>(start, size, is_final, |
451 | out_size); |
452 | } else { |
453 | return DoParseSpecialized<SpecializedOptions<true, false>>(start, size, is_final, |
454 | out_size); |
455 | } |
456 | } else { |
457 | if (options_.escaping) { |
458 | return DoParseSpecialized<SpecializedOptions<false, true>>(start, size, is_final, |
459 | out_size); |
460 | } else { |
461 | return DoParseSpecialized<SpecializedOptions<false, false>>(start, size, is_final, |
462 | out_size); |
463 | } |
464 | } |
465 | } |
466 | |
467 | Status BlockParser::Parse(const char* data, uint32_t size, uint32_t* out_size) { |
468 | return DoParse(data, size, false /* is_final */, out_size); |
469 | } |
470 | |
471 | Status BlockParser::ParseFinal(const char* data, uint32_t size, uint32_t* out_size) { |
472 | return DoParse(data, size, true /* is_final */, out_size); |
473 | } |
474 | |
475 | BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols, |
476 | int32_t max_num_rows) |
477 | : pool_(pool), options_(options), num_cols_(num_cols), max_num_rows_(max_num_rows) {} |
478 | |
479 | BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows) |
480 | : BlockParser(default_memory_pool(), options, num_cols, max_num_rows) {} |
481 | |
482 | } // namespace csv |
483 | } // namespace arrow |
484 | |