1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/csv/parser.h"
19
20#include <algorithm>
21#include <cstdio>
22#include <sstream>
23#include <utility>
24
25#include "arrow/memory_pool.h"
26#include "arrow/status.h"
27#include "arrow/util/logging.h"
28
29namespace arrow {
30namespace csv {
31
32static Status ParseError(const char* message) {
33 return Status::Invalid("CSV parse error: ", message);
34}
35
36static Status MismatchingColumns(int32_t expected, int32_t actual) {
37 char s[50];
38 snprintf(s, sizeof(s), "Expected %d columns, got %d", expected, actual);
39 return ParseError(s);
40}
41
// True for byte values below the space character (0x00-0x1F), which include
// the line terminators '\r' and '\n'. Bytes >= 0x20 (including 0x7F and any
// non-ASCII byte) are not considered control characters here.
static inline bool IsControlChar(uint8_t c) { return c <= 0x1F; }
43
44template <bool Quoting, bool Escaping>
45class SpecializedOptions {
46 public:
47 static constexpr bool quoting = Quoting;
48 static constexpr bool escaping = Escaping;
49};
50
51// A helper class allocating the buffer for parsed values and writing into it
52// without any further resizes, except at the end.
53class BlockParser::PresizedParsedWriter {
54 public:
55 PresizedParsedWriter(MemoryPool* pool, uint32_t size)
56 : parsed_size_(0), parsed_capacity_(size) {
57 ARROW_CHECK_OK(AllocateResizableBuffer(pool, parsed_capacity_, &parsed_buffer_));
58 parsed_ = parsed_buffer_->mutable_data();
59 }
60
61 void Finish(std::shared_ptr<Buffer>* out_parsed) {
62 ARROW_CHECK_OK(parsed_buffer_->Resize(parsed_size_));
63 *out_parsed = parsed_buffer_;
64 }
65
66 void BeginLine() { saved_parsed_size_ = parsed_size_; }
67
68 void PushFieldChar(char c) {
69 DCHECK_LT(parsed_size_, parsed_capacity_);
70 parsed_[parsed_size_++] = static_cast<uint8_t>(c);
71 }
72
73 // Rollback the state that was saved in BeginLine()
74 void RollbackLine() { parsed_size_ = saved_parsed_size_; }
75
76 int64_t size() { return parsed_size_; }
77
78 protected:
79 std::shared_ptr<ResizableBuffer> parsed_buffer_;
80 uint8_t* parsed_;
81 int64_t parsed_size_;
82 int64_t parsed_capacity_;
83 // Checkpointing, for when an incomplete line is encountered at end of block
84 int64_t saved_parsed_size_;
85};
86
87// A helper class handling a growable buffer for values offsets. This class is
88// used when the number of columns is not yet known and we therefore cannot
89// efficiently presize the target area for a given number of rows.
90class BlockParser::ResizableValuesWriter {
91 public:
92 explicit ResizableValuesWriter(MemoryPool* pool)
93 : values_size_(0), values_capacity_(256) {
94 ARROW_CHECK_OK(AllocateResizableBuffer(pool, values_capacity_ * sizeof(*values_),
95 &values_buffer_));
96 values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
97 }
98
99 template <typename ParsedWriter>
100 void Start(ParsedWriter& parsed_writer) {
101 PushValue({static_cast<uint32_t>(parsed_writer.size()) & 0x7fffffffU, false});
102 }
103
104 void Finish(std::shared_ptr<Buffer>* out_values) {
105 ARROW_CHECK_OK(values_buffer_->Resize(values_size_ * sizeof(*values_)));
106 *out_values = values_buffer_;
107 }
108
109 void BeginLine() { saved_values_size_ = values_size_; }
110
111 void StartField(bool quoted) { quoted_ = quoted; }
112
113 template <typename ParsedWriter>
114 void FinishField(ParsedWriter* parsed_writer) {
115 PushValue({static_cast<uint32_t>(parsed_writer->size()) & 0x7fffffffU, quoted_});
116 }
117
118 // Rollback the state that was saved in BeginLine()
119 void RollbackLine() { values_size_ = saved_values_size_; }
120
121 protected:
122 void PushValue(ValueDesc v) {
123 if (ARROW_PREDICT_FALSE(values_size_ == values_capacity_)) {
124 values_capacity_ = values_capacity_ * 2;
125 ARROW_CHECK_OK(values_buffer_->Resize(values_capacity_ * sizeof(*values_)));
126 values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
127 }
128 values_[values_size_++] = v;
129 }
130
131 std::shared_ptr<ResizableBuffer> values_buffer_;
132 ValueDesc* values_;
133 int64_t values_size_;
134 int64_t values_capacity_;
135 bool quoted_;
136 // Checkpointing, for when an incomplete line is encountered at end of block
137 int64_t saved_values_size_;
138};
139
140// A helper class allocating the buffer for values offsets and writing into it
141// without any further resizes, except at the end. This class is used once the
142// number of columns is known, as it eliminates resizes and generates simpler,
143// faster CSV parsing code.
144class BlockParser::PresizedValuesWriter {
145 public:
146 PresizedValuesWriter(MemoryPool* pool, int32_t num_rows, int32_t num_cols)
147 : values_size_(0), values_capacity_(1 + num_rows * num_cols) {
148 ARROW_CHECK_OK(AllocateResizableBuffer(pool, values_capacity_ * sizeof(*values_),
149 &values_buffer_));
150 values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
151 }
152
153 template <typename ParsedWriter>
154 void Start(ParsedWriter& parsed_writer) {
155 PushValue({static_cast<uint32_t>(parsed_writer.size()) & 0x7fffffffU, false});
156 }
157
158 void Finish(std::shared_ptr<Buffer>* out_values) {
159 ARROW_CHECK_OK(values_buffer_->Resize(values_size_ * sizeof(*values_)));
160 *out_values = values_buffer_;
161 }
162
163 void BeginLine() { saved_values_size_ = values_size_; }
164
165 void StartField(bool quoted) { quoted_ = quoted; }
166
167 template <typename ParsedWriter>
168 void FinishField(ParsedWriter* parsed_writer) {
169 PushValue({static_cast<uint32_t>(parsed_writer->size()) & 0x7fffffffU, quoted_});
170 }
171
172 // Rollback the state that was saved in BeginLine()
173 void RollbackLine() { values_size_ = saved_values_size_; }
174
175 protected:
176 void PushValue(ValueDesc v) {
177 DCHECK_LT(values_size_, values_capacity_);
178 values_[values_size_++] = v;
179 }
180
181 std::shared_ptr<ResizableBuffer> values_buffer_;
182 ValueDesc* values_;
183 int64_t values_size_;
184 const int64_t values_capacity_;
185 bool quoted_;
186 // Checkpointing, for when an incomplete line is encountered at end of block
187 int64_t saved_values_size_;
188};
189
190template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
191Status BlockParser::ParseLine(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
192 const char* data, const char* data_end, bool is_final,
193 const char** out_data) {
194 int32_t num_cols = 0;
195 char c;
196
197 DCHECK_GT(data_end, data);
198
199 auto FinishField = [&]() { values_writer->FinishField(parsed_writer); };
200
201 values_writer->BeginLine();
202 parsed_writer->BeginLine();
203
204 // The parsing state machine
205
206 // Special case empty lines: do we start with a newline separator?
207 c = *data;
208 if (ARROW_PREDICT_FALSE(IsControlChar(c)) && options_.ignore_empty_lines) {
209 if (c == '\r') {
210 data++;
211 if (data < data_end && *data == '\n') {
212 data++;
213 }
214 goto EmptyLine;
215 }
216 if (c == '\n') {
217 data++;
218 goto EmptyLine;
219 }
220 }
221
222FieldStart:
223 // At the start of a field
224 // Quoting is only recognized at start of field
225 if (SpecializedOptions::quoting && ARROW_PREDICT_FALSE(*data == options_.quote_char)) {
226 ++data;
227 values_writer->StartField(true /* quoted */);
228 goto InQuotedField;
229 } else {
230 values_writer->StartField(false /* quoted */);
231 goto InField;
232 }
233
234InField:
235 // Inside a non-quoted part of a field
236 if (ARROW_PREDICT_FALSE(data == data_end)) {
237 goto AbortLine;
238 }
239 c = *data++;
240 if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
241 if (ARROW_PREDICT_FALSE(data == data_end)) {
242 goto AbortLine;
243 }
244 c = *data++;
245 parsed_writer->PushFieldChar(c);
246 goto InField;
247 }
248 if (ARROW_PREDICT_FALSE(c == options_.delimiter)) {
249 goto FieldEnd;
250 }
251 if (ARROW_PREDICT_FALSE(IsControlChar(c))) {
252 if (c == '\r') {
253 // In the middle of a newline separator?
254 if (ARROW_PREDICT_TRUE(data < data_end) && *data == '\n') {
255 data++;
256 }
257 goto LineEnd;
258 }
259 if (c == '\n') {
260 goto LineEnd;
261 }
262 }
263 parsed_writer->PushFieldChar(c);
264 goto InField;
265
266InQuotedField:
267 // Inside a quoted part of a field
268 if (ARROW_PREDICT_FALSE(data == data_end)) {
269 goto AbortLine;
270 }
271 c = *data++;
272 if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
273 if (ARROW_PREDICT_FALSE(data == data_end)) {
274 goto AbortLine;
275 }
276 c = *data++;
277 parsed_writer->PushFieldChar(c);
278 goto InQuotedField;
279 }
280 if (ARROW_PREDICT_FALSE(c == options_.quote_char)) {
281 if (options_.double_quote && ARROW_PREDICT_TRUE(data < data_end) &&
282 ARROW_PREDICT_FALSE(*data == options_.quote_char)) {
283 // Double-quoting
284 ++data;
285 } else {
286 // End of single-quoting
287 goto InField;
288 }
289 }
290 parsed_writer->PushFieldChar(c);
291 goto InQuotedField;
292
293FieldEnd:
294 // At the end of a field
295 FinishField();
296 ++num_cols;
297 if (ARROW_PREDICT_FALSE(data == data_end)) {
298 goto AbortLine;
299 }
300 goto FieldStart;
301
302LineEnd:
303 // At the end of line
304 FinishField();
305 ++num_cols;
306 if (ARROW_PREDICT_FALSE(num_cols != num_cols_)) {
307 if (num_cols_ == -1) {
308 num_cols_ = num_cols;
309 } else {
310 return MismatchingColumns(num_cols_, num_cols);
311 }
312 }
313 ++num_rows_;
314 *out_data = data;
315 return Status::OK();
316
317AbortLine:
318 // Not a full line except perhaps if in final block
319 if (is_final) {
320 FinishField();
321 ++num_cols;
322 if (num_cols_ == -1) {
323 num_cols_ = num_cols;
324 } else if (num_cols != num_cols_) {
325 return MismatchingColumns(num_cols_, num_cols);
326 }
327 ++num_rows_;
328 *out_data = data;
329 return Status::OK();
330 }
331 // Truncated line at end of block, rewind parsed state
332 values_writer->RollbackLine();
333 parsed_writer->RollbackLine();
334 return Status::OK();
335
336EmptyLine:
337 *out_data = data;
338 return Status::OK();
339}
340
341template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
342Status BlockParser::ParseChunk(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
343 const char* data, const char* data_end, bool is_final,
344 int32_t rows_in_chunk, const char** out_data,
345 bool* finished_parsing) {
346 while (data < data_end && rows_in_chunk > 0) {
347 const char* line_end = data;
348 RETURN_NOT_OK(ParseLine<SpecializedOptions>(values_writer, parsed_writer, data,
349 data_end, is_final, &line_end));
350 if (line_end == data) {
351 // Cannot parse any further
352 *finished_parsing = true;
353 break;
354 }
355 data = line_end;
356 // This will pessimize chunk size a bit if there are empty lines,
357 // but that shouldn't be important
358 --rows_in_chunk;
359 }
360 // Append new buffers and update size
361 std::shared_ptr<Buffer> values_buffer;
362 values_writer->Finish(&values_buffer);
363 if (values_buffer->size() > 0) {
364 values_size_ += static_cast<int32_t>(values_buffer->size() / sizeof(ValueDesc) - 1);
365 values_buffers_.push_back(std::move(values_buffer));
366 }
367 *out_data = data;
368 return Status::OK();
369}
370
371template <typename SpecializedOptions>
372Status BlockParser::DoParseSpecialized(const char* start, uint32_t size, bool is_final,
373 uint32_t* out_size) {
374 num_rows_ = 0;
375 values_size_ = 0;
376 parsed_size_ = 0;
377 values_buffers_.clear();
378 parsed_buffer_.reset();
379 parsed_ = nullptr;
380
381 const char* data = start;
382 const char* data_end = start + size;
383 bool finished_parsing = false;
384
385 PresizedParsedWriter parsed_writer(pool_, size);
386
387 if (num_cols_ == -1) {
388 // Can't presize values when the number of columns is not known, first parse
389 // a single line
390 const int32_t rows_in_chunk = 1;
391 ResizableValuesWriter values_writer(pool_);
392 values_writer.Start(parsed_writer);
393
394 RETURN_NOT_OK(ParseChunk<SpecializedOptions>(&values_writer, &parsed_writer, data,
395 data_end, is_final, rows_in_chunk, &data,
396 &finished_parsing));
397 if (num_cols_ == -1) {
398 return ParseError("Empty CSV file or block: cannot infer number of columns");
399 }
400 }
401 while (!finished_parsing && data < data_end && num_rows_ < max_num_rows_) {
402 // We know the number of columns, so can presize a values array for
403 // a given number of rows
404 DCHECK_GE(num_cols_, 0);
405
406 int32_t rows_in_chunk;
407 if (num_cols_ > 0) {
408 rows_in_chunk = std::min(32768 / num_cols_, max_num_rows_ - num_rows_);
409 } else {
410 rows_in_chunk = std::min(32768, max_num_rows_ - num_rows_);
411 }
412
413 PresizedValuesWriter values_writer(pool_, rows_in_chunk, num_cols_);
414 values_writer.Start(parsed_writer);
415
416 RETURN_NOT_OK(ParseChunk<SpecializedOptions>(&values_writer, &parsed_writer, data,
417 data_end, is_final, rows_in_chunk, &data,
418 &finished_parsing));
419 }
420
421 parsed_writer.Finish(&parsed_buffer_);
422 parsed_size_ = static_cast<int32_t>(parsed_buffer_->size());
423 parsed_ = parsed_buffer_->data();
424
425 DCHECK_EQ(values_size_, num_rows_ * num_cols_);
426 if (num_cols_ == -1) {
427 DCHECK_EQ(num_rows_, 0);
428 }
429#ifndef NDEBUG
430 if (num_rows_ > 0) {
431 DCHECK_GT(values_buffers_.size(), 0);
432 auto& last_values_buffer = values_buffers_.back();
433 auto last_values = reinterpret_cast<const ValueDesc*>(last_values_buffer->data());
434 auto last_values_size = last_values_buffer->size() / sizeof(ValueDesc);
435 auto check_parsed_size =
436 static_cast<int32_t>(last_values[last_values_size - 1].offset);
437 DCHECK_EQ(parsed_size_, check_parsed_size);
438 } else {
439 DCHECK_EQ(parsed_size_, 0);
440 }
441#endif
442 *out_size = static_cast<uint32_t>(data - start);
443 return Status::OK();
444}
445
446Status BlockParser::DoParse(const char* start, uint32_t size, bool is_final,
447 uint32_t* out_size) {
448 if (options_.quoting) {
449 if (options_.escaping) {
450 return DoParseSpecialized<SpecializedOptions<true, true>>(start, size, is_final,
451 out_size);
452 } else {
453 return DoParseSpecialized<SpecializedOptions<true, false>>(start, size, is_final,
454 out_size);
455 }
456 } else {
457 if (options_.escaping) {
458 return DoParseSpecialized<SpecializedOptions<false, true>>(start, size, is_final,
459 out_size);
460 } else {
461 return DoParseSpecialized<SpecializedOptions<false, false>>(start, size, is_final,
462 out_size);
463 }
464 }
465}
466
467Status BlockParser::Parse(const char* data, uint32_t size, uint32_t* out_size) {
468 return DoParse(data, size, false /* is_final */, out_size);
469}
470
471Status BlockParser::ParseFinal(const char* data, uint32_t size, uint32_t* out_size) {
472 return DoParse(data, size, true /* is_final */, out_size);
473}
474
475BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols,
476 int32_t max_num_rows)
477 : pool_(pool), options_(options), num_cols_(num_cols), max_num_rows_(max_num_rows) {}
478
479BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows)
480 : BlockParser(default_memory_pool(), options, num_cols, max_num_rows) {}
481
482} // namespace csv
483} // namespace arrow
484