1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/csv/reader.h"
19
20#include <cstdint>
21#include <cstring>
22#include <limits>
23#include <memory>
24#include <sstream>
25#include <string>
26#include <unordered_map>
27#include <utility>
28#include <vector>
29
30#include "arrow/buffer.h"
31#include "arrow/csv/chunker.h"
32#include "arrow/csv/column-builder.h"
33#include "arrow/csv/options.h"
34#include "arrow/csv/parser.h"
35#include "arrow/io/readahead.h"
36#include "arrow/status.h"
37#include "arrow/table.h"
38#include "arrow/type.h"
39#include "arrow/util/logging.h"
40#include "arrow/util/macros.h"
41#include "arrow/util/task-group.h"
42#include "arrow/util/thread-pool.h"
43
44namespace arrow {
45
46class MemoryPool;
47
48namespace io {
49
50class InputStream;
51
52} // namespace io
53
54namespace csv {
55
56using internal::GetCpuThreadPool;
57using internal::ThreadPool;
58using io::internal::ReadaheadBuffer;
59using io::internal::ReadaheadSpooler;
60
// Initial left padding requested from the ReadaheadSpooler for each block.
// ReadNextBlock() copies leftover (unparsed) bytes of the previous block into
// this area when they fit, and may ask the spooler for more when they don't.
constexpr int64_t kDefaultLeftPadding = 2 * 1024;  // 2 kB
// Right padding requested after each block. When ReadNextBlock() must build a
// concatenated buffer it zero-fills this many trailing bytes (presumably so the
// parser may safely look slightly past the data -- TODO confirm).
constexpr int64_t kDefaultRightPadding = 16;
63
64/////////////////////////////////////////////////////////////////////////
65// Base class for common functionality
66
67class BaseTableReader : public csv::TableReader {
68 public:
69 BaseTableReader(MemoryPool* pool, const ReadOptions& read_options,
70 const ParseOptions& parse_options,
71 const ConvertOptions& convert_options)
72 : pool_(pool),
73 read_options_(read_options),
74 parse_options_(parse_options),
75 convert_options_(convert_options) {}
76
77 protected:
78 // Read a next data block, stitch it to trailing data
79 Status ReadNextBlock() {
80 bool trailing_data = cur_size_ > 0;
81 ReadaheadBuffer rh;
82
83 if (trailing_data) {
84 if (readahead_->GetLeftPadding() < cur_size_) {
85 // Growth heuristic to try and ensure sufficient left padding
86 // in subsequent reads
87 readahead_->SetLeftPadding(cur_size_ * 3 / 2);
88 }
89 }
90
91 RETURN_NOT_OK(readahead_->Read(&rh));
92 if (!rh.buffer) {
93 // EOF, let caller finish with existing data
94 eof_ = true;
95 return Status::OK();
96 }
97
98 std::shared_ptr<Buffer> new_block = rh.buffer;
99 uint8_t* new_data = rh.buffer->mutable_data() + rh.left_padding;
100 int64_t new_size = rh.buffer->size() - rh.left_padding - rh.right_padding;
101 DCHECK_GT(new_size, 0); // ensured by ReadaheadSpooler
102
103 if (trailing_cr_ && new_data[0] == '\n') {
104 // Skip '\r\n' line separator that started at the end of previous block
105 ++new_data;
106 --new_size;
107 }
108 trailing_cr_ = (new_data[new_size - 1] == '\r');
109
110 if (trailing_data) {
111 // Try to copy trailing data at the beginning of new block
112 if (cur_size_ <= rh.left_padding) {
113 // Can left-extend new block inside padding area
114 new_data -= cur_size_;
115 new_size += cur_size_;
116 std::memcpy(new_data, cur_data_, cur_size_);
117 } else {
118 // Need to allocate bigger block and concatenate trailing + present data
119 RETURN_NOT_OK(
120 AllocateBuffer(pool_, cur_size_ + new_size + rh.right_padding, &new_block));
121 std::memcpy(new_block->mutable_data(), cur_data_, cur_size_);
122 std::memcpy(new_block->mutable_data() + cur_size_, new_data, new_size);
123 std::memset(new_block->mutable_data() + cur_size_ + new_size, 0,
124 rh.right_padding);
125 new_data = new_block->mutable_data();
126 new_size = cur_size_ + new_size;
127 }
128 }
129 cur_block_ = new_block;
130 cur_data_ = new_data;
131 cur_size_ = new_size;
132 return Status::OK();
133 }
134
135 // Read header and column names from current block, create column builders
136 Status ProcessHeader() {
137 DCHECK_GT(cur_size_, 0);
138 if (parse_options_.header_rows == 0) {
139 // TODO allow passing names and/or generate column numbers?
140 return Status::Invalid("header_rows == 0 needs explicit column names");
141 }
142
143 BlockParser parser(pool_, parse_options_, num_cols_, parse_options_.header_rows);
144
145 uint32_t parsed_size = 0;
146 RETURN_NOT_OK(parser.Parse(reinterpret_cast<const char*>(cur_data_),
147 static_cast<uint32_t>(cur_size_), &parsed_size));
148 if (parser.num_rows() != parse_options_.header_rows) {
149 return Status::Invalid(
150 "Could not read header rows from CSV file, either "
151 "file is too short or header is larger than block size");
152 }
153 if (parser.num_cols() == 0) {
154 return Status::Invalid("No columns in CSV file");
155 }
156 num_cols_ = parser.num_cols();
157 DCHECK_GT(num_cols_, 0);
158
159 for (int32_t col_index = 0; col_index < num_cols_; ++col_index) {
160 auto visit = [&](const uint8_t* data, uint32_t size, bool quoted) -> Status {
161 DCHECK_EQ(column_names_.size(), static_cast<uint32_t>(col_index));
162 column_names_.emplace_back(reinterpret_cast<const char*>(data), size);
163 return Status::OK();
164 };
165 RETURN_NOT_OK(parser.VisitColumn(col_index, visit));
166 std::shared_ptr<ColumnBuilder> builder;
167 // Does the named column have a fixed type?
168 auto it = convert_options_.column_types.find(column_names_[col_index]);
169 if (it == convert_options_.column_types.end()) {
170 RETURN_NOT_OK(
171 ColumnBuilder::Make(col_index, convert_options_, task_group_, &builder));
172 } else {
173 RETURN_NOT_OK(ColumnBuilder::Make(it->second, col_index, convert_options_,
174 task_group_, &builder));
175 }
176 column_builders_.push_back(builder);
177 }
178
179 // Skip parsed header rows
180 cur_data_ += parsed_size;
181 cur_size_ -= parsed_size;
182 return Status::OK();
183 }
184
185 // Trigger conversion of parsed block data
186 Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) {
187 for (auto& builder : column_builders_) {
188 builder->Insert(block_index, parser);
189 }
190 return Status::OK();
191 }
192
193 Status MakeTable(std::shared_ptr<Table>* out) {
194 DCHECK_GT(num_cols_, 0);
195 DCHECK_EQ(column_names_.size(), static_cast<uint32_t>(num_cols_));
196 DCHECK_EQ(column_builders_.size(), static_cast<uint32_t>(num_cols_));
197
198 std::vector<std::shared_ptr<Field>> fields;
199 std::vector<std::shared_ptr<Column>> columns;
200
201 for (int32_t i = 0; i < num_cols_; ++i) {
202 std::shared_ptr<ChunkedArray> array;
203 RETURN_NOT_OK(column_builders_[i]->Finish(&array));
204 columns.push_back(std::make_shared<Column>(column_names_[i], array));
205 fields.push_back(columns.back()->field());
206 }
207 *out = Table::Make(schema(fields), columns);
208 return Status::OK();
209 }
210
211 MemoryPool* pool_;
212 ReadOptions read_options_;
213 ParseOptions parse_options_;
214 ConvertOptions convert_options_;
215
216 int32_t num_cols_ = -1;
217 std::shared_ptr<ReadaheadSpooler> readahead_;
218 // Column names
219 std::vector<std::string> column_names_;
220 std::shared_ptr<internal::TaskGroup> task_group_;
221 std::vector<std::shared_ptr<ColumnBuilder>> column_builders_;
222
223 // Current block and data pointer
224 std::shared_ptr<Buffer> cur_block_;
225 const uint8_t* cur_data_ = nullptr;
226 int64_t cur_size_ = 0;
227 // Index of current block inside data stream
228 int64_t cur_block_index_ = 0;
229 // Whether there was a trailing CR at the end of last parsed line
230 bool trailing_cr_ = false;
231 // Whether we reached input stream EOF. There may still be data left to
232 // process in current block.
233 bool eof_ = false;
234};
235
236/////////////////////////////////////////////////////////////////////////
237// Serial TableReader implementation
238
239class SerialTableReader : public BaseTableReader {
240 public:
241 SerialTableReader(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
242 const ReadOptions& read_options, const ParseOptions& parse_options,
243 const ConvertOptions& convert_options)
244 : BaseTableReader(pool, read_options, parse_options, convert_options) {
245 // Since we're converting serially, no need to readahead more than one block
246 int32_t block_queue_size = 1;
247 readahead_ = std::make_shared<ReadaheadSpooler>(
248 pool_, input, read_options_.block_size, block_queue_size, kDefaultLeftPadding,
249 kDefaultRightPadding);
250 }
251
252 Status Read(std::shared_ptr<Table>* out) {
253 task_group_ = internal::TaskGroup::MakeSerial();
254
255 // First block
256 RETURN_NOT_OK(ReadNextBlock());
257 if (eof_) {
258 return Status::Invalid("Empty CSV file");
259 }
260 RETURN_NOT_OK(ProcessHeader());
261
262 static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
263 auto parser =
264 std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
265 while (!eof_) {
266 // Consume current block
267 uint32_t parsed_size = 0;
268 RETURN_NOT_OK(parser->Parse(reinterpret_cast<const char*>(cur_data_),
269 static_cast<uint32_t>(cur_size_), &parsed_size));
270 if (parser->num_rows() > 0) {
271 // Got some data
272 RETURN_NOT_OK(ProcessData(parser, cur_block_index_++));
273 cur_data_ += parsed_size;
274 cur_size_ -= parsed_size;
275 if (!task_group_->ok()) {
276 // Conversion error => early exit
277 break;
278 }
279 } else {
280 // Need to fetch more data to get at least one row
281 RETURN_NOT_OK(ReadNextBlock());
282 }
283 }
284 if (eof_ && cur_size_ > 0) {
285 // Parse remaining data
286 uint32_t parsed_size = 0;
287 RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast<const char*>(cur_data_),
288 static_cast<uint32_t>(cur_size_), &parsed_size));
289 if (parser->num_rows() > 0) {
290 RETURN_NOT_OK(ProcessData(parser, cur_block_index_++));
291 }
292 }
293
294 // Finish conversion, create schema and table
295 RETURN_NOT_OK(task_group_->Finish());
296 return MakeTable(out);
297 }
298};
299
300/////////////////////////////////////////////////////////////////////////
301// Parallel TableReader implementation
302
303class ThreadedTableReader : public BaseTableReader {
304 public:
305 ThreadedTableReader(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
306 ThreadPool* thread_pool, const ReadOptions& read_options,
307 const ParseOptions& parse_options,
308 const ConvertOptions& convert_options)
309 : BaseTableReader(pool, read_options, parse_options, convert_options),
310 thread_pool_(thread_pool) {
311 // Readahead one block per worker thread
312 int32_t block_queue_size = thread_pool->GetCapacity();
313 readahead_ = std::make_shared<ReadaheadSpooler>(
314 pool_, input, read_options_.block_size, block_queue_size, kDefaultLeftPadding,
315 kDefaultRightPadding);
316 }
317
318 ~ThreadedTableReader() {
319 if (task_group_) {
320 // In case of error, make sure all pending tasks are finished before
321 // we start destroying BaseTableReader members
322 ARROW_UNUSED(task_group_->Finish());
323 }
324 }
325
326 Status Read(std::shared_ptr<Table>* out) {
327 task_group_ = internal::TaskGroup::MakeThreaded(thread_pool_);
328 static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
329 Chunker chunker(parse_options_);
330
331 // Get first block and process header serially
332 RETURN_NOT_OK(ReadNextBlock());
333 if (eof_) {
334 return Status::Invalid("Empty CSV file");
335 }
336 RETURN_NOT_OK(ProcessHeader());
337
338 while (!eof_ && task_group_->ok()) {
339 // Consume current chunk
340 uint32_t chunk_size = 0;
341 RETURN_NOT_OK(chunker.Process(reinterpret_cast<const char*>(cur_data_),
342 static_cast<uint32_t>(cur_size_), &chunk_size));
343 if (chunk_size > 0) {
344 // Got a chunk of rows
345 const uint8_t* chunk_data = cur_data_;
346 std::shared_ptr<Buffer> chunk_buffer = cur_block_;
347 int64_t chunk_index = cur_block_index_;
348
349 // "mutable" allows to modify captured by-copy chunk_buffer
350 task_group_->Append([=]() mutable -> Status {
351 auto parser = std::make_shared<BlockParser>(pool_, parse_options_, num_cols_,
352 max_num_rows);
353 uint32_t parsed_size = 0;
354 RETURN_NOT_OK(parser->Parse(reinterpret_cast<const char*>(chunk_data),
355 chunk_size, &parsed_size));
356 if (parsed_size != chunk_size) {
357 DCHECK_EQ(parsed_size, chunk_size);
358 return Status::Invalid("Chunker and parser disagree on block size: ",
359 chunk_size, " vs ", parsed_size);
360 }
361 RETURN_NOT_OK(ProcessData(parser, chunk_index));
362 // Keep chunk buffer alive within closure and release it at the end
363 chunk_buffer.reset();
364 return Status::OK();
365 });
366 cur_data_ += chunk_size;
367 cur_size_ -= chunk_size;
368 cur_block_index_++;
369 } else {
370 // Need to fetch more data to get at least one row
371 RETURN_NOT_OK(ReadNextBlock());
372 }
373 }
374
375 // Finish all pending parallel tasks
376 RETURN_NOT_OK(task_group_->Finish());
377
378 if (eof_ && cur_size_ > 0) {
379 // Parse remaining data (serial)
380 task_group_ = internal::TaskGroup::MakeSerial();
381 for (auto& builder : column_builders_) {
382 builder->SetTaskGroup(task_group_);
383 }
384 auto parser =
385 std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
386 uint32_t parsed_size = 0;
387 RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast<const char*>(cur_data_),
388 static_cast<uint32_t>(cur_size_), &parsed_size));
389 if (parser->num_rows() > 0) {
390 RETURN_NOT_OK(ProcessData(parser, cur_block_index_++));
391 }
392 RETURN_NOT_OK(task_group_->Finish());
393 }
394
395 // Create schema and table
396 return MakeTable(out);
397 }
398
399 protected:
400 ThreadPool* thread_pool_;
401};
402
403/////////////////////////////////////////////////////////////////////////
404// TableReader factory function
405
406Status TableReader::Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
407 const ReadOptions& read_options,
408 const ParseOptions& parse_options,
409 const ConvertOptions& convert_options,
410 std::shared_ptr<TableReader>* out) {
411 std::shared_ptr<TableReader> result;
412 if (read_options.use_threads) {
413 result = std::make_shared<ThreadedTableReader>(
414 pool, input, GetCpuThreadPool(), read_options, parse_options, convert_options);
415 *out = result;
416 return Status::OK();
417 } else {
418 result = std::make_shared<SerialTableReader>(pool, input, read_options, parse_options,
419 convert_options);
420 *out = result;
421 return Status::OK();
422 }
423}
424
425} // namespace csv
426} // namespace arrow
427