1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/table.h" |
19 | |
20 | #include <algorithm> |
21 | #include <cstdlib> |
22 | #include <limits> |
23 | #include <memory> |
24 | #include <sstream> |
25 | #include <utility> |
26 | |
27 | #include "arrow/array.h" |
28 | #include "arrow/record_batch.h" |
29 | #include "arrow/status.h" |
30 | #include "arrow/type.h" |
31 | #include "arrow/util/logging.h" |
32 | #include "arrow/util/stl.h" |
33 | |
34 | namespace arrow { |
35 | |
36 | // ---------------------------------------------------------------------- |
37 | // ChunkedArray and Column methods |
38 | |
39 | ChunkedArray::ChunkedArray(const ArrayVector& chunks) : chunks_(chunks) { |
40 | length_ = 0; |
41 | null_count_ = 0; |
42 | DCHECK_GT(chunks.size(), 0) |
43 | << "cannot construct ChunkedArray from empty vector and omitted type" ; |
44 | type_ = chunks[0]->type(); |
45 | for (const std::shared_ptr<Array>& chunk : chunks) { |
46 | length_ += chunk->length(); |
47 | null_count_ += chunk->null_count(); |
48 | } |
49 | } |
50 | |
51 | ChunkedArray::ChunkedArray(const ArrayVector& chunks, |
52 | const std::shared_ptr<DataType>& type) |
53 | : chunks_(chunks), type_(type) { |
54 | length_ = 0; |
55 | null_count_ = 0; |
56 | for (const std::shared_ptr<Array>& chunk : chunks) { |
57 | length_ += chunk->length(); |
58 | null_count_ += chunk->null_count(); |
59 | } |
60 | } |
61 | |
// Logical equality: same length, same null count, and element-wise equal
// values regardless of how the data is partitioned into chunks. Two arrays
// with different chunk layouts but identical flattened contents are equal.
bool ChunkedArray::Equals(const ChunkedArray& other) const {
  if (length_ != other.length()) {
    return false;
  }
  if (null_count_ != other.null_count()) {
    return false;
  }
  if (length_ == 0) {
    // No elements to compare; equality reduces to type equality.
    return type_->Equals(other.type_);
  }

  // Check contents of the underlying arrays. This checks for equality of
  // the underlying data independently of the chunk size.
  int this_chunk_idx = 0;
  int64_t this_start_idx = 0;
  int other_chunk_idx = 0;
  int64_t other_start_idx = 0;

  // Walk both chunk lists in lock step, each iteration comparing the largest
  // range that lies within a single chunk on both sides.
  int64_t elements_compared = 0;
  while (elements_compared < length_) {
    const std::shared_ptr<Array> this_array = chunks_[this_chunk_idx];
    const std::shared_ptr<Array> other_array = other.chunk(other_chunk_idx);
    // Overlap of the two current chunks, measured from each side's cursor.
    int64_t common_length = std::min(this_array->length() - this_start_idx,
                                     other_array->length() - other_start_idx);
    if (!this_array->RangeEquals(this_start_idx, this_start_idx + common_length,
                                 other_start_idx, other_array)) {
      return false;
    }

    elements_compared += common_length;

    // If we have exhausted the current chunk, proceed to the next one individually.
    if (this_start_idx + common_length == this_array->length()) {
      this_chunk_idx++;
      this_start_idx = 0;
    } else {
      this_start_idx += common_length;
    }

    if (other_start_idx + common_length == other_array->length()) {
      other_chunk_idx++;
      other_start_idx = 0;
    } else {
      other_start_idx += common_length;
    }
  }
  return true;
}
110 | |
111 | bool ChunkedArray::Equals(const std::shared_ptr<ChunkedArray>& other) const { |
112 | if (this == other.get()) { |
113 | return true; |
114 | } |
115 | if (!other) { |
116 | return false; |
117 | } |
118 | return Equals(*other.get()); |
119 | } |
120 | |
121 | std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset, int64_t length) const { |
122 | DCHECK_LE(offset, length_); |
123 | |
124 | int curr_chunk = 0; |
125 | while (curr_chunk < num_chunks() && offset >= chunk(curr_chunk)->length()) { |
126 | offset -= chunk(curr_chunk)->length(); |
127 | curr_chunk++; |
128 | } |
129 | |
130 | ArrayVector new_chunks; |
131 | while (curr_chunk < num_chunks() && length > 0) { |
132 | new_chunks.push_back(chunk(curr_chunk)->Slice(offset, length)); |
133 | length -= chunk(curr_chunk)->length() - offset; |
134 | offset = 0; |
135 | curr_chunk++; |
136 | } |
137 | |
138 | return std::make_shared<ChunkedArray>(new_chunks, type_); |
139 | } |
140 | |
141 | std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset) const { |
142 | return Slice(offset, length_); |
143 | } |
144 | |
145 | Status ChunkedArray::Flatten(MemoryPool* pool, |
146 | std::vector<std::shared_ptr<ChunkedArray>>* out) const { |
147 | std::vector<std::shared_ptr<ChunkedArray>> flattened; |
148 | if (type()->id() != Type::STRUCT) { |
149 | // Emulate non-existent copy constructor |
150 | flattened.emplace_back(std::make_shared<ChunkedArray>(chunks_, type_)); |
151 | *out = flattened; |
152 | return Status::OK(); |
153 | } |
154 | std::vector<ArrayVector> flattened_chunks; |
155 | for (const auto& chunk : chunks_) { |
156 | ArrayVector res; |
157 | RETURN_NOT_OK(dynamic_cast<const StructArray&>(*chunk).Flatten(pool, &res)); |
158 | if (!flattened_chunks.size()) { |
159 | // First chunk |
160 | for (const auto& array : res) { |
161 | flattened_chunks.push_back({array}); |
162 | } |
163 | } else { |
164 | DCHECK_EQ(flattened_chunks.size(), res.size()); |
165 | for (size_t i = 0; i < res.size(); ++i) { |
166 | flattened_chunks[i].push_back(res[i]); |
167 | } |
168 | } |
169 | } |
170 | for (const auto& vec : flattened_chunks) { |
171 | flattened.emplace_back(std::make_shared<ChunkedArray>(vec)); |
172 | } |
173 | *out = flattened; |
174 | return Status::OK(); |
175 | } |
176 | |
177 | Column::Column(const std::shared_ptr<Field>& field, const ArrayVector& chunks) |
178 | : field_(field) { |
179 | data_ = std::make_shared<ChunkedArray>(chunks, field->type()); |
180 | } |
181 | |
182 | Column::Column(const std::shared_ptr<Field>& field, const std::shared_ptr<Array>& data) |
183 | : field_(field) { |
184 | if (!data) { |
185 | data_ = std::make_shared<ChunkedArray>(ArrayVector({}), field->type()); |
186 | } else { |
187 | data_ = std::make_shared<ChunkedArray>(ArrayVector({data}), field->type()); |
188 | } |
189 | } |
190 | |
// Convenience constructor deriving the field from the column name and the
// array's type. NOTE(review): unlike the (field, array) constructor, this
// dereferences `data` unconditionally, so `data` must be non-null here.
Column::Column(const std::string& name, const std::shared_ptr<Array>& data)
    : Column(::arrow::field(name, data->type()), data) {}
