1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/table.h"
19
20#include <algorithm>
21#include <cstdlib>
22#include <limits>
23#include <memory>
24#include <sstream>
25#include <utility>
26
27#include "arrow/array.h"
28#include "arrow/record_batch.h"
29#include "arrow/status.h"
30#include "arrow/type.h"
31#include "arrow/util/logging.h"
32#include "arrow/util/stl.h"
33
34namespace arrow {
35
36// ----------------------------------------------------------------------
37// ChunkedArray and Column methods
38
39ChunkedArray::ChunkedArray(const ArrayVector& chunks) : chunks_(chunks) {
40 length_ = 0;
41 null_count_ = 0;
42 DCHECK_GT(chunks.size(), 0)
43 << "cannot construct ChunkedArray from empty vector and omitted type";
44 type_ = chunks[0]->type();
45 for (const std::shared_ptr<Array>& chunk : chunks) {
46 length_ += chunk->length();
47 null_count_ += chunk->null_count();
48 }
49}
50
51ChunkedArray::ChunkedArray(const ArrayVector& chunks,
52 const std::shared_ptr<DataType>& type)
53 : chunks_(chunks), type_(type) {
54 length_ = 0;
55 null_count_ = 0;
56 for (const std::shared_ptr<Array>& chunk : chunks) {
57 length_ += chunk->length();
58 null_count_ += chunk->null_count();
59 }
60}
61
62bool ChunkedArray::Equals(const ChunkedArray& other) const {
63 if (length_ != other.length()) {
64 return false;
65 }
66 if (null_count_ != other.null_count()) {
67 return false;
68 }
69 if (length_ == 0) {
70 return type_->Equals(other.type_);
71 }
72
73 // Check contents of the underlying arrays. This checks for equality of
74 // the underlying data independently of the chunk size.
75 int this_chunk_idx = 0;
76 int64_t this_start_idx = 0;
77 int other_chunk_idx = 0;
78 int64_t other_start_idx = 0;
79
80 int64_t elements_compared = 0;
81 while (elements_compared < length_) {
82 const std::shared_ptr<Array> this_array = chunks_[this_chunk_idx];
83 const std::shared_ptr<Array> other_array = other.chunk(other_chunk_idx);
84 int64_t common_length = std::min(this_array->length() - this_start_idx,
85 other_array->length() - other_start_idx);
86 if (!this_array->RangeEquals(this_start_idx, this_start_idx + common_length,
87 other_start_idx, other_array)) {
88 return false;
89 }
90
91 elements_compared += common_length;
92
93 // If we have exhausted the current chunk, proceed to the next one individually.
94 if (this_start_idx + common_length == this_array->length()) {
95 this_chunk_idx++;
96 this_start_idx = 0;
97 } else {
98 this_start_idx += common_length;
99 }
100
101 if (other_start_idx + common_length == other_array->length()) {
102 other_chunk_idx++;
103 other_start_idx = 0;
104 } else {
105 other_start_idx += common_length;
106 }
107 }
108 return true;
109}
110
111bool ChunkedArray::Equals(const std::shared_ptr<ChunkedArray>& other) const {
112 if (this == other.get()) {
113 return true;
114 }
115 if (!other) {
116 return false;
117 }
118 return Equals(*other.get());
119}
120
121std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset, int64_t length) const {
122 DCHECK_LE(offset, length_);
123
124 int curr_chunk = 0;
125 while (curr_chunk < num_chunks() && offset >= chunk(curr_chunk)->length()) {
126 offset -= chunk(curr_chunk)->length();
127 curr_chunk++;
128 }
129
130 ArrayVector new_chunks;
131 while (curr_chunk < num_chunks() && length > 0) {
132 new_chunks.push_back(chunk(curr_chunk)->Slice(offset, length));
133 length -= chunk(curr_chunk)->length() - offset;
134 offset = 0;
135 curr_chunk++;
136 }
137
138 return std::make_shared<ChunkedArray>(new_chunks, type_);
139}
140
141std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset) const {
142 return Slice(offset, length_);
143}
144
145Status ChunkedArray::Flatten(MemoryPool* pool,
146 std::vector<std::shared_ptr<ChunkedArray>>* out) const {
147 std::vector<std::shared_ptr<ChunkedArray>> flattened;
148 if (type()->id() != Type::STRUCT) {
149 // Emulate non-existent copy constructor
150 flattened.emplace_back(std::make_shared<ChunkedArray>(chunks_, type_));
151 *out = flattened;
152 return Status::OK();
153 }
154 std::vector<ArrayVector> flattened_chunks;
155 for (const auto& chunk : chunks_) {
156 ArrayVector res;
157 RETURN_NOT_OK(dynamic_cast<const StructArray&>(*chunk).Flatten(pool, &res));
158 if (!flattened_chunks.size()) {
159 // First chunk
160 for (const auto& array : res) {
161 flattened_chunks.push_back({array});
162 }
163 } else {
164 DCHECK_EQ(flattened_chunks.size(), res.size());
165 for (size_t i = 0; i < res.size(); ++i) {
166 flattened_chunks[i].push_back(res[i]);
167 }
168 }
169 }
170 for (const auto& vec : flattened_chunks) {
171 flattened.emplace_back(std::make_shared<ChunkedArray>(vec));
172 }
173 *out = flattened;
174 return Status::OK();
175}
176
177Column::Column(const std::shared_ptr<Field>& field, const ArrayVector& chunks)
178 : field_(field) {
179 data_ = std::make_shared<ChunkedArray>(chunks, field->type());
180}
181
182Column::Column(const std::shared_ptr<Field>& field, const std::shared_ptr<Array>& data)
183 : field_(field) {
184 if (!data) {
185 data_ = std::make_shared<ChunkedArray>(ArrayVector({}), field->type());
186 } else {
187 data_ = std::make_shared<ChunkedArray>(ArrayVector({data}), field->type());
188 }
189}
190
191Column::Column(const std::string& name, const std::shared_ptr<Array>& data)
192 : Column(::arrow::field(name, data->type()), data) {}
193
194Column::Column(const std::string& name, const std::shared_ptr<ChunkedArray>& data)
195 : Column(::arrow::field(name, data->type()), data) {}
196
197Column::Column(const std::shared_ptr<Field>& field,
198 const std::shared_ptr<ChunkedArray>& data)
199 : field_(field), data_(data) {}
200
201Status Column::Flatten(MemoryPool* pool,
202 std::vector<std::shared_ptr<Column>>* out) const {
203 std::vector<std::shared_ptr<Column>> flattened;
204 std::vector<std::shared_ptr<Field>> flattened_fields = field_->Flatten();
205 std::vector<std::shared_ptr<ChunkedArray>> flattened_data;
206 RETURN_NOT_OK(data_->Flatten(pool, &flattened_data));
207 DCHECK_EQ(flattened_fields.size(), flattened_data.size());
208 for (size_t i = 0; i < flattened_fields.size(); ++i) {
209 flattened.push_back(std::make_shared<Column>(flattened_fields[i], flattened_data[i]));
210 }
211 *out = flattened;
212 return Status::OK();
213}
214
215bool Column::Equals(const Column& other) const {
216 if (!field_->Equals(other.field())) {
217 return false;
218 }
219 return data_->Equals(other.data());
220}
221
222bool Column::Equals(const std::shared_ptr<Column>& other) const {
223 if (this == other.get()) {
224 return true;
225 }
226 if (!other) {
227 return false;
228 }
229
230 return Equals(*other.get());
231}
232
233Status Column::ValidateData() {
234 for (int i = 0; i < data_->num_chunks(); ++i) {
235 std::shared_ptr<DataType> type = data_->chunk(i)->type();
236 if (!this->type()->Equals(type)) {
237 return Status::Invalid("In chunk ", i, " expected type ", this->type()->ToString(),
238 " but saw ", type->ToString());
239 }
240 }
241 return Status::OK();
242}
243
244// ----------------------------------------------------------------------
245// Table methods
246
247/// \class SimpleTable
248/// \brief A basic, non-lazy in-memory table, like SimpleRecordBatch
249class SimpleTable : public Table {
250 public:
251 SimpleTable(const std::shared_ptr<Schema>& schema,
252 const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows = -1)
253 : columns_(columns) {
254 schema_ = schema;
255 if (num_rows < 0) {
256 if (columns.size() == 0) {
257 num_rows_ = 0;
258 } else {
259 num_rows_ = columns[0]->length();
260 }
261 } else {
262 num_rows_ = num_rows;
263 }
264 }
265
266 SimpleTable(const std::shared_ptr<Schema>& schema,
267 const std::vector<std::shared_ptr<Array>>& columns, int64_t num_rows = -1) {
268 schema_ = schema;
269 if (num_rows < 0) {
270 if (columns.size() == 0) {
271 num_rows_ = 0;
272 } else {
273 num_rows_ = columns[0]->length();
274 }
275 } else {
276 num_rows_ = num_rows;
277 }
278
279 columns_.resize(columns.size());
280 for (size_t i = 0; i < columns.size(); ++i) {
281 columns_[i] =
282 std::make_shared<Column>(schema->field(static_cast<int>(i)), columns[i]);
283 }
284 }
285
286 std::shared_ptr<Column> column(int i) const override { return columns_[i]; }
287
288 Status RemoveColumn(int i, std::shared_ptr<Table>* out) const override {
289 std::shared_ptr<Schema> new_schema;
290 RETURN_NOT_OK(schema_->RemoveField(i, &new_schema));
291
292 *out = Table::Make(new_schema, internal::DeleteVectorElement(columns_, i),
293 this->num_rows());
294 return Status::OK();
295 }
296
297 Status AddColumn(int i, const std::shared_ptr<Column>& col,
298 std::shared_ptr<Table>* out) const override {
299 DCHECK(col != nullptr);
300
301 if (col->length() != num_rows_) {
302 return Status::Invalid(
303 "Added column's length must match table's length. Expected length ", num_rows_,
304 " but got length ", col->length());
305 }
306
307 std::shared_ptr<Schema> new_schema;
308 RETURN_NOT_OK(schema_->AddField(i, col->field(), &new_schema));
309
310 *out = Table::Make(new_schema, internal::AddVectorElement(columns_, i, col));
311 return Status::OK();
312 }
313
314 Status SetColumn(int i, const std::shared_ptr<Column>& col,
315 std::shared_ptr<Table>* out) const override {
316 DCHECK(col != nullptr);
317
318 if (col->length() != num_rows_) {
319 return Status::Invalid(
320 "Added column's length must match table's length. Expected length ", num_rows_,
321 " but got length ", col->length());
322 }
323
324 std::shared_ptr<Schema> new_schema;
325 RETURN_NOT_OK(schema_->SetField(i, col->field(), &new_schema));
326
327 *out = Table::Make(new_schema, internal::ReplaceVectorElement(columns_, i, col));
328 return Status::OK();
329 }
330
331 std::shared_ptr<Table> ReplaceSchemaMetadata(
332 const std::shared_ptr<const KeyValueMetadata>& metadata) const override {
333 auto new_schema = schema_->AddMetadata(metadata);
334 return Table::Make(new_schema, columns_);
335 }
336
337 Status Flatten(MemoryPool* pool, std::shared_ptr<Table>* out) const override {
338 std::vector<std::shared_ptr<Field>> flattened_fields;
339 std::vector<std::shared_ptr<Column>> flattened_columns;
340 for (const auto& column : columns_) {
341 std::vector<std::shared_ptr<Column>> new_columns;
342 RETURN_NOT_OK(column->Flatten(pool, &new_columns));
343 for (const auto& new_col : new_columns) {
344 flattened_fields.push_back(new_col->field());
345 flattened_columns.push_back(new_col);
346 }
347 }
348 auto flattened_schema =
349 std::make_shared<Schema>(flattened_fields, schema_->metadata());
350 *out = Table::Make(flattened_schema, flattened_columns);
351 return Status::OK();
352 }
353
354 Status Validate() const override {
355 // Make sure columns and schema are consistent
356 if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
357 return Status::Invalid("Number of columns did not match schema");
358 }
359 for (int i = 0; i < num_columns(); ++i) {
360 const Column* col = columns_[i].get();
361 if (col == nullptr) {
362 return Status::Invalid("Column ", i, " was null");
363 }
364 if (!col->field()->Equals(*schema_->field(i))) {
365 return Status::Invalid("Column field ", i, " named ", col->name(),
366 " is inconsistent with schema");
367 }
368 }
369
370 // Make sure columns are all the same length
371 for (int i = 0; i < num_columns(); ++i) {
372 const Column* col = columns_[i].get();
373 if (col->length() != num_rows_) {
374 return Status::Invalid("Column ", i, " named ", col->name(), " expected length ",
375 num_rows_, " but got length ", col->length());
376 }
377 }
378 return Status::OK();
379 }
380
381 private:
382 std::vector<std::shared_ptr<Column>> columns_;
383};
384
385Table::Table() : num_rows_(0) {}
386
387std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema,
388 const std::vector<std::shared_ptr<Column>>& columns,
389 int64_t num_rows) {
390 return std::make_shared<SimpleTable>(schema, columns, num_rows);
391}
392
393std::shared_ptr<Table> Table::Make(const std::shared_ptr<Schema>& schema,
394 const std::vector<std::shared_ptr<Array>>& arrays,
395 int64_t num_rows) {
396 return std::make_shared<SimpleTable>(schema, arrays, num_rows);
397}
398
399Status Table::FromRecordBatches(const std::shared_ptr<Schema>& schema,
400 const std::vector<std::shared_ptr<RecordBatch>>& batches,
401 std::shared_ptr<Table>* table) {
402 const int nbatches = static_cast<int>(batches.size());
403 const int ncolumns = static_cast<int>(schema->num_fields());
404
405 for (int i = 0; i < nbatches; ++i) {
406 if (!batches[i]->schema()->Equals(*schema, false)) {
407 return Status::Invalid("Schema at index ", static_cast<int>(i),
408 " was different: \n", schema->ToString(), "\nvs\n",
409 batches[i]->schema()->ToString());
410 }
411 }
412
413 std::vector<std::shared_ptr<Column>> columns(ncolumns);
414 std::vector<std::shared_ptr<Array>> column_arrays(nbatches);
415
416 for (int i = 0; i < ncolumns; ++i) {
417 for (int j = 0; j < nbatches; ++j) {
418 column_arrays[j] = batches[j]->column(i);
419 }
420 columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
421 }
422
423 *table = Table::Make(schema, columns);
424 return Status::OK();
425}
426
427Status Table::FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>>& batches,
428 std::shared_ptr<Table>* table) {
429 if (batches.size() == 0) {
430 return Status::Invalid("Must pass at least one record batch");
431 }
432
433 return FromRecordBatches(batches[0]->schema(), batches, table);
434}
435
436Status ConcatenateTables(const std::vector<std::shared_ptr<Table>>& tables,
437 std::shared_ptr<Table>* table) {
438 if (tables.size() == 0) {
439 return Status::Invalid("Must pass at least one table");
440 }
441
442 std::shared_ptr<Schema> schema = tables[0]->schema();
443
444 const int ntables = static_cast<int>(tables.size());
445 const int ncolumns = static_cast<int>(schema->num_fields());
446
447 for (int i = 1; i < ntables; ++i) {
448 if (!tables[i]->schema()->Equals(*schema, false)) {
449 return Status::Invalid("Schema at index ", static_cast<int>(i),
450 " was different: \n", schema->ToString(), "\nvs\n",
451 tables[i]->schema()->ToString());
452 }
453 }
454
455 std::vector<std::shared_ptr<Column>> columns(ncolumns);
456 for (int i = 0; i < ncolumns; ++i) {
457 std::vector<std::shared_ptr<Array>> column_arrays;
458 for (int j = 0; j < ntables; ++j) {
459 const std::vector<std::shared_ptr<Array>>& chunks =
460 tables[j]->column(i)->data()->chunks();
461 for (const auto& chunk : chunks) {
462 column_arrays.push_back(chunk);
463 }
464 }
465 columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
466 }
467 *table = Table::Make(schema, columns);
468 return Status::OK();
469}
470
471bool Table::Equals(const Table& other) const {
472 if (this == &other) {
473 return true;
474 }
475 if (!schema_->Equals(*other.schema())) {
476 return false;
477 }
478 if (this->num_columns() != other.num_columns()) {
479 return false;
480 }
481
482 for (int i = 0; i < this->num_columns(); i++) {
483 if (!this->column(i)->Equals(other.column(i))) {
484 return false;
485 }
486 }
487 return true;
488}
489
490// ----------------------------------------------------------------------
491// Convert a table to a sequence of record batches
492
493class TableBatchReader::TableBatchReaderImpl {
494 public:
495 explicit TableBatchReaderImpl(const Table& table)
496 : table_(table),
497 column_data_(table.num_columns()),
498 chunk_numbers_(table.num_columns(), 0),
499 chunk_offsets_(table.num_columns(), 0),
500 absolute_row_position_(0),
501 max_chunksize_(std::numeric_limits<int64_t>::max()) {
502 for (int i = 0; i < table.num_columns(); ++i) {
503 column_data_[i] = table.column(i)->data().get();
504 }
505 }
506
507 Status ReadNext(std::shared_ptr<RecordBatch>* out) {
508 if (absolute_row_position_ == table_.num_rows()) {
509 *out = nullptr;
510 return Status::OK();
511 }
512
513 // Determine the minimum contiguous slice across all columns
514 int64_t chunksize = std::min(table_.num_rows(), max_chunksize_);
515 std::vector<const Array*> chunks(table_.num_columns());
516 for (int i = 0; i < table_.num_columns(); ++i) {
517 auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get();
518 int64_t chunk_remaining = chunk->length() - chunk_offsets_[i];
519
520 if (chunk_remaining < chunksize) {
521 chunksize = chunk_remaining;
522 }
523
524 chunks[i] = chunk;
525 }
526
527 // Slice chunks and advance chunk index as appropriate
528 std::vector<std::shared_ptr<ArrayData>> batch_data(table_.num_columns());
529
530 for (int i = 0; i < table_.num_columns(); ++i) {
531 // Exhausted chunk
532 const Array* chunk = chunks[i];
533 const int64_t offset = chunk_offsets_[i];
534 std::shared_ptr<ArrayData> slice_data;
535 if ((chunk->length() - offset) == chunksize) {
536 ++chunk_numbers_[i];
537 chunk_offsets_[i] = 0;
538 if (offset > 0) {
539 // Need to slice
540 slice_data = chunk->Slice(offset, chunksize)->data();
541 } else {
542 // No slice
543 slice_data = chunk->data();
544 }
545 } else {
546 chunk_offsets_[i] += chunksize;
547 slice_data = chunk->Slice(offset, chunksize)->data();
548 }
549 batch_data[i] = std::move(slice_data);
550 }
551
552 absolute_row_position_ += chunksize;
553 *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data));
554
555 return Status::OK();
556 }
557
558 std::shared_ptr<Schema> schema() const { return table_.schema(); }
559
560 void set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; }
561
562 private:
563 const Table& table_;
564 std::vector<ChunkedArray*> column_data_;
565 std::vector<int> chunk_numbers_;
566 std::vector<int64_t> chunk_offsets_;
567 int64_t absolute_row_position_;
568 int64_t max_chunksize_;
569};
570
571TableBatchReader::TableBatchReader(const Table& table) {
572 impl_.reset(new TableBatchReaderImpl(table));
573}
574
575TableBatchReader::~TableBatchReader() {}
576
577std::shared_ptr<Schema> TableBatchReader::schema() const { return impl_->schema(); }
578
579void TableBatchReader::set_chunksize(int64_t chunksize) {
580 impl_->set_chunksize(chunksize);
581}
582
583Status TableBatchReader::ReadNext(std::shared_ptr<RecordBatch>* out) {
584 return impl_->ReadNext(out);
585}
586
587} // namespace arrow
588