1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <cstddef>
19#include <cstdint>
20#include <memory>
21#include <mutex>
22#include <sstream>
23#include <string>
24#include <utility>
25#include <vector>
26
27#include "arrow/array.h"
28#include "arrow/csv/column-builder.h"
29#include "arrow/csv/converter.h"
30#include "arrow/csv/options.h"
31#include "arrow/memory_pool.h"
32#include "arrow/status.h"
33#include "arrow/table.h"
34#include "arrow/type.h"
35#include "arrow/util/logging.h"
36#include "arrow/util/task-group.h"
37
38namespace arrow {
39namespace csv {
40
41class BlockParser;
42
43using internal::TaskGroup;
44
45void ColumnBuilder::SetTaskGroup(const std::shared_ptr<internal::TaskGroup>& task_group) {
46 task_group_ = task_group;
47}
48
49void ColumnBuilder::Append(const std::shared_ptr<BlockParser>& parser) {
50 Insert(static_cast<int64_t>(chunks_.size()), parser);
51}
52
53//////////////////////////////////////////////////////////////////////////
54// Pre-typed column builder implementation
55
56class TypedColumnBuilder : public ColumnBuilder {
57 public:
58 TypedColumnBuilder(const std::shared_ptr<DataType>& type, int32_t col_index,
59 const ConvertOptions& options, MemoryPool* pool,
60 const std::shared_ptr<internal::TaskGroup>& task_group)
61 : ColumnBuilder(task_group),
62 type_(type),
63 col_index_(col_index),
64 options_(options),
65 pool_(pool) {}
66
67 Status Init();
68
69 void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
70 Status Finish(std::shared_ptr<ChunkedArray>* out) override;
71
72 protected:
73 Status WrapConversionError(const Status& st) {
74 if (st.ok()) {
75 return st;
76 } else {
77 std::stringstream ss;
78 ss << "In column #" << col_index_ << ": " << st.message();
79 return Status(st.code(), ss.str());
80 }
81 }
82
83 std::mutex mutex_;
84
85 std::shared_ptr<DataType> type_;
86 int32_t col_index_;
87 ConvertOptions options_;
88 MemoryPool* pool_;
89
90 std::shared_ptr<Converter> converter_;
91};
92
93Status TypedColumnBuilder::Init() {
94 return Converter::Make(type_, options_, pool_, &converter_);
95}
96
97void TypedColumnBuilder::Insert(int64_t block_index,
98 const std::shared_ptr<BlockParser>& parser) {
99 DCHECK_NE(converter_, nullptr);
100
101 // Create a null Array pointer at the back at the list
102 // and spawn a task to initialize it after conversion
103 size_t chunk_index = static_cast<size_t>(block_index);
104 {
105 std::lock_guard<std::mutex> lock(mutex_);
106 if (chunks_.size() <= chunk_index) {
107 chunks_.resize(chunk_index + 1);
108 }
109 }
110
111 // We're careful that all references in the closure outlive the Append() call
112 task_group_->Append([=]() -> Status {
113 std::shared_ptr<Array> res;
114 RETURN_NOT_OK(WrapConversionError(converter_->Convert(*parser, col_index_, &res)));
115
116 std::lock_guard<std::mutex> lock(mutex_);
117 // Should not insert an already converted chunk
118 DCHECK_EQ(chunks_[chunk_index], nullptr);
119 chunks_[chunk_index] = std::move(res);
120 return Status::OK();
121 });
122}
123
124Status TypedColumnBuilder::Finish(std::shared_ptr<ChunkedArray>* out) {
125 // Unnecessary iff all tasks have finished
126 std::lock_guard<std::mutex> lock(mutex_);
127
128 for (const auto& chunk : chunks_) {
129 if (chunk == nullptr) {
130 return Status::Invalid("a chunk failed converting for an unknown reason");
131 }
132 }
133 *out = std::make_shared<ChunkedArray>(chunks_, type_);
134 return Status::OK();
135}
136
137//////////////////////////////////////////////////////////////////////////
138// Type-inferring column builder implementation
139
140class InferringColumnBuilder : public ColumnBuilder {
141 public:
142 InferringColumnBuilder(int32_t col_index, const ConvertOptions& options,
143 MemoryPool* pool,
144 const std::shared_ptr<internal::TaskGroup>& task_group)
145 : ColumnBuilder(task_group),
146 col_index_(col_index),
147 options_(options),
148 pool_(pool) {}
149
150 Status Init();
151
152 void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
153 Status Finish(std::shared_ptr<ChunkedArray>* out) override;
154
155 protected:
156 Status LoosenType();
157 Status UpdateType();
158 Status TryConvertChunk(size_t chunk_index);
159 // This must be called unlocked!
160 void ScheduleConvertChunk(size_t chunk_index);
161
162 std::mutex mutex_;
163
164 int32_t col_index_;
165 ConvertOptions options_;
166 MemoryPool* pool_;
167 std::shared_ptr<Converter> converter_;
168
169 // Current inference status
170 enum class InferKind { Null, Integer, Real, Timestamp, Text, Binary };
171
172 std::shared_ptr<DataType> infer_type_;
173 InferKind infer_kind_;
174 bool can_loosen_type_;
175
176 // The parsers corresponding to each chunk (for reconverting)
177 std::vector<std::shared_ptr<BlockParser>> parsers_;
178};
179
180Status InferringColumnBuilder::Init() {
181 infer_kind_ = InferKind::Null;
182 return UpdateType();
183}
184
185Status InferringColumnBuilder::LoosenType() {
186 // We are locked
187
188 DCHECK(can_loosen_type_);
189 switch (infer_kind_) {
190 case InferKind::Null:
191 infer_kind_ = InferKind::Integer;
192 break;
193 case InferKind::Integer:
194 infer_kind_ = InferKind::Timestamp;
195 break;
196 case InferKind::Timestamp:
197 infer_kind_ = InferKind::Real;
198 break;
199 case InferKind::Real:
200 infer_kind_ = InferKind::Text;
201 break;
202 case InferKind::Text:
203 infer_kind_ = InferKind::Binary;
204 break;
205 case InferKind::Binary:
206 return Status::UnknownError("Shouldn't come here");
207 }
208 return UpdateType();
209}
210
211Status InferringColumnBuilder::UpdateType() {
212 // We are locked
213
214 switch (infer_kind_) {
215 case InferKind::Null:
216 infer_type_ = null();
217 can_loosen_type_ = true;
218 break;
219 case InferKind::Integer:
220 infer_type_ = int64();
221 can_loosen_type_ = true;
222 break;
223 case InferKind::Timestamp:
224 // We don't support parsing second fractions for now
225 infer_type_ = timestamp(TimeUnit::SECOND);
226 can_loosen_type_ = true;
227 break;
228 case InferKind::Real:
229 infer_type_ = float64();
230 can_loosen_type_ = true;
231 break;
232 case InferKind::Text:
233 infer_type_ = utf8();
234 can_loosen_type_ = true;
235 break;
236 case InferKind::Binary:
237 infer_type_ = binary();
238 can_loosen_type_ = false;
239 break;
240 }
241 return Converter::Make(infer_type_, options_, pool_, &converter_);
242}
243
244void InferringColumnBuilder::ScheduleConvertChunk(size_t chunk_index) {
245 // We're careful that all values in the closure outlive the Append() call
246 task_group_->Append([=]() { return TryConvertChunk(chunk_index); });
247}
248
// Convert chunk `chunk_index` using the converter current at task start.
//
// - If infer_kind_ changed while converting, the result is discarded and the
//   chunk is rescheduled with the new type.
// - On success, the array is stored in chunks_. If the type can no longer be
//   loosened, the chunk's parser is released since it won't be reconverted.
// - On failure with a loosenable type, the type is loosened, every other
//   already-converted chunk is cleared and rescheduled, and this chunk is
//   rescheduled too.
// - On failure with a non-loosenable type, the conversion error is returned.
Status InferringColumnBuilder::TryConvertChunk(size_t chunk_index) {
  // Snapshot shared state under the lock. The local `converter` copy keeps the
  // converter alive even if another task replaces converter_ via UpdateType().
  std::unique_lock<std::mutex> lock(mutex_);
  std::shared_ptr<Converter> converter = converter_;
  std::shared_ptr<BlockParser> parser = parsers_[chunk_index];
  std::shared_ptr<Array> res;
  InferKind kind = infer_kind_;

  DCHECK_NE(parser, nullptr);

  // Convert without holding the lock so other chunks can proceed concurrently
  lock.unlock();
  Status st = converter->Convert(*parser, col_index_, &res);
  lock.lock();

  if (kind != infer_kind_) {
    // infer_kind_ was changed by another task, reconvert
    lock.unlock();
    ScheduleConvertChunk(chunk_index);
    return Status::OK();
  }

  if (st.ok()) {
    // Conversion succeeded
    chunks_[chunk_index] = std::move(res);
    if (!can_loosen_type_) {
      // We won't try to reconvert anymore
      parsers_[chunk_index].reset();
    }
    return Status::OK();
  } else if (can_loosen_type_) {
    // Conversion failed, try another type
    RETURN_NOT_OK(LoosenType());

    // Reconvert past finished chunks
    // (unfinished chunks will notice by themselves if they need reconverting)
    size_t nchunks = chunks_.size();
    for (size_t i = 0; i < nchunks; ++i) {
      if (i != chunk_index && chunks_[i]) {
        // We're assuming the chunk was converted using the wrong type
        // (which should be true unless the executor reorders tasks)
        chunks_[i].reset();
        // ScheduleConvertChunk() must be called unlocked
        lock.unlock();
        ScheduleConvertChunk(i);
        lock.lock();
      }
    }

    // Reconvert this chunk
    lock.unlock();
    ScheduleConvertChunk(chunk_index);

    return Status::OK();
  } else {
    // Conversion failed but cannot loosen more
    return st;
  }
}
305
306void InferringColumnBuilder::Insert(int64_t block_index,
307 const std::shared_ptr<BlockParser>& parser) {
308 // Create a slot for the new chunk and spawn a task to convert it
309 size_t chunk_index = static_cast<size_t>(block_index);
310 {
311 std::lock_guard<std::mutex> lock(mutex_);
312
313 DCHECK_NE(converter_, nullptr);
314 if (chunks_.size() <= chunk_index) {
315 chunks_.resize(chunk_index + 1);
316 }
317 if (parsers_.size() <= chunk_index) {
318 parsers_.resize(chunk_index + 1);
319 }
320 // Should not insert an already converting chunk
321 DCHECK_EQ(parsers_[chunk_index], nullptr);
322 parsers_[chunk_index] = parser;
323 }
324
325 ScheduleConvertChunk(chunk_index);
326}
327
328Status InferringColumnBuilder::Finish(std::shared_ptr<ChunkedArray>* out) {
329 // Unnecessary iff all tasks have finished
330 std::lock_guard<std::mutex> lock(mutex_);
331
332 for (const auto& chunk : chunks_) {
333 if (chunk == nullptr) {
334 return Status::Invalid("A chunk failed converting for an unknown reason");
335 }
336 // XXX Perhaps we must instead do a last equalization pass
337 // in this serial step
338 DCHECK_EQ(chunk->type()->id(), infer_type_->id())
339 << "Inference didn't equalize types!";
340 }
341 *out = std::make_shared<ChunkedArray>(chunks_, infer_type_);
342 chunks_.clear();
343 parsers_.clear();
344
345 return Status::OK();
346}
347
348//////////////////////////////////////////////////////////////////////////
349// Factory functions
350
351Status ColumnBuilder::Make(const std::shared_ptr<DataType>& type, int32_t col_index,
352 const ConvertOptions& options,
353 const std::shared_ptr<TaskGroup>& task_group,
354 std::shared_ptr<ColumnBuilder>* out) {
355 auto ptr =
356 new TypedColumnBuilder(type, col_index, options, default_memory_pool(), task_group);
357 auto res = std::shared_ptr<ColumnBuilder>(ptr);
358 RETURN_NOT_OK(ptr->Init());
359 *out = res;
360 return Status::OK();
361}
362
363Status ColumnBuilder::Make(int32_t col_index, const ConvertOptions& options,
364 const std::shared_ptr<TaskGroup>& task_group,
365 std::shared_ptr<ColumnBuilder>* out) {
366 auto ptr =
367 new InferringColumnBuilder(col_index, options, default_memory_pool(), task_group);
368 auto res = std::shared_ptr<ColumnBuilder>(ptr);
369 RETURN_NOT_OK(ptr->Init());
370 *out = res;
371 return Status::OK();
372}
373
374} // namespace csv
375} // namespace arrow
376