193 | |
// Convenience constructor deriving the field from the column name and the
// chunked array's type (`data` must be non-null).
Column::Column(const std::string& name, const std::shared_ptr<ChunkedArray>& data)
    : Column(::arrow::field(name, data->type()), data) {}
196 | |
// Adopt an existing chunked array as-is. No verification that the data's
// type matches the field's type is performed here; see ValidateData().
Column::Column(const std::shared_ptr<Field>& field,
               const std::shared_ptr<ChunkedArray>& data)
    : field_(field), data_(data) {}
200 | |
201 | Status Column::Flatten(MemoryPool* pool, |
202 | std::vector<std::shared_ptr<Column>>* out) const { |
203 | std::vector<std::shared_ptr<Column>> flattened; |
204 | std::vector<std::shared_ptr<Field>> flattened_fields = field_->Flatten(); |
205 | std::vector<std::shared_ptr<ChunkedArray>> flattened_data; |
206 | RETURN_NOT_OK(data_->Flatten(pool, &flattened_data)); |
207 | DCHECK_EQ(flattened_fields.size(), flattened_data.size()); |
208 | for (size_t i = 0; i < flattened_fields.size(); ++i) { |
209 | flattened.push_back(std::make_shared<Column>(flattened_fields[i], flattened_data[i])); |
210 | } |
211 | *out = flattened; |
212 | return Status::OK(); |
213 | } |
214 | |
215 | bool Column::Equals(const Column& other) const { |
216 | if (!field_->Equals(other.field())) { |
217 | return false; |
218 | } |
219 | return data_->Equals(other.data()); |
220 | } |
221 | |
222 | bool Column::Equals(const std::shared_ptr<Column>& other) const { |
223 | if (this == other.get()) { |
224 | return true; |
225 | } |
226 | if (!other) { |
227 | return false; |
228 | } |
229 | |
230 | return Equals(*other.get()); |
231 | } |
232 | |
233 | Status Column::ValidateData() { |
234 | for (int i = 0; i < data_->num_chunks(); ++i) { |
235 | std::shared_ptr<DataType> type = data_->chunk(i)->type(); |
236 | if (!this->type()->Equals(type)) { |
237 | return Status::Invalid("In chunk " , i, " expected type " , this->type()->ToString(), |
238 | " but saw " , type->ToString()); |
239 | } |
240 | } |
241 | return Status::OK(); |
242 | } |
243 | |
244 | // ---------------------------------------------------------------------- |
245 | // Table methods |
246 | |
247 | /// \class SimpleTable |
248 | /// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch |
249 | class SimpleTable : public Table { |
250 | public: |
251 | SimpleTable(const std::shared_ptr<Schema>& schema, |
252 | const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows = -1) |
253 | : columns_(columns) { |
254 | schema_ = schema; |
255 | if (num_rows < 0) { |
256 | if (columns.size() == 0) { |
257 | num_rows_ = 0; |
258 | } else { |
259 | num_rows_ = columns[0]->length(); |
260 | } |
261 | } else { |
262 | num_rows_ = num_rows; |
263 | } |
264 | } |
265 | |
266 | SimpleTable(const std::shared_ptr<Schema>& schema, |
267 | const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows = -1) { |
268 | schema_ = schema; |
269 | if (num_rows < 0) { |
270 | if (columns.size() == 0) { |
271 | num_rows_ = 0; |
272 | } else { |
273 | num_rows_ = columns[0]->length(); |
274 | } |
275 | } else { |
276 | num_rows_ = num_rows; |
277 | } |
278 | |
279 | columns_.resize(columns.size()); |
280 | for (size_t i = 0; i < columns.size(); ++i) { |
281 | columns_[i] = |
282 | std::make_shared<Column>(schema->field(static_cast<int>(i)), columns[i]); |
283 | } |
284 | } |
285 | |
286 | std::shared_ptr<Column> column(int i) const override { return columns_[i]; } |
287 | |
288 | Status RemoveColumn(int i, std::shared_ptr<Table>* out) const override { |
289 | std::shared_ptr<Schema> new_schema; |
290 | RETURN_NOT_OK(schema_->RemoveField(i, &new_schema)); |
291 | |
292 | *out = Table::Make(new_schema, internal::DeleteVectorElement(columns_, i), |
293 | this->num_rows()); |
294 | return Status::OK(); |
295 | } |
296 | |
297 | Status AddColumn(int i, const std::shared_ptr<Column>& col, |
298 | std::shared_ptr<Table>* out) const override { |
299 | DCHECK(col != nullptr); |
300 | |
301 | if (col->length() != num_rows_) { |
302 | return Status::Invalid( |
303 | "Added column's length must match table's length. Expected length " , num_rows_, |
304 | " but got length " , col->length()); |
305 | } |
306 | |
307 | std::shared_ptr<Schema> new_schema; |
308 | RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema)); |
309 | |
310 | *out = Table::Make(new_schema, internal::AddVectorElement(columns_, i, col)); |
311 | return Status::OK(); |
312 | } |
313 | |
314 | Status SetColumn(int i, const std::shared_ptr<Column>& col, |
315 | std::shared_ptr<Table>* out) const override { |
316 | DCHECK(col != nullptr); |
317 | |
318 | if (col->length() != num_rows_) { |
319 | return Status::Invalid( |
320 | "Added column's length must match table's length. Expected length " , num_rows_, |
321 | " but got length " , col->length()); |
322 | } |
323 | |
324 | std::shared_ptr<Schema> new_schema; |
325 | RETURN_NOT_OK(schema_->SetField(i, col->field(), &new_schema)); |
326 | |
327 | *out = Table::Make(new_schema, internal::ReplaceVectorElement(columns_, i, col)); |
328 | return Status::OK(); |
329 | } |
330 | |
331 | std::shared_ptr<Table> ReplaceSchemaMetadata( |
332 | const std::shared_ptr<const KeyValueMetadata>& metadata) const override { |
333 | auto new_schema = schema_->AddMetadata(metadata); |
334 | return Table::Make(new_schema, columns_); |
335 | } |
336 | |
337 | Status Flatten(MemoryPool* pool, std::shared_ptr<Table>* out) const override { |
338 | std::vector<std::shared_ptr<Field>> flattened_fields; |
339 | std::vector<std::shared_ptr<Column>> flattened_columns; |
340 | for (const auto& column : columns_) { |
341 | std::vector<std::shared_ptr<Column>> new_columns; |
342 | RETURN_NOT_OK(column->Flatten(pool, &new_columns)); |
343 | for (const auto& new_col : new_columns) { |
344 | flattened_fields.push_back(new_col->field()); |
345 | flattened_columns.push_back(new_col); |
346 | } |
347 | } |
348 | auto flattened_schema = |
349 | std::make_shared<Schema>(flattened_fields, schema_->metadata()); |
350 | *out = Table::Make(flattened_schema, flattened_columns); |
351 | return Status::OK(); |
352 | } |
353 | |
354 | Status Validate() const override { |
355 | // Make sure columns and schema are consistent |
356 | if (static_cast<int>(columns_.size()) != schema_->num_fields()) { |
357 | return Status::Invalid("Number of columns did not match schema" ); |
358 | } |
359 | for (int i = 0; i < num_columns(); ++i) { |
360 | const Column* col = columns_[i].get(); |
361 | if (col == nullptr) { |
362 | return Status::Invalid("Column " , i, " was null" ); |
363 | } |
364 | if (!col->field()->Equals(*schema_->field(i))) { |
365 | return Status::Invalid("Column field " , i, " named " , col->name(), |
366 | " is inconsistent with schema" ); |
367 | } |
368 | } |
369 | |
370 | // Make sure columns are all the same length |
371 | for (int i = 0; i < num_columns(); ++i) { |
372 | const Column* col = columns_[i].get(); |
373 | if (col->length() != num_rows_) { |
374 | return Status::Invalid("Column " , i, " named " , col->name(), " expected length " , |
375 | num_rows_, " but got length " , col->length()); |
376 | } |
377 | } |
378 | return Status::OK(); |
379 | } |
380 | |
381 | private: |
382 | std::vector<std::shared_ptr<Column>> columns_; |
383 | }; |
384 | |
385 | Table::Table() : num_rows_(0) {} |
386 | |
387 | std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema, |
388 | const std::vector<std::shared_ptr<Column>>& columns, |
389 | int64_t num_rows) { |
390 | return std::make_shared<SimpleTable>(schema, columns, num_rows); |
391 | } |
392 | |
393 | std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema, |
394 | const std::vector<std::shared_ptr<Array>>& arrays, |
395 | int64_t num_rows) { |
396 | return std::make_shared<SimpleTable>(schema, arrays, num_rows); |
397 | } |
398 | |
399 | Status Table::FromRecordBatches(const std::shared_ptr<Schema>& schema, |
400 | const std::vector<std::shared_ptr<RecordBatch>>& batches, |
401 | std::shared_ptr<Table>* table) { |
402 | const int nbatches = static_cast<int>(batches.size()); |
403 | const int ncolumns = static_cast<int>(schema->num_fields()); |
404 | |
405 | for (int i = 0; i < nbatches; ++i) { |
406 | if (!batches[i]->schema()->Equals(*schema, false)) { |
407 | return Status::Invalid("Schema at index " , static_cast<int>(i), |
408 | " was different: \n" , schema->ToString(), "\nvs\n" , |
409 | batches[i]->schema()->ToString()); |
410 | } |
411 | } |
412 | |
413 | std::vector<std::shared_ptr<Column>> columns(ncolumns); |
414 | std::vector<std::shared_ptr<Array>> column_arrays(nbatches); |
415 | |
416 | for (int i = 0; i < ncolumns; ++i) { |
417 | for (int j = 0; j < nbatches; ++j) { |
418 | column_arrays[j] = batches[j]->column(i); |
419 | } |
420 | columns[i] = std::make_shared<Column>(schema->field(i), column_arrays); |
421 | } |
422 | |
423 | *table = Table::Make(schema, columns); |
424 | return Status::OK(); |
425 | } |
426 | |
427 | Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& batches, |
428 | std::shared_ptr<Table>* table) { |
429 | if (batches.size() == 0) { |
430 | return Status::Invalid("Must pass at least one record batch" ); |
431 | } |
432 | |
433 | return FromRecordBatches(batches[0]->schema(), batches, table); |
434 | } |
435 | |
436 | Status ConcatenateTables(const std::vector<std::shared_ptr<Table>>& tables, |
437 | std::shared_ptr<Table>* table) { |
438 | if (tables.size() == 0) { |
439 | return Status::Invalid("Must pass at least one table" ); |
440 | } |
441 | |
442 | std::shared_ptr<Schema> schema = tables[0]->schema(); |
443 | |
444 | const int ntables = static_cast<int>(tables.size()); |
445 | const int ncolumns = static_cast<int>(schema->num_fields()); |
446 | |
447 | for (int i = 1; i < ntables; ++i) { |
448 | if (!tables[i]->schema()->Equals(*schema, false)) { |
449 | return Status::Invalid("Schema at index " , static_cast<int>(i), |
450 | " was different: \n" , schema->ToString(), "\nvs\n" , |
451 | tables[i]->schema()->ToString()); |
452 | } |
453 | } |
454 | |
455 | std::vector<std::shared_ptr<Column>> columns(ncolumns); |
456 | for (int i = 0; i < ncolumns; ++i) { |
457 | std::vector<std::shared_ptr<Array>> column_arrays; |
458 | for (int j = 0; j < ntables; ++j) { |
459 | const std::vector<std::shared_ptr<Array>>& chunks = |
460 | tables[j]->column(i)->data()->chunks(); |
461 | for (const auto& chunk : chunks) { |
462 | column_arrays.push_back(chunk); |
463 | } |
464 | } |
465 | columns[i] = std::make_shared<Column>(schema->field(i), column_arrays); |
466 | } |
467 | *table = Table::Make(schema, columns); |
468 | return Status::OK(); |
469 | } |
470 | |
471 | bool Table::Equals(const Table& other) const { |
472 | if (this == &other) { |
473 | return true; |
474 | } |
475 | if (!schema_->Equals(*other.schema())) { |
476 | return false; |
477 | } |
478 | if (this->num_columns() != other.num_columns()) { |
479 | return false; |
480 | } |
481 | |
482 | for (int i = 0; i < this->num_columns(); i++) { |
483 | if (!this->column(i)->Equals(other.column(i))) { |
484 | return false; |
485 | } |
486 | } |
487 | return true; |
488 | } |
489 | |
490 | // ---------------------------------------------------------------------- |
491 | // Convert a table to a sequence of record batches |
492 | |
// Streams a Table as RecordBatches by walking every column's chunk list in
// lock step: each batch is the largest contiguous run available across all
// columns, capped by max_chunksize_, so a batch never straddles a chunk
// boundary in any column.
class TableBatchReader::TableBatchReaderImpl {
 public:
  explicit TableBatchReaderImpl(const Table& table)
      : table_(table),
        column_data_(table.num_columns()),
        chunk_numbers_(table.num_columns(), 0),
        chunk_offsets_(table.num_columns(), 0),
        absolute_row_position_(0),
        max_chunksize_(std::numeric_limits<int64_t>::max()) {
    // Cache raw ChunkedArray pointers; the table is held by reference and
    // assumed to outlive this reader, so the pointers remain valid.
    for (int i = 0; i < table.num_columns(); ++i) {
      column_data_[i] = table.column(i)->data().get();
    }
  }

