1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <cstddef> |
19 | #include <cstdint> |
20 | #include <memory> |
21 | #include <mutex> |
22 | #include <sstream> |
23 | #include <string> |
24 | #include <utility> |
25 | #include <vector> |
26 | |
27 | #include "arrow/array.h" |
28 | #include "arrow/csv/column-builder.h" |
29 | #include "arrow/csv/converter.h" |
30 | #include "arrow/csv/options.h" |
31 | #include "arrow/memory_pool.h" |
32 | #include "arrow/status.h" |
33 | #include "arrow/table.h" |
34 | #include "arrow/type.h" |
35 | #include "arrow/util/logging.h" |
36 | #include "arrow/util/task-group.h" |
37 | |
38 | namespace arrow { |
39 | namespace csv { |
40 | |
// Forward declaration: in this file BlockParser is only held through
// std::shared_ptr and passed by reference to Converter::Convert().
class BlockParser;

// Lets the factory functions below name TaskGroup without the internal::
// qualifier.
using internal::TaskGroup;
44 | |
45 | void ColumnBuilder::SetTaskGroup(const std::shared_ptr<internal::TaskGroup>& task_group) { |
46 | task_group_ = task_group; |
47 | } |
48 | |
49 | void ColumnBuilder::Append(const std::shared_ptr<BlockParser>& parser) { |
50 | Insert(static_cast<int64_t>(chunks_.size()), parser); |
51 | } |
52 | |
53 | ////////////////////////////////////////////////////////////////////////// |
54 | // Pre-typed column builder implementation |
55 | |
56 | class TypedColumnBuilder : public ColumnBuilder { |
57 | public: |
58 | TypedColumnBuilder(const std::shared_ptr<DataType>& type, int32_t col_index, |
59 | const ConvertOptions& options, MemoryPool* pool, |
60 | const std::shared_ptr<internal::TaskGroup>& task_group) |
61 | : ColumnBuilder(task_group), |
62 | type_(type), |
63 | col_index_(col_index), |
64 | options_(options), |
65 | pool_(pool) {} |
66 | |
67 | Status Init(); |
68 | |
69 | void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override; |
70 | Status Finish(std::shared_ptr<ChunkedArray>* out) override; |
71 | |
72 | protected: |
73 | Status WrapConversionError(const Status& st) { |
74 | if (st.ok()) { |
75 | return st; |
76 | } else { |
77 | std::stringstream ss; |
78 | ss << "In column #" << col_index_ << ": " << st.message(); |
79 | return Status(st.code(), ss.str()); |
80 | } |
81 | } |
82 | |
83 | std::mutex mutex_; |
84 | |
85 | std::shared_ptr<DataType> type_; |
86 | int32_t col_index_; |
87 | ConvertOptions options_; |
88 | MemoryPool* pool_; |
89 | |
90 | std::shared_ptr<Converter> converter_; |
91 | }; |
92 | |
93 | Status TypedColumnBuilder::Init() { |
94 | return Converter::Make(type_, options_, pool_, &converter_); |
95 | } |
96 | |
97 | void TypedColumnBuilder::Insert(int64_t block_index, |
98 | const std::shared_ptr<BlockParser>& parser) { |
99 | DCHECK_NE(converter_, nullptr); |
100 | |
101 | // Create a null Array pointer at the back at the list |
102 | // and spawn a task to initialize it after conversion |
103 | size_t chunk_index = static_cast<size_t>(block_index); |
104 | { |
105 | std::lock_guard<std::mutex> lock(mutex_); |
106 | if (chunks_.size() <= chunk_index) { |
107 | chunks_.resize(chunk_index + 1); |
108 | } |
109 | } |
110 | |
111 | // We're careful that all references in the closure outlive the Append() call |
112 | task_group_->Append([=]() -> Status { |
113 | std::shared_ptr<Array> res; |
114 | RETURN_NOT_OK(WrapConversionError(converter_->Convert(*parser, col_index_, &res))); |
115 | |
116 | std::lock_guard<std::mutex> lock(mutex_); |
117 | // Should not insert an already converted chunk |
118 | DCHECK_EQ(chunks_[chunk_index], nullptr); |
119 | chunks_[chunk_index] = std::move(res); |
120 | return Status::OK(); |
121 | }); |
122 | } |
123 | |
124 | Status TypedColumnBuilder::Finish(std::shared_ptr<ChunkedArray>* out) { |
125 | // Unnecessary iff all tasks have finished |
126 | std::lock_guard<std::mutex> lock(mutex_); |
127 | |
128 | for (const auto& chunk : chunks_) { |
129 | if (chunk == nullptr) { |
130 | return Status::Invalid("a chunk failed converting for an unknown reason" ); |
131 | } |
132 | } |
133 | *out = std::make_shared<ChunkedArray>(chunks_, type_); |
134 | return Status::OK(); |
135 | } |
136 | |
137 | ////////////////////////////////////////////////////////////////////////// |
138 | // Type-inferring column builder implementation |
139 | |
140 | class InferringColumnBuilder : public ColumnBuilder { |
141 | public: |
142 | InferringColumnBuilder(int32_t col_index, const ConvertOptions& options, |
143 | MemoryPool* pool, |
144 | const std::shared_ptr<internal::TaskGroup>& task_group) |
145 | : ColumnBuilder(task_group), |
146 | col_index_(col_index), |
147 | options_(options), |
148 | pool_(pool) {} |
149 | |
150 | Status Init(); |
151 | |
152 | void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override; |
153 | Status Finish(std::shared_ptr<ChunkedArray>* out) override; |
154 | |
155 | protected: |
156 | Status LoosenType(); |
157 | Status UpdateType(); |
158 | Status TryConvertChunk(size_t chunk_index); |
159 | // This must be called unlocked! |
160 | void ScheduleConvertChunk(size_t chunk_index); |
161 | |
162 | std::mutex mutex_; |
163 | |
164 | int32_t col_index_; |
165 | ConvertOptions options_; |
166 | MemoryPool* pool_; |
167 | std::shared_ptr<Converter> converter_; |
168 | |
169 | // Current inference status |
170 | enum class InferKind { Null, Integer, Real, Timestamp, Text, Binary }; |
171 | |
172 | std::shared_ptr<DataType> infer_type_; |
173 | InferKind infer_kind_; |
174 | bool can_loosen_type_; |
175 | |
176 | // The parsers corresponding to each chunk (for reconverting) |
177 | std::vector<std::shared_ptr<BlockParser>> parsers_; |
178 | }; |
179 | |
180 | Status InferringColumnBuilder::Init() { |
181 | infer_kind_ = InferKind::Null; |
182 | return UpdateType(); |
183 | } |
184 | |
185 | Status InferringColumnBuilder::LoosenType() { |
186 | // We are locked |
187 | |
188 | DCHECK(can_loosen_type_); |
189 | switch (infer_kind_) { |
190 | case InferKind::Null: |
191 | infer_kind_ = InferKind::Integer; |
192 | break; |
193 | case InferKind::Integer: |
194 | infer_kind_ = InferKind::Timestamp; |
195 | break; |
196 | case InferKind::Timestamp: |
197 | infer_kind_ = InferKind::Real; |
198 | break; |
199 | case InferKind::Real: |
200 | infer_kind_ = InferKind::Text; |
201 | break; |
202 | case InferKind::Text: |
203 | infer_kind_ = InferKind::Binary; |
204 | break; |
205 | case InferKind::Binary: |
206 | return Status::UnknownError("Shouldn't come here" ); |
207 | } |
208 | return UpdateType(); |
209 | } |
210 | |
211 | Status InferringColumnBuilder::UpdateType() { |
212 | // We are locked |
213 | |
214 | switch (infer_kind_) { |
215 | case InferKind::Null: |
216 | infer_type_ = null(); |
217 | can_loosen_type_ = true; |
218 | break; |
219 | case InferKind::Integer: |
220 | infer_type_ = int64(); |
221 | can_loosen_type_ = true; |
222 | break; |
223 | case InferKind::Timestamp: |
224 | // We don't support parsing second fractions for now |
225 | infer_type_ = timestamp(TimeUnit::SECOND); |
226 | can_loosen_type_ = true; |
227 | break; |
228 | case InferKind::Real: |
229 | infer_type_ = float64(); |
230 | can_loosen_type_ = true; |
231 | break; |
232 | case InferKind::Text: |
233 | infer_type_ = utf8(); |
234 | can_loosen_type_ = true; |
235 | break; |
236 | case InferKind::Binary: |
237 | infer_type_ = binary(); |
238 | can_loosen_type_ = false; |
239 | break; |
240 | } |
241 | return Converter::Make(infer_type_, options_, pool_, &converter_); |
242 | } |
243 | |
244 | void InferringColumnBuilder::ScheduleConvertChunk(size_t chunk_index) { |
245 | // We're careful that all values in the closure outlive the Append() call |
246 | task_group_->Append([=]() { return TryConvertChunk(chunk_index); }); |
247 | } |
248 | |
// Task body: converts chunk `chunk_index` using the converter that is current
// when the task starts.
//
// Outcomes:
// - infer_kind_ changed while converting: the result is discarded and the
//   chunk is scheduled again; returns OK.
// - conversion succeeded: the array is stored in chunks_[chunk_index];
//   returns OK.
// - conversion failed and the type can be loosened: the type is loosened,
//   all other already converted chunks are dropped and rescheduled, this
//   chunk is rescheduled; returns OK (or LoosenType()'s error).
// - conversion failed and the type cannot be loosened: returns the
//   conversion error.
//
// mutex_ is released around the conversion itself and around every call to
// ScheduleConvertChunk(), which must be called unlocked.
Status InferringColumnBuilder::TryConvertChunk(size_t chunk_index) {
  std::unique_lock<std::mutex> lock(mutex_);
  // Snapshot the shared state while locked; the local copies stay usable
  // after the lock is released
  std::shared_ptr<Converter> converter = converter_;
  std::shared_ptr<BlockParser> parser = parsers_[chunk_index];
  std::shared_ptr<Array> res;
  InferKind kind = infer_kind_;

  DCHECK_NE(parser, nullptr);

  // Run the conversion without holding the mutex
  lock.unlock();
  Status st = converter->Convert(*parser, col_index_, &res);
  lock.lock();

  if (kind != infer_kind_) {
    // infer_kind_ was changed by another task, reconvert
    lock.unlock();
    ScheduleConvertChunk(chunk_index);
    return Status::OK();
  }

  if (st.ok()) {
    // Conversion succeeded
    chunks_[chunk_index] = std::move(res);
    if (!can_loosen_type_) {
      // We won't try to reconvert anymore
      parsers_[chunk_index].reset();
    }
    return Status::OK();
  } else if (can_loosen_type_) {
    // Conversion failed, try another type
    RETURN_NOT_OK(LoosenType());

    // Reconvert past finished chunks
    // (unfinished chunks will notice by themselves if they need reconverting)
    // The chunk count is read once, before the lock is released inside the
    // loop
    size_t nchunks = chunks_.size();
    for (size_t i = 0; i < nchunks; ++i) {
      if (i != chunk_index && chunks_[i]) {
        // We're assuming the chunk was converted using the wrong type
        // (which should be true unless the executor reorders tasks)
        chunks_[i].reset();
        // Scheduling must happen unlocked; re-lock to continue the scan
        lock.unlock();
        ScheduleConvertChunk(i);
        lock.lock();
      }
    }

    // Reconvert this chunk
    lock.unlock();
    ScheduleConvertChunk(chunk_index);

    return Status::OK();
  } else {
    // Conversion failed but cannot loosen more
    return st;
  }
}
305 | |
306 | void InferringColumnBuilder::Insert(int64_t block_index, |
307 | const std::shared_ptr<BlockParser>& parser) { |
308 | // Create a slot for the new chunk and spawn a task to convert it |
309 | size_t chunk_index = static_cast<size_t>(block_index); |
310 | { |
311 | std::lock_guard<std::mutex> lock(mutex_); |
312 | |
313 | DCHECK_NE(converter_, nullptr); |
314 | if (chunks_.size() <= chunk_index) { |
315 | chunks_.resize(chunk_index + 1); |
316 | } |
317 | if (parsers_.size() <= chunk_index) { |
318 | parsers_.resize(chunk_index + 1); |
319 | } |
320 | // Should not insert an already converting chunk |
321 | DCHECK_EQ(parsers_[chunk_index], nullptr); |
322 | parsers_[chunk_index] = parser; |
323 | } |
324 | |
325 | ScheduleConvertChunk(chunk_index); |
326 | } |
327 | |
328 | Status InferringColumnBuilder::Finish(std::shared_ptr<ChunkedArray>* out) { |
329 | // Unnecessary iff all tasks have finished |
330 | std::lock_guard<std::mutex> lock(mutex_); |
331 | |
332 | for (const auto& chunk : chunks_) { |
333 | if (chunk == nullptr) { |
334 | return Status::Invalid("A chunk failed converting for an unknown reason" ); |
335 | } |
336 | // XXX Perhaps we must instead do a last equalization pass |
337 | // in this serial step |
338 | DCHECK_EQ(chunk->type()->id(), infer_type_->id()) |
339 | << "Inference didn't equalize types!" ; |
340 | } |
341 | *out = std::make_shared<ChunkedArray>(chunks_, infer_type_); |
342 | chunks_.clear(); |
343 | parsers_.clear(); |
344 | |
345 | return Status::OK(); |
346 | } |
347 | |
348 | ////////////////////////////////////////////////////////////////////////// |
349 | // Factory functions |
350 | |
351 | Status ColumnBuilder::Make(const std::shared_ptr<DataType>& type, int32_t col_index, |
352 | const ConvertOptions& options, |
353 | const std::shared_ptr<TaskGroup>& task_group, |
354 | std::shared_ptr<ColumnBuilder>* out) { |
355 | auto ptr = |
356 | new TypedColumnBuilder(type, col_index, options, default_memory_pool(), task_group); |
357 | auto res = std::shared_ptr<ColumnBuilder>(ptr); |
358 | RETURN_NOT_OK(ptr->Init()); |
359 | *out = res; |
360 | return Status::OK(); |
361 | } |
362 | |
363 | Status ColumnBuilder::Make(int32_t col_index, const ConvertOptions& options, |
364 | const std::shared_ptr<TaskGroup>& task_group, |
365 | std::shared_ptr<ColumnBuilder>* out) { |
366 | auto ptr = |
367 | new InferringColumnBuilder(col_index, options, default_memory_pool(), task_group); |
368 | auto res = std::shared_ptr<ColumnBuilder>(ptr); |
369 | RETURN_NOT_OK(ptr->Init()); |
370 | *out = res; |
371 | return Status::OK(); |
372 | } |
373 | |
374 | } // namespace csv |
375 | } // namespace arrow |
376 | |