  // Produce the next batch, or set *out to nullptr once all rows have been
  // emitted.
  Status ReadNext(std::shared_ptr<RecordBatch>* out) {
    if (absolute_row_position_ == table_.num_rows()) {
      *out = nullptr;
      return Status::OK();
    }

    // Determine the minimum contiguous slice across all columns
    int64_t chunksize = std::min(table_.num_rows(), max_chunksize_);
    std::vector<const Array*> chunks(table_.num_columns());
    for (int i = 0; i < table_.num_columns(); ++i) {
      auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get();
      // Rows left in this column's current chunk past the consumed prefix.
      int64_t chunk_remaining = chunk->length() - chunk_offsets_[i];

      if (chunk_remaining < chunksize) {
        chunksize = chunk_remaining;
      }

      chunks[i] = chunk;
    }

    // Slice chunks and advance chunk index as appropriate
    std::vector<std::shared_ptr<ArrayData>> batch_data(table_.num_columns());

    for (int i = 0; i < table_.num_columns(); ++i) {
      // Exhausted chunk
      const Array* chunk = chunks[i];
      const int64_t offset = chunk_offsets_[i];
      std::shared_ptr<ArrayData> slice_data;
      if ((chunk->length() - offset) == chunksize) {
        // This chunk is fully consumed by the batch: move to the next chunk
        // and reset the offset.
        ++chunk_numbers_[i];
        chunk_offsets_[i] = 0;
        if (offset > 0) {
          // Need to slice
          slice_data = chunk->Slice(offset, chunksize)->data();
        } else {
          // No slice
          slice_data = chunk->data();
        }
      } else {
        // Partially consumed: stay on this chunk, advance the offset.
        chunk_offsets_[i] += chunksize;
        slice_data = chunk->Slice(offset, chunksize)->data();
      }
      batch_data[i] = std::move(slice_data);
    }

    absolute_row_position_ += chunksize;
    *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data));

    return Status::OK();
  }

  std::shared_ptr<Schema> schema() const { return table_.schema(); }

  // Cap the number of rows per emitted batch.
  void set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; }

 private:
  const Table& table_;                      // not owned; must outlive the reader
  std::vector<ChunkedArray*> column_data_;  // cached per-column data (not owned)
  std::vector<int> chunk_numbers_;          // per-column index of the current chunk
  std::vector<int64_t> chunk_offsets_;      // per-column offset into the current chunk
  int64_t absolute_row_position_;           // total rows emitted so far
  int64_t max_chunksize_;                   // upper bound on rows per batch
};
570 | |
571 | TableBatchReader::TableBatchReader(const Table& table) { |
572 | impl_.reset(new TableBatchReaderImpl(table)); |
573 | } |
574 | |
575 | TableBatchReader::~TableBatchReader() {} |
576 | |
// Forward to the impl: the schema of the batches this reader produces.
std::shared_ptr<Schema> TableBatchReader::schema() const { return impl_->schema(); }
578 | |
// Forward to the impl: cap the number of rows per emitted batch.
void TableBatchReader::set_chunksize(int64_t chunksize) {
  impl_->set_chunksize(chunksize);
}
582 | |
// Forward to the impl: read the next batch, or set *out to nullptr at end.
Status TableBatchReader::ReadNext(std::shared_ptr<RecordBatch>* out) {
  return impl_->ReadNext(out);
}
586 | |
587 | } // namespace arrow |
588 | |