1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/ipc/json-internal.h" |
19 | |
20 | #include <cstdint> |
21 | #include <cstdlib> |
22 | #include <memory> |
23 | #include <sstream> |
24 | #include <string> |
25 | #include <type_traits> |
26 | #include <unordered_map> |
27 | #include <utility> |
28 | #include <vector> |
29 | |
30 | #include "arrow/array.h" |
31 | #include "arrow/buffer.h" |
32 | #include "arrow/builder.h" // IWYU pragma: keep |
33 | #include "arrow/ipc/dictionary.h" |
34 | #include "arrow/record_batch.h" |
35 | #include "arrow/status.h" |
36 | #include "arrow/type.h" |
37 | #include "arrow/type_traits.h" |
38 | #include "arrow/util/bit-util.h" |
39 | #include "arrow/util/checked_cast.h" |
40 | #include "arrow/util/decimal.h" |
41 | #include "arrow/util/logging.h" |
42 | #include "arrow/util/string.h" |
43 | #include "arrow/visitor_inline.h" |
44 | |
45 | namespace arrow { |
46 | |
47 | class MemoryPool; |
48 | |
49 | using internal::checked_cast; |
50 | |
51 | namespace ipc { |
52 | namespace internal { |
53 | namespace json { |
54 | |
55 | using ::arrow::ipc::DictionaryMemo; |
56 | using ::arrow::ipc::DictionaryTypeMap; |
57 | |
58 | static std::string GetFloatingPrecisionName(FloatingPoint::Precision precision) { |
59 | switch (precision) { |
60 | case FloatingPoint::HALF: |
61 | return "HALF" ; |
62 | case FloatingPoint::SINGLE: |
63 | return "SINGLE" ; |
64 | case FloatingPoint::DOUBLE: |
65 | return "DOUBLE" ; |
66 | default: |
67 | break; |
68 | } |
69 | return "UNKNOWN" ; |
70 | } |
71 | |
72 | static std::string GetTimeUnitName(TimeUnit::type unit) { |
73 | switch (unit) { |
74 | case TimeUnit::SECOND: |
75 | return "SECOND" ; |
76 | case TimeUnit::MILLI: |
77 | return "MILLISECOND" ; |
78 | case TimeUnit::MICRO: |
79 | return "MICROSECOND" ; |
80 | case TimeUnit::NANO: |
81 | return "NANOSECOND" ; |
82 | default: |
83 | break; |
84 | } |
85 | return "UNKNOWN" ; |
86 | } |
87 | |
88 | class SchemaWriter { |
89 | public: |
90 | explicit SchemaWriter(const Schema& schema, RjWriter* writer) |
91 | : schema_(schema), writer_(writer) {} |
92 | |
93 | Status Write() { |
94 | writer_->Key("schema" ); |
95 | writer_->StartObject(); |
96 | writer_->Key("fields" ); |
97 | writer_->StartArray(); |
98 | for (const std::shared_ptr<Field>& field : schema_.fields()) { |
99 | RETURN_NOT_OK(VisitField(*field)); |
100 | } |
101 | writer_->EndArray(); |
102 | writer_->EndObject(); |
103 | |
104 | // Write dictionaries, if any |
105 | if (dictionary_memo_.size() > 0) { |
106 | writer_->Key("dictionaries" ); |
107 | writer_->StartArray(); |
108 | for (const auto& entry : dictionary_memo_.id_to_dictionary()) { |
109 | RETURN_NOT_OK(WriteDictionary(entry.first, entry.second)); |
110 | } |
111 | writer_->EndArray(); |
112 | } |
113 | return Status::OK(); |
114 | } |
115 | |
116 | Status WriteDictionary(int64_t id, const std::shared_ptr<Array>& dictionary) { |
117 | writer_->StartObject(); |
118 | writer_->Key("id" ); |
119 | writer_->Int(static_cast<int32_t>(id)); |
120 | writer_->Key("data" ); |
121 | |
122 | // Make a dummy record batch. A bit tedious as we have to make a schema |
123 | auto schema = ::arrow::schema({arrow::field("dictionary" , dictionary->type())}); |
124 | auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary}); |
125 | RETURN_NOT_OK(WriteRecordBatch(*batch, writer_)); |
126 | writer_->EndObject(); |
127 | return Status::OK(); |
128 | } |
129 | |
130 | Status WriteDictionaryMetadata(const DictionaryType& type) { |
131 | int64_t dictionary_id = dictionary_memo_.GetId(type.dictionary()); |
132 | writer_->Key("dictionary" ); |
133 | |
134 | // Emulate DictionaryEncoding from Schema.fbs |
135 | writer_->StartObject(); |
136 | writer_->Key("id" ); |
137 | writer_->Int(static_cast<int32_t>(dictionary_id)); |
138 | writer_->Key("indexType" ); |
139 | |
140 | writer_->StartObject(); |
141 | RETURN_NOT_OK(VisitType(*type.index_type())); |
142 | writer_->EndObject(); |
143 | |
144 | writer_->Key("isOrdered" ); |
145 | writer_->Bool(type.ordered()); |
146 | writer_->EndObject(); |
147 | |
148 | return Status::OK(); |
149 | } |
150 | |
151 | Status VisitField(const Field& field) { |
152 | writer_->StartObject(); |
153 | |
154 | writer_->Key("name" ); |
155 | writer_->String(field.name().c_str()); |
156 | |
157 | writer_->Key("nullable" ); |
158 | writer_->Bool(field.nullable()); |
159 | |
160 | const DataType& type = *field.type(); |
161 | |
162 | // Visit the type |
163 | writer_->Key("type" ); |
164 | writer_->StartObject(); |
165 | RETURN_NOT_OK(VisitType(type)); |
166 | writer_->EndObject(); |
167 | |
168 | if (type.id() == Type::DICTIONARY) { |
169 | const auto& dict_type = checked_cast<const DictionaryType&>(type); |
170 | RETURN_NOT_OK(WriteDictionaryMetadata(dict_type)); |
171 | |
172 | const DataType& dictionary_type = *dict_type.dictionary()->type(); |
173 | RETURN_NOT_OK(WriteChildren(dictionary_type.children())); |
174 | } else { |
175 | RETURN_NOT_OK(WriteChildren(type.children())); |
176 | } |
177 | |
178 | writer_->EndObject(); |
179 | |
180 | return Status::OK(); |
181 | } |
182 | |
183 | Status VisitType(const DataType& type); |
184 | |
185 | template <typename T> |
186 | typename std::enable_if<std::is_base_of<NoExtraMeta, T>::value || |
187 | std::is_base_of<ListType, T>::value || |
188 | std::is_base_of<StructType, T>::value, |
189 | void>::type |
190 | WriteTypeMetadata(const T& type) {} |
191 | |
192 | void WriteTypeMetadata(const Integer& type) { |
193 | writer_->Key("bitWidth" ); |
194 | writer_->Int(type.bit_width()); |
195 | writer_->Key("isSigned" ); |
196 | writer_->Bool(type.is_signed()); |
197 | } |
198 | |
199 | void WriteTypeMetadata(const FloatingPoint& type) { |
200 | writer_->Key("precision" ); |
201 | writer_->String(GetFloatingPrecisionName(type.precision())); |
202 | } |
203 | |
204 | void WriteTypeMetadata(const IntervalType& type) { |
205 | writer_->Key("unit" ); |
206 | switch (type.unit()) { |
207 | case IntervalType::Unit::YEAR_MONTH: |
208 | writer_->String("YEAR_MONTH" ); |
209 | break; |
210 | case IntervalType::Unit::DAY_TIME: |
211 | writer_->String("DAY_TIME" ); |
212 | break; |
213 | } |
214 | } |
215 | |
216 | void WriteTypeMetadata(const TimestampType& type) { |
217 | writer_->Key("unit" ); |
218 | writer_->String(GetTimeUnitName(type.unit())); |
219 | if (type.timezone().size() > 0) { |
220 | writer_->Key("timezone" ); |
221 | writer_->String(type.timezone()); |
222 | } |
223 | } |
224 | |
225 | void WriteTypeMetadata(const TimeType& type) { |
226 | writer_->Key("unit" ); |
227 | writer_->String(GetTimeUnitName(type.unit())); |
228 | writer_->Key("bitWidth" ); |
229 | writer_->Int(type.bit_width()); |
230 | } |
231 | |
232 | void WriteTypeMetadata(const DateType& type) { |
233 | writer_->Key("unit" ); |
234 | switch (type.unit()) { |
235 | case DateUnit::DAY: |
236 | writer_->String("DAY" ); |
237 | break; |
238 | case DateUnit::MILLI: |
239 | writer_->String("MILLISECOND" ); |
240 | break; |
241 | } |
242 | } |
243 | |
244 | void WriteTypeMetadata(const FixedSizeBinaryType& type) { |
245 | writer_->Key("byteWidth" ); |
246 | writer_->Int(type.byte_width()); |
247 | } |
248 | |
249 | void WriteTypeMetadata(const Decimal128Type& type) { |
250 | writer_->Key("precision" ); |
251 | writer_->Int(type.precision()); |
252 | writer_->Key("scale" ); |
253 | writer_->Int(type.scale()); |
254 | } |
255 | |
256 | void WriteTypeMetadata(const UnionType& type) { |
257 | writer_->Key("mode" ); |
258 | switch (type.mode()) { |
259 | case UnionMode::SPARSE: |
260 | writer_->String("SPARSE" ); |
261 | break; |
262 | case UnionMode::DENSE: |
263 | writer_->String("DENSE" ); |
264 | break; |
265 | } |
266 | |
267 | // Write type ids |
268 | writer_->Key("typeIds" ); |
269 | writer_->StartArray(); |
270 | for (size_t i = 0; i < type.type_codes().size(); ++i) { |
271 | writer_->Uint(type.type_codes()[i]); |
272 | } |
273 | writer_->EndArray(); |
274 | } |
275 | |
276 | // TODO(wesm): Other Type metadata |
277 | |
278 | template <typename T> |
279 | void WriteName(const std::string& typeclass, const T& type) { |
280 | writer_->Key("name" ); |
281 | writer_->String(typeclass); |
282 | WriteTypeMetadata(type); |
283 | } |
284 | |
285 | template <typename T> |
286 | Status WritePrimitive(const std::string& typeclass, const T& type) { |
287 | WriteName(typeclass, type); |
288 | return Status::OK(); |
289 | } |
290 | |
291 | template <typename T> |
292 | Status WriteVarBytes(const std::string& typeclass, const T& type) { |
293 | WriteName(typeclass, type); |
294 | return Status::OK(); |
295 | } |
296 | |
297 | Status WriteChildren(const std::vector<std::shared_ptr<Field>>& children) { |
298 | writer_->Key("children" ); |
299 | writer_->StartArray(); |
300 | for (const std::shared_ptr<Field>& field : children) { |
301 | RETURN_NOT_OK(VisitField(*field)); |
302 | } |
303 | writer_->EndArray(); |
304 | return Status::OK(); |
305 | } |
306 | |
307 | Status Visit(const NullType& type) { return WritePrimitive("null" , type); } |
308 | Status Visit(const BooleanType& type) { return WritePrimitive("bool" , type); } |
309 | Status Visit(const Integer& type) { return WritePrimitive("int" , type); } |
310 | |
311 | Status Visit(const FloatingPoint& type) { |
312 | return WritePrimitive("floatingpoint" , type); |
313 | } |
314 | |
315 | Status Visit(const DateType& type) { return WritePrimitive("date" , type); } |
316 | Status Visit(const TimeType& type) { return WritePrimitive("time" , type); } |
317 | Status Visit(const StringType& type) { return WriteVarBytes("utf8" , type); } |
318 | Status Visit(const BinaryType& type) { return WriteVarBytes("binary" , type); } |
319 | Status Visit(const FixedSizeBinaryType& type) { |
320 | return WritePrimitive("fixedsizebinary" , type); |
321 | } |
322 | |
323 | Status Visit(const Decimal128Type& type) { return WritePrimitive("decimal" , type); } |
324 | Status Visit(const TimestampType& type) { return WritePrimitive("timestamp" , type); } |
325 | Status Visit(const IntervalType& type) { return WritePrimitive("interval" , type); } |
326 | |
327 | Status Visit(const ListType& type) { |
328 | WriteName("list" , type); |
329 | return Status::OK(); |
330 | } |
331 | |
332 | Status Visit(const StructType& type) { |
333 | WriteName("struct" , type); |
334 | return Status::OK(); |
335 | } |
336 | |
337 | Status Visit(const UnionType& type) { |
338 | WriteName("union" , type); |
339 | return Status::OK(); |
340 | } |
341 | |
342 | Status Visit(const DictionaryType& type) { |
343 | return VisitType(*type.dictionary()->type()); |
344 | } |
345 | |
346 | private: |
347 | DictionaryMemo dictionary_memo_; |
348 | |
349 | const Schema& schema_; |
350 | RjWriter* writer_; |
351 | }; |
352 | |
353 | Status SchemaWriter::VisitType(const DataType& type) { |
354 | return VisitTypeInline(type, this); |
355 | } |
356 | |
357 | class ArrayWriter { |
358 | public: |
359 | ArrayWriter(const std::string& name, const Array& array, RjWriter* writer) |
360 | : name_(name), array_(array), writer_(writer) {} |
361 | |
362 | Status Write() { return VisitArray(name_, array_); } |
363 | |
364 | Status VisitArrayValues(const Array& arr) { return VisitArrayInline(arr, this); } |
365 | |
366 | Status VisitArray(const std::string& name, const Array& arr) { |
367 | writer_->StartObject(); |
368 | writer_->Key("name" ); |
369 | writer_->String(name); |
370 | |
371 | writer_->Key("count" ); |
372 | writer_->Int(static_cast<int32_t>(arr.length())); |
373 | |
374 | RETURN_NOT_OK(VisitArrayValues(arr)); |
375 | |
376 | writer_->EndObject(); |
377 | return Status::OK(); |
378 | } |
379 | |
380 | template <typename T> |
381 | typename std::enable_if<IsSignedInt<T>::value, void>::type WriteDataValues( |
382 | const T& arr) { |
383 | static const char null_string[] = "0" ; |
384 | const auto data = arr.raw_values(); |
385 | for (int64_t i = 0; i < arr.length(); ++i) { |
386 | if (arr.IsValid(i)) { |
387 | writer_->Int64(data[i]); |
388 | } else { |
389 | writer_->RawNumber(null_string, sizeof(null_string)); |
390 | } |
391 | } |
392 | } |
393 | |
394 | template <typename T> |
395 | typename std::enable_if<IsUnsignedInt<T>::value, void>::type WriteDataValues( |
396 | const T& arr) { |
397 | static const char null_string[] = "0" ; |
398 | const auto data = arr.raw_values(); |
399 | for (int64_t i = 0; i < arr.length(); ++i) { |
400 | if (arr.IsValid(i)) { |
401 | writer_->Uint64(data[i]); |
402 | } else { |
403 | writer_->RawNumber(null_string, sizeof(null_string)); |
404 | } |
405 | } |
406 | } |
407 | |
408 | template <typename T> |
409 | typename std::enable_if<IsFloatingPoint<T>::value, void>::type WriteDataValues( |
410 | const T& arr) { |
411 | static const char null_string[] = "0." ; |
412 | const auto data = arr.raw_values(); |
413 | for (int64_t i = 0; i < arr.length(); ++i) { |
414 | if (arr.IsValid(i)) { |
415 | writer_->Double(data[i]); |
416 | } else { |
417 | writer_->RawNumber(null_string, sizeof(null_string)); |
418 | } |
419 | } |
420 | } |
421 | |
422 | // Binary, encode to hexadecimal. UTF8 string write as is |
423 | template <typename T> |
424 | typename std::enable_if<std::is_base_of<BinaryArray, T>::value, void>::type |
425 | WriteDataValues(const T& arr) { |
426 | for (int64_t i = 0; i < arr.length(); ++i) { |
427 | int32_t length; |
428 | const uint8_t* buf = arr.GetValue(i, &length); |
429 | |
430 | if (std::is_base_of<StringArray, T>::value) { |
431 | // Presumed UTF-8 |
432 | writer_->String(reinterpret_cast<const char*>(buf), length); |
433 | } else { |
434 | writer_->String(HexEncode(buf, length)); |
435 | } |
436 | } |
437 | } |
438 | |
439 | void WriteDataValues(const FixedSizeBinaryArray& arr) { |
440 | const int32_t width = arr.byte_width(); |
441 | |
442 | for (int64_t i = 0; i < arr.length(); ++i) { |
443 | const uint8_t* buf = arr.GetValue(i); |
444 | std::string encoded = HexEncode(buf, width); |
445 | writer_->String(encoded); |
446 | } |
447 | } |
448 | |
449 | void WriteDataValues(const Decimal128Array& arr) { |
450 | static const char null_string[] = "0" ; |
451 | for (int64_t i = 0; i < arr.length(); ++i) { |
452 | if (arr.IsValid(i)) { |
453 | const Decimal128 value(arr.GetValue(i)); |
454 | writer_->String(value.ToIntegerString()); |
455 | } else { |
456 | writer_->String(null_string, sizeof(null_string)); |
457 | } |
458 | } |
459 | } |
460 | |
461 | void WriteDataValues(const BooleanArray& arr) { |
462 | for (int64_t i = 0; i < arr.length(); ++i) { |
463 | if (arr.IsValid(i)) { |
464 | writer_->Bool(arr.Value(i)); |
465 | } else { |
466 | writer_->Bool(false); |
467 | } |
468 | } |
469 | } |
470 | |
471 | template <typename T> |
472 | void WriteDataField(const T& arr) { |
473 | writer_->Key("DATA" ); |
474 | writer_->StartArray(); |
475 | WriteDataValues(arr); |
476 | writer_->EndArray(); |
477 | } |
478 | |
479 | template <typename T> |
480 | void WriteIntegerField(const char* name, const T* values, int64_t length) { |
481 | writer_->Key(name); |
482 | writer_->StartArray(); |
483 | for (int i = 0; i < length; ++i) { |
484 | writer_->Int64(values[i]); |
485 | } |
486 | writer_->EndArray(); |
487 | } |
488 | |
489 | void WriteValidityField(const Array& arr) { |
490 | writer_->Key("VALIDITY" ); |
491 | writer_->StartArray(); |
492 | if (arr.null_count() > 0) { |
493 | for (int i = 0; i < arr.length(); ++i) { |
494 | writer_->Int(arr.IsNull(i) ? 0 : 1); |
495 | } |
496 | } else { |
497 | for (int i = 0; i < arr.length(); ++i) { |
498 | writer_->Int(1); |
499 | } |
500 | } |
501 | writer_->EndArray(); |
502 | } |
503 | |
504 | void SetNoChildren() { |
505 | writer_->Key("children" ); |
506 | writer_->StartArray(); |
507 | writer_->EndArray(); |
508 | } |
509 | |
510 | Status WriteChildren(const std::vector<std::shared_ptr<Field>>& fields, |
511 | const std::vector<std::shared_ptr<Array>>& arrays) { |
512 | writer_->Key("children" ); |
513 | writer_->StartArray(); |
514 | for (size_t i = 0; i < fields.size(); ++i) { |
515 | RETURN_NOT_OK(VisitArray(fields[i]->name(), *arrays[i])); |
516 | } |
517 | writer_->EndArray(); |
518 | return Status::OK(); |
519 | } |
520 | |
521 | Status Visit(const NullArray& array) { |
522 | SetNoChildren(); |
523 | return Status::OK(); |
524 | } |
525 | |
526 | template <typename T> |
527 | typename std::enable_if<std::is_base_of<PrimitiveArray, T>::value, Status>::type Visit( |
528 | const T& array) { |
529 | WriteValidityField(array); |
530 | WriteDataField(array); |
531 | SetNoChildren(); |
532 | return Status::OK(); |
533 | } |
534 | |
535 | template <typename T> |
536 | typename std::enable_if<std::is_base_of<BinaryArray, T>::value, Status>::type Visit( |
537 | const T& array) { |
538 | WriteValidityField(array); |
539 | WriteIntegerField("OFFSET" , array.raw_value_offsets(), array.length() + 1); |
540 | WriteDataField(array); |
541 | SetNoChildren(); |
542 | return Status::OK(); |
543 | } |
544 | |
545 | Status Visit(const DictionaryArray& array) { |
546 | return VisitArrayValues(*array.indices()); |
547 | } |
548 | |
549 | Status Visit(const ListArray& array) { |
550 | WriteValidityField(array); |
551 | WriteIntegerField("OFFSET" , array.raw_value_offsets(), array.length() + 1); |
552 | const auto& type = checked_cast<const ListType&>(*array.type()); |
553 | return WriteChildren(type.children(), {array.values()}); |
554 | } |
555 | |
556 | Status Visit(const StructArray& array) { |
557 | WriteValidityField(array); |
558 | const auto& type = checked_cast<const StructType&>(*array.type()); |
559 | std::vector<std::shared_ptr<Array>> children; |
560 | children.reserve(array.num_fields()); |
561 | for (int i = 0; i < array.num_fields(); ++i) { |
562 | children.emplace_back(array.field(i)); |
563 | } |
564 | return WriteChildren(type.children(), children); |
565 | } |
566 | |
567 | Status Visit(const UnionArray& array) { |
568 | WriteValidityField(array); |
569 | const auto& type = checked_cast<const UnionType&>(*array.type()); |
570 | |
571 | WriteIntegerField("TYPE_ID" , array.raw_type_ids(), array.length()); |
572 | if (type.mode() == UnionMode::DENSE) { |
573 | WriteIntegerField("OFFSET" , array.raw_value_offsets(), array.length()); |
574 | } |
575 | std::vector<std::shared_ptr<Array>> children; |
576 | children.reserve(array.num_fields()); |
577 | for (int i = 0; i < array.num_fields(); ++i) { |
578 | children.emplace_back(array.child(i)); |
579 | } |
580 | return WriteChildren(type.children(), children); |
581 | } |
582 | |
583 | private: |
584 | const std::string& name_; |
585 | const Array& array_; |
586 | RjWriter* writer_; |
587 | }; |
588 | |
589 | static Status GetObjectInt(const RjObject& obj, const std::string& key, int* out) { |
590 | const auto& it = obj.FindMember(key); |
591 | RETURN_NOT_INT(key, it, obj); |
592 | *out = it->value.GetInt(); |
593 | return Status::OK(); |
594 | } |
595 | |
596 | static Status GetObjectBool(const RjObject& obj, const std::string& key, bool* out) { |
597 | const auto& it = obj.FindMember(key); |
598 | RETURN_NOT_BOOL(key, it, obj); |
599 | *out = it->value.GetBool(); |
600 | return Status::OK(); |
601 | } |
602 | |
603 | static Status GetObjectString(const RjObject& obj, const std::string& key, |
604 | std::string* out) { |
605 | const auto& it = obj.FindMember(key); |
606 | RETURN_NOT_STRING(key, it, obj); |
607 | *out = it->value.GetString(); |
608 | return Status::OK(); |
609 | } |
610 | |
611 | static Status GetInteger(const rj::Value::ConstObject& json_type, |
612 | std::shared_ptr<DataType>* type) { |
613 | const auto& it_bit_width = json_type.FindMember("bitWidth" ); |
614 | RETURN_NOT_INT("bitWidth" , it_bit_width, json_type); |
615 | |
616 | const auto& it_is_signed = json_type.FindMember("isSigned" ); |
617 | RETURN_NOT_BOOL("isSigned" , it_is_signed, json_type); |
618 | |
619 | bool is_signed = it_is_signed->value.GetBool(); |
620 | int bit_width = it_bit_width->value.GetInt(); |
621 | |
622 | switch (bit_width) { |
623 | case 8: |
624 | *type = is_signed ? int8() : uint8(); |
625 | break; |
626 | case 16: |
627 | *type = is_signed ? int16() : uint16(); |
628 | break; |
629 | case 32: |
630 | *type = is_signed ? int32() : uint32(); |
631 | break; |
632 | case 64: |
633 | *type = is_signed ? int64() : uint64(); |
634 | break; |
635 | default: |
636 | return Status::Invalid("Invalid bit width: " , bit_width); |
637 | } |
638 | return Status::OK(); |
639 | } |
640 | |
641 | static Status GetFloatingPoint(const RjObject& json_type, |
642 | std::shared_ptr<DataType>* type) { |
643 | const auto& it_precision = json_type.FindMember("precision" ); |
644 | RETURN_NOT_STRING("precision" , it_precision, json_type); |
645 | |
646 | std::string precision = it_precision->value.GetString(); |
647 | |
648 | if (precision == "DOUBLE" ) { |
649 | *type = float64(); |
650 | } else if (precision == "SINGLE" ) { |
651 | *type = float32(); |
652 | } else if (precision == "HALF" ) { |
653 | *type = float16(); |
654 | } else { |
655 | return Status::Invalid("Invalid precision: " , precision); |
656 | } |
657 | return Status::OK(); |
658 | } |
659 | |
660 | static Status GetFixedSizeBinary(const RjObject& json_type, |
661 | std::shared_ptr<DataType>* type) { |
662 | const auto& it_byte_width = json_type.FindMember("byteWidth" ); |
663 | RETURN_NOT_INT("byteWidth" , it_byte_width, json_type); |
664 | |
665 | int32_t byte_width = it_byte_width->value.GetInt(); |
666 | *type = fixed_size_binary(byte_width); |
667 | return Status::OK(); |
668 | } |
669 | |
670 | static Status GetDecimal(const RjObject& json_type, std::shared_ptr<DataType>* type) { |
671 | const auto& it_precision = json_type.FindMember("precision" ); |
672 | const auto& it_scale = json_type.FindMember("scale" ); |
673 | |
674 | RETURN_NOT_INT("precision" , it_precision, json_type); |
675 | RETURN_NOT_INT("scale" , it_scale, json_type); |
676 | |
677 | *type = decimal(it_precision->value.GetInt(), it_scale->value.GetInt()); |
678 | return Status::OK(); |
679 | } |
680 | |
681 | static Status GetDate(const RjObject& json_type, std::shared_ptr<DataType>* type) { |
682 | const auto& it_unit = json_type.FindMember("unit" ); |
683 | RETURN_NOT_STRING("unit" , it_unit, json_type); |
684 | |
685 | std::string unit_str = it_unit->value.GetString(); |
686 | |
687 | if (unit_str == "DAY" ) { |
688 | *type = date32(); |
689 | } else if (unit_str == "MILLISECOND" ) { |
690 | *type = date64(); |
691 | } else { |
692 | return Status::Invalid("Invalid date unit: " , unit_str); |
693 | } |
694 | return Status::OK(); |
695 | } |
696 | |
697 | static Status GetTime(const RjObject& json_type, std::shared_ptr<DataType>* type) { |
698 | const auto& it_unit = json_type.FindMember("unit" ); |
699 | RETURN_NOT_STRING("unit" , it_unit, json_type); |
700 | |
701 | const auto& it_bit_width = json_type.FindMember("bitWidth" ); |
702 | RETURN_NOT_INT("bitWidth" , it_bit_width, json_type); |
703 | |
704 | std::string unit_str = it_unit->value.GetString(); |
705 | |
706 | if (unit_str == "SECOND" ) { |
707 | *type = time32(TimeUnit::SECOND); |
708 | } else if (unit_str == "MILLISECOND" ) { |
709 | *type = time32(TimeUnit::MILLI); |
710 | } else if (unit_str == "MICROSECOND" ) { |
711 | *type = time64(TimeUnit::MICRO); |
712 | } else if (unit_str == "NANOSECOND" ) { |
713 | *type = time64(TimeUnit::NANO); |
714 | } else { |
715 | return Status::Invalid("Invalid time unit: " , unit_str); |
716 | } |
717 | |
718 | const auto& fw_type = checked_cast<const FixedWidthType&>(**type); |
719 | |
720 | int bit_width = it_bit_width->value.GetInt(); |
721 | if (bit_width != fw_type.bit_width()) { |
722 | return Status::Invalid("Indicated bit width does not match unit" ); |
723 | } |
724 | |
725 | return Status::OK(); |
726 | } |
727 | |
728 | static Status GetTimestamp(const RjObject& json_type, std::shared_ptr<DataType>* type) { |
729 | const auto& it_unit = json_type.FindMember("unit" ); |
730 | RETURN_NOT_STRING("unit" , it_unit, json_type); |
731 | |
732 | std::string unit_str = it_unit->value.GetString(); |
733 | |
734 | TimeUnit::type unit; |
735 | if (unit_str == "SECOND" ) { |
736 | unit = TimeUnit::SECOND; |
737 | } else if (unit_str == "MILLISECOND" ) { |
738 | unit = TimeUnit::MILLI; |
739 | } else if (unit_str == "MICROSECOND" ) { |
740 | unit = TimeUnit::MICRO; |
741 | } else if (unit_str == "NANOSECOND" ) { |
742 | unit = TimeUnit::NANO; |
743 | } else { |
744 | return Status::Invalid("Invalid time unit: " , unit_str); |
745 | } |
746 | |
747 | const auto& it_tz = json_type.FindMember("timezone" ); |
748 | if (it_tz == json_type.MemberEnd()) { |
749 | *type = timestamp(unit); |
750 | } else { |
751 | *type = timestamp(unit, it_tz->value.GetString()); |
752 | } |
753 | |
754 | return Status::OK(); |
755 | } |
756 | |
757 | static Status GetUnion(const RjObject& json_type, |
758 | const std::vector<std::shared_ptr<Field>>& children, |
759 | std::shared_ptr<DataType>* type) { |
760 | const auto& it_mode = json_type.FindMember("mode" ); |
761 | RETURN_NOT_STRING("mode" , it_mode, json_type); |
762 | |
763 | std::string mode_str = it_mode->value.GetString(); |
764 | UnionMode::type mode; |
765 | |
766 | if (mode_str == "SPARSE" ) { |
767 | mode = UnionMode::SPARSE; |
768 | } else if (mode_str == "DENSE" ) { |
769 | mode = UnionMode::DENSE; |
770 | } else { |
771 | return Status::Invalid("Invalid union mode: " , mode_str); |
772 | } |
773 | |
774 | const auto& it_type_codes = json_type.FindMember("typeIds" ); |
775 | RETURN_NOT_ARRAY("typeIds" , it_type_codes, json_type); |
776 | |
777 | std::vector<uint8_t> type_codes; |
778 | const auto& id_array = it_type_codes->value.GetArray(); |
779 | for (const rj::Value& val : id_array) { |
780 | DCHECK(val.IsUint()); |
781 | type_codes.push_back(static_cast<uint8_t>(val.GetUint())); |
782 | } |
783 | |
784 | *type = union_(children, type_codes, mode); |
785 | |
786 | return Status::OK(); |
787 | } |
788 | |
789 | static Status GetType(const RjObject& json_type, |
790 | const std::vector<std::shared_ptr<Field>>& children, |
791 | std::shared_ptr<DataType>* type) { |
792 | const auto& it_type_name = json_type.FindMember("name" ); |
793 | RETURN_NOT_STRING("name" , it_type_name, json_type); |
794 | |
795 | std::string type_name = it_type_name->value.GetString(); |
796 | |
797 | if (type_name == "int" ) { |
798 | return GetInteger(json_type, type); |
799 | } else if (type_name == "floatingpoint" ) { |
800 | return GetFloatingPoint(json_type, type); |
801 | } else if (type_name == "bool" ) { |
802 | *type = boolean(); |
803 | } else if (type_name == "utf8" ) { |
804 | *type = utf8(); |
805 | } else if (type_name == "binary" ) { |
806 | *type = binary(); |
807 | } else if (type_name == "fixedsizebinary" ) { |
808 | return GetFixedSizeBinary(json_type, type); |
809 | } else if (type_name == "decimal" ) { |
810 | return GetDecimal(json_type, type); |
811 | } else if (type_name == "null" ) { |
812 | *type = null(); |
813 | } else if (type_name == "date" ) { |
814 | return GetDate(json_type, type); |
815 | } else if (type_name == "time" ) { |
816 | return GetTime(json_type, type); |
817 | } else if (type_name == "timestamp" ) { |
818 | return GetTimestamp(json_type, type); |
819 | } else if (type_name == "list" ) { |
820 | if (children.size() != 1) { |
821 | return Status::Invalid("List must have exactly one child" ); |
822 | } |
823 | *type = list(children[0]); |
824 | } else if (type_name == "struct" ) { |
825 | *type = struct_(children); |
826 | } else if (type_name == "union" ) { |
827 | return GetUnion(json_type, children, type); |
828 | } else { |
829 | return Status::Invalid("Unrecognized type name: " , type_name); |
830 | } |
831 | return Status::OK(); |
832 | } |
833 | |
834 | static Status GetField(const rj::Value& obj, const DictionaryMemo* dictionary_memo, |
835 | std::shared_ptr<Field>* field); |
836 | |
837 | static Status GetFieldsFromArray(const rj::Value& obj, |
838 | const DictionaryMemo* dictionary_memo, |
839 | std::vector<std::shared_ptr<Field>>* fields) { |
840 | const auto& values = obj.GetArray(); |
841 | |
842 | fields->resize(values.Size()); |
843 | for (rj::SizeType i = 0; i < fields->size(); ++i) { |
844 | RETURN_NOT_OK(GetField(values[i], dictionary_memo, &(*fields)[i])); |
845 | } |
846 | return Status::OK(); |
847 | } |
848 | |
849 | static Status ParseDictionary(const RjObject& obj, int64_t* id, bool* is_ordered, |
850 | std::shared_ptr<DataType>* index_type) { |
851 | int32_t int32_id; |
852 | RETURN_NOT_OK(GetObjectInt(obj, "id" , &int32_id)); |
853 | *id = int32_id; |
854 | |
855 | RETURN_NOT_OK(GetObjectBool(obj, "isOrdered" , is_ordered)); |
856 | |
857 | const auto& it_index_type = obj.FindMember("indexType" ); |
858 | RETURN_NOT_OBJECT("indexType" , it_index_type, obj); |
859 | |
860 | const auto& json_index_type = it_index_type->value.GetObject(); |
861 | |
862 | std::string type_name; |
863 | RETURN_NOT_OK(GetObjectString(json_index_type, "name" , &type_name)); |
864 | if (type_name != "int" ) { |
865 | return Status::Invalid("Dictionary indices can only be integers" ); |
866 | } |
867 | return GetInteger(json_index_type, index_type); |
868 | } |
869 | |
870 | static Status GetField(const rj::Value& obj, const DictionaryMemo* dictionary_memo, |
871 | std::shared_ptr<Field>* field) { |
872 | if (!obj.IsObject()) { |
873 | return Status::Invalid("Field was not a JSON object" ); |
874 | } |
875 | const auto& json_field = obj.GetObject(); |
876 | |
877 | std::string name; |
878 | bool nullable; |
879 | RETURN_NOT_OK(GetObjectString(json_field, "name" , &name)); |
880 | RETURN_NOT_OK(GetObjectBool(json_field, "nullable" , &nullable)); |
881 | |
882 | std::shared_ptr<DataType> type; |
883 | |
884 | const auto& it_dictionary = json_field.FindMember("dictionary" ); |
885 | if (dictionary_memo != nullptr && it_dictionary != json_field.MemberEnd()) { |
886 | // Field is dictionary encoded. We must have already |
887 | RETURN_NOT_OBJECT("dictionary" , it_dictionary, json_field); |
888 | int64_t dictionary_id = -1; |
889 | bool is_ordered; |
890 | std::shared_ptr<DataType> index_type; |
891 | RETURN_NOT_OK(ParseDictionary(it_dictionary->value.GetObject(), &dictionary_id, |
892 | &is_ordered, &index_type)); |
893 | |
894 | std::shared_ptr<Array> dictionary; |
895 | RETURN_NOT_OK(dictionary_memo->GetDictionary(dictionary_id, &dictionary)); |
896 | |
897 | type = std::make_shared<DictionaryType>(index_type, dictionary, is_ordered); |
898 | } else { |
899 | // If the dictionary_memo was not passed, or if the field is not dictionary |
900 | // encoded, we are interested in the complete type including all children |
901 | |
902 | const auto& it_type = json_field.FindMember("type" ); |
903 | RETURN_NOT_OBJECT("type" , it_type, json_field); |
904 | |
905 | const auto& it_children = json_field.FindMember("children" ); |
906 | RETURN_NOT_ARRAY("children" , it_children, json_field); |
907 | |
908 | std::vector<std::shared_ptr<Field>> children; |
909 | RETURN_NOT_OK(GetFieldsFromArray(it_children->value, dictionary_memo, &children)); |
910 | RETURN_NOT_OK(GetType(it_type->value.GetObject(), children, &type)); |
911 | } |
912 | |
913 | *field = std::make_shared<Field>(name, type, nullable); |
914 | return Status::OK(); |
915 | } |
916 | |
917 | template <typename T> |
918 | inline typename std::enable_if<IsSignedInt<T>::value, typename T::c_type>::type |
919 | UnboxValue(const rj::Value& val) { |
920 | DCHECK(val.IsInt64()); |
921 | return static_cast<typename T::c_type>(val.GetInt64()); |
922 | } |
923 | |
924 | template <typename T> |
925 | inline typename std::enable_if<IsUnsignedInt<T>::value, typename T::c_type>::type |
926 | UnboxValue(const rj::Value& val) { |
927 | DCHECK(val.IsUint()); |
928 | return static_cast<typename T::c_type>(val.GetUint64()); |
929 | } |
930 | |
931 | template <typename T> |
932 | inline typename std::enable_if<IsFloatingPoint<T>::value, typename T::c_type>::type |
933 | UnboxValue(const rj::Value& val) { |
934 | DCHECK(val.IsFloat()); |
935 | return static_cast<typename T::c_type>(val.GetDouble()); |
936 | } |
937 | |
938 | template <typename T> |
939 | inline typename std::enable_if<std::is_base_of<BooleanType, T>::value, bool>::type |
940 | UnboxValue(const rj::Value& val) { |
941 | DCHECK(val.IsBool()); |
942 | return val.GetBool(); |
943 | } |
944 | |
945 | class ArrayReader { |
946 | public: |
947 | explicit ArrayReader(const rj::Value& json_array, const std::shared_ptr<DataType>& type, |
948 | MemoryPool* pool) |
949 | : json_array_(json_array), type_(type), pool_(pool) {} |
950 | |
951 | Status ParseTypeValues(const DataType& type); |
952 | |
953 | Status GetValidityBuffer(const std::vector<bool>& is_valid, int32_t* null_count, |
954 | std::shared_ptr<Buffer>* validity_buffer) { |
955 | int length = static_cast<int>(is_valid.size()); |
956 | |
957 | std::shared_ptr<Buffer> out_buffer; |
958 | RETURN_NOT_OK(AllocateEmptyBitmap(pool_, length, &out_buffer)); |
959 | uint8_t* bitmap = out_buffer->mutable_data(); |
960 | |
961 | *null_count = 0; |
962 | for (int i = 0; i < length; ++i) { |
963 | if (!is_valid[i]) { |
964 | ++(*null_count); |
965 | continue; |
966 | } |
967 | BitUtil::SetBit(bitmap, i); |
968 | } |
969 | |
970 | *validity_buffer = out_buffer; |
971 | return Status::OK(); |
972 | } |
973 | |
974 | template <typename T> |
975 | typename std::enable_if< |
976 | std::is_base_of<PrimitiveCType, T>::value || std::is_base_of<DateType, T>::value || |
977 | std::is_base_of<TimestampType, T>::value || |
978 | std::is_base_of<TimeType, T>::value || std::is_base_of<BooleanType, T>::value, |
979 | Status>::type |
980 | Visit(const T& type) { |
981 | typename TypeTraits<T>::BuilderType builder(type_, pool_); |
982 | |
983 | const auto& json_data = obj_->FindMember("DATA" ); |
984 | RETURN_NOT_ARRAY("DATA" , json_data, *obj_); |
985 | |
986 | const auto& json_data_arr = json_data->value.GetArray(); |
987 | |
988 | DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length_); |
989 | for (int i = 0; i < length_; ++i) { |
990 | if (!is_valid_[i]) { |
991 | RETURN_NOT_OK(builder.AppendNull()); |
992 | continue; |
993 | } |
994 | |
995 | const rj::Value& val = json_data_arr[i]; |
996 | RETURN_NOT_OK(builder.Append(UnboxValue<T>(val))); |
997 | } |
998 | |
999 | return builder.Finish(&result_); |
1000 | } |
1001 | |
1002 | template <typename T> |
1003 | typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type Visit( |
1004 | const T& type) { |
1005 | typename TypeTraits<T>::BuilderType builder(pool_); |
1006 | |
1007 | const auto& json_data = obj_->FindMember("DATA" ); |
1008 | RETURN_NOT_ARRAY("DATA" , json_data, *obj_); |
1009 | |
1010 | const auto& json_data_arr = json_data->value.GetArray(); |
1011 | |
1012 | DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length_); |
1013 | |
1014 | for (int i = 0; i < length_; ++i) { |
1015 | if (!is_valid_[i]) { |
1016 | RETURN_NOT_OK(builder.AppendNull()); |
1017 | continue; |
1018 | } |
1019 | |
1020 | const rj::Value& val = json_data_arr[i]; |
1021 | DCHECK(val.IsString()); |
1022 | if (std::is_base_of<StringType, T>::value) { |
1023 | RETURN_NOT_OK(builder.Append(val.GetString())); |
1024 | } else { |
1025 | std::string hex_string = val.GetString(); |
1026 | |
1027 | DCHECK(hex_string.size() % 2 == 0) << "Expected base16 hex string" ; |
1028 | int32_t length = static_cast<int>(hex_string.size()) / 2; |
1029 | |
1030 | std::shared_ptr<Buffer> byte_buffer; |
1031 | RETURN_NOT_OK(AllocateBuffer(pool_, length, &byte_buffer)); |
1032 | |
1033 | const char* hex_data = hex_string.c_str(); |
1034 | uint8_t* byte_buffer_data = byte_buffer->mutable_data(); |
1035 | for (int32_t j = 0; j < length; ++j) { |
1036 | RETURN_NOT_OK(ParseHexValue(hex_data + j * 2, &byte_buffer_data[j])); |
1037 | } |
1038 | RETURN_NOT_OK(builder.Append(byte_buffer_data, length)); |
1039 | } |
1040 | } |
1041 | |
1042 | return builder.Finish(&result_); |
1043 | } |
1044 | |
1045 | template <typename T> |
1046 | typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T>::value && |
1047 | !std::is_base_of<Decimal128Type, T>::value, |
1048 | Status>::type |
1049 | Visit(const T& type) { |
1050 | typename TypeTraits<T>::BuilderType builder(type_, pool_); |
1051 | |
1052 | const auto& json_data = obj_->FindMember("DATA" ); |
1053 | RETURN_NOT_ARRAY("DATA" , json_data, *obj_); |
1054 | |
1055 | const auto& json_data_arr = json_data->value.GetArray(); |
1056 | |
1057 | DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length_); |
1058 | int32_t byte_width = type.byte_width(); |
1059 | |
1060 | // Allocate space for parsed values |
1061 | std::shared_ptr<Buffer> byte_buffer; |
1062 | RETURN_NOT_OK(AllocateBuffer(pool_, byte_width, &byte_buffer)); |
1063 | uint8_t* byte_buffer_data = byte_buffer->mutable_data(); |
1064 | |
1065 | for (int i = 0; i < length_; ++i) { |
1066 | if (!is_valid_[i]) { |
1067 | RETURN_NOT_OK(builder.AppendNull()); |
1068 | } else { |
1069 | const rj::Value& val = json_data_arr[i]; |
1070 | DCHECK(val.IsString()) |
1071 | << "Found non-string JSON value when parsing FixedSizeBinary value" ; |
1072 | std::string hex_string = val.GetString(); |
1073 | if (static_cast<int32_t>(hex_string.size()) != byte_width * 2) { |
1074 | DCHECK(false) << "Expected size: " << byte_width * 2 |
1075 | << " got: " << hex_string.size(); |
1076 | } |
1077 | const char* hex_data = hex_string.c_str(); |
1078 | |
1079 | for (int32_t j = 0; j < byte_width; ++j) { |
1080 | RETURN_NOT_OK(ParseHexValue(hex_data + j * 2, &byte_buffer_data[j])); |
1081 | } |
1082 | RETURN_NOT_OK(builder.Append(byte_buffer_data)); |
1083 | } |
1084 | } |
1085 | return builder.Finish(&result_); |
1086 | } |
1087 | |
1088 | template <typename T> |
1089 | typename std::enable_if<std::is_base_of<Decimal128Type, T>::value, Status>::type Visit( |
1090 | const T& type) { |
1091 | typename TypeTraits<T>::BuilderType builder(type_, pool_); |
1092 | |
1093 | const auto& json_data = obj_->FindMember("DATA" ); |
1094 | RETURN_NOT_ARRAY("DATA" , json_data, *obj_); |
1095 | |
1096 | const auto& json_data_arr = json_data->value.GetArray(); |
1097 | |
1098 | DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length_); |
1099 | |
1100 | for (int i = 0; i < length_; ++i) { |
1101 | if (!is_valid_[i]) { |
1102 | RETURN_NOT_OK(builder.AppendNull()); |
1103 | } else { |
1104 | const rj::Value& val = json_data_arr[i]; |
1105 | DCHECK(val.IsString()) |
1106 | << "Found non-string JSON value when parsing Decimal128 value" ; |
1107 | DCHECK_GT(val.GetStringLength(), 0) |
1108 | << "Empty string found when parsing Decimal128 value" ; |
1109 | |
1110 | Decimal128 value; |
1111 | RETURN_NOT_OK(Decimal128::FromString(val.GetString(), &value)); |
1112 | RETURN_NOT_OK(builder.Append(value)); |
1113 | } |
1114 | } |
1115 | return builder.Finish(&result_); |
1116 | } |
1117 | |
1118 | template <typename T> |
1119 | Status GetIntArray(const RjArray& json_array, const int32_t length, |
1120 | std::shared_ptr<Buffer>* out) { |
1121 | std::shared_ptr<Buffer> buffer; |
1122 | RETURN_NOT_OK(AllocateBuffer(pool_, length * sizeof(T), &buffer)); |
1123 | |
1124 | T* values = reinterpret_cast<T*>(buffer->mutable_data()); |
1125 | for (int i = 0; i < length; ++i) { |
1126 | const rj::Value& val = json_array[i]; |
1127 | DCHECK(val.IsInt()); |
1128 | values[i] = static_cast<T>(val.GetInt()); |
1129 | } |
1130 | |
1131 | *out = buffer; |
1132 | return Status::OK(); |
1133 | } |
1134 | |
1135 | Status Visit(const ListType& type) { |
1136 | int32_t null_count = 0; |
1137 | std::shared_ptr<Buffer> validity_buffer; |
1138 | RETURN_NOT_OK(GetValidityBuffer(is_valid_, &null_count, &validity_buffer)); |
1139 | |
1140 | const auto& json_offsets = obj_->FindMember("OFFSET" ); |
1141 | RETURN_NOT_ARRAY("OFFSET" , json_offsets, *obj_); |
1142 | std::shared_ptr<Buffer> offsets_buffer; |
1143 | RETURN_NOT_OK(GetIntArray<int32_t>(json_offsets->value.GetArray(), length_ + 1, |
1144 | &offsets_buffer)); |
1145 | |
1146 | std::vector<std::shared_ptr<Array>> children; |
1147 | RETURN_NOT_OK(GetChildren(*obj_, type, &children)); |
1148 | DCHECK_EQ(children.size(), 1); |
1149 | |
1150 | result_ = std::make_shared<ListArray>(type_, length_, offsets_buffer, children[0], |
1151 | validity_buffer, null_count); |
1152 | |
1153 | return Status::OK(); |
1154 | } |
1155 | |
1156 | Status Visit(const StructType& type) { |
1157 | int32_t null_count = 0; |
1158 | std::shared_ptr<Buffer> validity_buffer; |
1159 | RETURN_NOT_OK(GetValidityBuffer(is_valid_, &null_count, &validity_buffer)); |
1160 | |
1161 | std::vector<std::shared_ptr<Array>> fields; |
1162 | RETURN_NOT_OK(GetChildren(*obj_, type, &fields)); |
1163 | |
1164 | result_ = std::make_shared<StructArray>(type_, length_, fields, validity_buffer, |
1165 | null_count); |
1166 | |
1167 | return Status::OK(); |
1168 | } |
1169 | |
1170 | Status Visit(const UnionType& type) { |
1171 | int32_t null_count = 0; |
1172 | |
1173 | std::shared_ptr<Buffer> validity_buffer; |
1174 | std::shared_ptr<Buffer> type_id_buffer; |
1175 | std::shared_ptr<Buffer> offsets_buffer; |
1176 | |
1177 | RETURN_NOT_OK(GetValidityBuffer(is_valid_, &null_count, &validity_buffer)); |
1178 | |
1179 | const auto& json_type_ids = obj_->FindMember("TYPE_ID" ); |
1180 | RETURN_NOT_ARRAY("TYPE_ID" , json_type_ids, *obj_); |
1181 | RETURN_NOT_OK( |
1182 | GetIntArray<uint8_t>(json_type_ids->value.GetArray(), length_, &type_id_buffer)); |
1183 | |
1184 | if (type.mode() == UnionMode::DENSE) { |
1185 | const auto& json_offsets = obj_->FindMember("OFFSET" ); |
1186 | RETURN_NOT_ARRAY("OFFSET" , json_offsets, *obj_); |
1187 | RETURN_NOT_OK( |
1188 | GetIntArray<int32_t>(json_offsets->value.GetArray(), length_, &offsets_buffer)); |
1189 | } |
1190 | |
1191 | std::vector<std::shared_ptr<Array>> children; |
1192 | RETURN_NOT_OK(GetChildren(*obj_, type, &children)); |
1193 | |
1194 | result_ = std::make_shared<UnionArray>(type_, length_, children, type_id_buffer, |
1195 | offsets_buffer, validity_buffer, null_count); |
1196 | |
1197 | return Status::OK(); |
1198 | } |
1199 | |
1200 | Status Visit(const NullType& type) { |
1201 | result_ = std::make_shared<NullArray>(length_); |
1202 | return Status::OK(); |
1203 | } |
1204 | |
1205 | Status Visit(const DictionaryType& type) { |
1206 | // This stores the indices in result_ |
1207 | // |
1208 | // XXX(wesm): slight hack |
1209 | auto dict_type = type_; |
1210 | type_ = type.index_type(); |
1211 | RETURN_NOT_OK(ParseTypeValues(*type_)); |
1212 | type_ = dict_type; |
1213 | result_ = std::make_shared<DictionaryArray>(type_, result_); |
1214 | return Status::OK(); |
1215 | } |
1216 | |
1217 | Status GetChildren(const RjObject& obj, const DataType& type, |
1218 | std::vector<std::shared_ptr<Array>>* array) { |
1219 | const auto& json_children = obj.FindMember("children" ); |
1220 | RETURN_NOT_ARRAY("children" , json_children, obj); |
1221 | const auto& json_children_arr = json_children->value.GetArray(); |
1222 | |
1223 | if (type.num_children() != static_cast<int>(json_children_arr.Size())) { |
1224 | return Status::Invalid("Expected " , type.num_children(), " children, but got " , |
1225 | json_children_arr.Size()); |
1226 | } |
1227 | |
1228 | for (int i = 0; i < static_cast<int>(json_children_arr.Size()); ++i) { |
1229 | const rj::Value& json_child = json_children_arr[i]; |
1230 | DCHECK(json_child.IsObject()); |
1231 | |
1232 | std::shared_ptr<Field> child_field = type.child(i); |
1233 | |
1234 | auto it = json_child.FindMember("name" ); |
1235 | RETURN_NOT_STRING("name" , it, json_child); |
1236 | |
1237 | DCHECK_EQ(it->value.GetString(), child_field->name()); |
1238 | std::shared_ptr<Array> child; |
1239 | RETURN_NOT_OK(ReadArray(pool_, json_children_arr[i], child_field->type(), &child)); |
1240 | array->emplace_back(child); |
1241 | } |
1242 | |
1243 | return Status::OK(); |
1244 | } |
1245 | |
1246 | Status GetArray(std::shared_ptr<Array>* out) { |
1247 | if (!json_array_.IsObject()) { |
1248 | return Status::Invalid("Array element was not a JSON object" ); |
1249 | } |
1250 | |
1251 | auto obj = json_array_.GetObject(); |
1252 | obj_ = &obj; |
1253 | |
1254 | RETURN_NOT_OK(GetObjectInt(obj, "count" , &length_)); |
1255 | |
1256 | const auto& json_valid_iter = obj.FindMember("VALIDITY" ); |
1257 | RETURN_NOT_ARRAY("VALIDITY" , json_valid_iter, obj); |
1258 | |
1259 | const auto& json_validity = json_valid_iter->value.GetArray(); |
1260 | DCHECK_EQ(static_cast<int>(json_validity.Size()), length_); |
1261 | for (const rj::Value& val : json_validity) { |
1262 | DCHECK(val.IsInt()); |
1263 | is_valid_.push_back(val.GetInt() != 0); |
1264 | } |
1265 | |
1266 | RETURN_NOT_OK(ParseTypeValues(*type_)); |
1267 | *out = result_; |
1268 | return Status::OK(); |
1269 | } |
1270 | |
1271 | private: |
1272 | const rj::Value& json_array_; |
1273 | const RjObject* obj_; |
1274 | std::shared_ptr<DataType> type_; |
1275 | MemoryPool* pool_; |
1276 | |
1277 | // Parsed common attributes |
1278 | std::vector<bool> is_valid_; |
1279 | int32_t length_; |
1280 | std::shared_ptr<Array> result_; |
1281 | }; |
1282 | |
1283 | Status ArrayReader::ParseTypeValues(const DataType& type) { |
1284 | return VisitTypeInline(type, this); |
1285 | } |
1286 | |
1287 | Status WriteSchema(const Schema& schema, RjWriter* json_writer) { |
1288 | SchemaWriter converter(schema, json_writer); |
1289 | return converter.Write(); |
1290 | } |
1291 | |
1292 | static Status LookForDictionaries(const rj::Value& obj, DictionaryTypeMap* id_to_field) { |
1293 | const auto& json_field = obj.GetObject(); |
1294 | |
1295 | const auto& it_dictionary = json_field.FindMember("dictionary" ); |
1296 | if (it_dictionary == json_field.MemberEnd()) { |
1297 | // Not dictionary-encoded |
1298 | return Status::OK(); |
1299 | } |
1300 | |
1301 | // Dictionary encoded. Construct the field and set in the type map |
1302 | std::shared_ptr<Field> dictionary_field; |
1303 | RETURN_NOT_OK(GetField(obj, nullptr, &dictionary_field)); |
1304 | |
1305 | int id; |
1306 | RETURN_NOT_OK(GetObjectInt(it_dictionary->value.GetObject(), "id" , &id)); |
1307 | (*id_to_field)[id] = dictionary_field; |
1308 | return Status::OK(); |
1309 | } |
1310 | |
1311 | static Status GetDictionaryTypes(const RjArray& fields, DictionaryTypeMap* id_to_field) { |
1312 | for (rj::SizeType i = 0; i < fields.Size(); ++i) { |
1313 | RETURN_NOT_OK(LookForDictionaries(fields[i], id_to_field)); |
1314 | } |
1315 | return Status::OK(); |
1316 | } |
1317 | |
1318 | static Status ReadDictionary(const RjObject& obj, const DictionaryTypeMap& id_to_field, |
1319 | MemoryPool* pool, int64_t* dictionary_id, |
1320 | std::shared_ptr<Array>* out) { |
1321 | int id; |
1322 | RETURN_NOT_OK(GetObjectInt(obj, "id" , &id)); |
1323 | |
1324 | const auto& it_data = obj.FindMember("data" ); |
1325 | RETURN_NOT_OBJECT("data" , it_data, obj); |
1326 | |
1327 | auto it = id_to_field.find(id); |
1328 | if (it == id_to_field.end()) { |
1329 | return Status::Invalid("No dictionary with id " , id); |
1330 | } |
1331 | std::vector<std::shared_ptr<Field>> fields = {it->second}; |
1332 | |
1333 | // We need a schema for the record batch |
1334 | auto dummy_schema = std::make_shared<Schema>(fields); |
1335 | |
1336 | // The dictionary is embedded in a record batch with a single column |
1337 | std::shared_ptr<RecordBatch> batch; |
1338 | RETURN_NOT_OK(ReadRecordBatch(it_data->value, dummy_schema, pool, &batch)); |
1339 | |
1340 | if (batch->num_columns() != 1) { |
1341 | return Status::Invalid("Dictionary record batch must only contain one field" ); |
1342 | } |
1343 | |
1344 | *dictionary_id = id; |
1345 | *out = batch->column(0); |
1346 | return Status::OK(); |
1347 | } |
1348 | |
1349 | static Status ReadDictionaries(const rj::Value& doc, const DictionaryTypeMap& id_to_field, |
1350 | MemoryPool* pool, DictionaryMemo* dictionary_memo) { |
1351 | auto it = doc.FindMember("dictionaries" ); |
1352 | if (it == doc.MemberEnd()) { |
1353 | // No dictionaries |
1354 | return Status::OK(); |
1355 | } |
1356 | |
1357 | RETURN_NOT_ARRAY("dictionaries" , it, doc); |
1358 | const auto& dictionary_array = it->value.GetArray(); |
1359 | |
1360 | for (const rj::Value& val : dictionary_array) { |
1361 | DCHECK(val.IsObject()); |
1362 | int64_t dictionary_id = -1; |
1363 | std::shared_ptr<Array> dictionary; |
1364 | RETURN_NOT_OK( |
1365 | ReadDictionary(val.GetObject(), id_to_field, pool, &dictionary_id, &dictionary)); |
1366 | |
1367 | RETURN_NOT_OK(dictionary_memo->AddDictionary(dictionary_id, dictionary)); |
1368 | } |
1369 | return Status::OK(); |
1370 | } |
1371 | |
1372 | Status ReadSchema(const rj::Value& json_schema, MemoryPool* pool, |
1373 | std::shared_ptr<Schema>* schema) { |
1374 | auto it = json_schema.FindMember("schema" ); |
1375 | RETURN_NOT_OBJECT("schema" , it, json_schema); |
1376 | const auto& obj_schema = it->value.GetObject(); |
1377 | |
1378 | const auto& it_fields = obj_schema.FindMember("fields" ); |
1379 | RETURN_NOT_ARRAY("fields" , it_fields, obj_schema); |
1380 | |
1381 | // Determine the dictionary types |
1382 | DictionaryTypeMap dictionary_types; |
1383 | RETURN_NOT_OK(GetDictionaryTypes(it_fields->value.GetArray(), &dictionary_types)); |
1384 | |
1385 | // Read the dictionaries (if any) and cache in the memo |
1386 | DictionaryMemo dictionary_memo; |
1387 | RETURN_NOT_OK(ReadDictionaries(json_schema, dictionary_types, pool, &dictionary_memo)); |
1388 | |
1389 | std::vector<std::shared_ptr<Field>> fields; |
1390 | RETURN_NOT_OK(GetFieldsFromArray(it_fields->value, &dictionary_memo, &fields)); |
1391 | |
1392 | *schema = std::make_shared<Schema>(fields); |
1393 | return Status::OK(); |
1394 | } |
1395 | |
1396 | Status ReadRecordBatch(const rj::Value& json_obj, const std::shared_ptr<Schema>& schema, |
1397 | MemoryPool* pool, std::shared_ptr<RecordBatch>* batch) { |
1398 | DCHECK(json_obj.IsObject()); |
1399 | const auto& batch_obj = json_obj.GetObject(); |
1400 | |
1401 | auto it = batch_obj.FindMember("count" ); |
1402 | RETURN_NOT_INT("count" , it, batch_obj); |
1403 | int32_t num_rows = static_cast<int32_t>(it->value.GetInt()); |
1404 | |
1405 | it = batch_obj.FindMember("columns" ); |
1406 | RETURN_NOT_ARRAY("columns" , it, batch_obj); |
1407 | const auto& json_columns = it->value.GetArray(); |
1408 | |
1409 | std::vector<std::shared_ptr<Array>> columns(json_columns.Size()); |
1410 | for (int i = 0; i < static_cast<int>(columns.size()); ++i) { |
1411 | const std::shared_ptr<DataType>& type = schema->field(i)->type(); |
1412 | RETURN_NOT_OK(ReadArray(pool, json_columns[i], type, &columns[i])); |
1413 | } |
1414 | |
1415 | *batch = RecordBatch::Make(schema, num_rows, columns); |
1416 | return Status::OK(); |
1417 | } |
1418 | |
1419 | Status WriteRecordBatch(const RecordBatch& batch, RjWriter* writer) { |
1420 | writer->StartObject(); |
1421 | writer->Key("count" ); |
1422 | writer->Int(static_cast<int32_t>(batch.num_rows())); |
1423 | |
1424 | writer->Key("columns" ); |
1425 | writer->StartArray(); |
1426 | |
1427 | for (int i = 0; i < batch.num_columns(); ++i) { |
1428 | const std::shared_ptr<Array>& column = batch.column(i); |
1429 | |
1430 | DCHECK_EQ(batch.num_rows(), column->length()) |
1431 | << "Array length did not match record batch length" ; |
1432 | |
1433 | RETURN_NOT_OK(WriteArray(batch.column_name(i), *column, writer)); |
1434 | } |
1435 | |
1436 | writer->EndArray(); |
1437 | writer->EndObject(); |
1438 | return Status::OK(); |
1439 | } |
1440 | |
1441 | Status WriteArray(const std::string& name, const Array& array, RjWriter* json_writer) { |
1442 | ArrayWriter converter(name, array, json_writer); |
1443 | return converter.Write(); |
1444 | } |
1445 | |
1446 | Status ReadArray(MemoryPool* pool, const rj::Value& json_array, |
1447 | const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) { |
1448 | ArrayReader converter(json_array, type, pool); |
1449 | return converter.GetArray(array); |
1450 | } |
1451 | |
1452 | Status ReadArray(MemoryPool* pool, const rj::Value& json_array, const Schema& schema, |
1453 | std::shared_ptr<Array>* array) { |
1454 | if (!json_array.IsObject()) { |
1455 | return Status::Invalid("Element was not a JSON object" ); |
1456 | } |
1457 | |
1458 | const auto& json_obj = json_array.GetObject(); |
1459 | |
1460 | const auto& it_name = json_obj.FindMember("name" ); |
1461 | RETURN_NOT_STRING("name" , it_name, json_obj); |
1462 | |
1463 | std::string name = it_name->value.GetString(); |
1464 | |
1465 | std::shared_ptr<Field> result = nullptr; |
1466 | for (const std::shared_ptr<Field>& field : schema.fields()) { |
1467 | if (field->name() == name) { |
1468 | result = field; |
1469 | break; |
1470 | } |
1471 | } |
1472 | |
1473 | if (result == nullptr) { |
1474 | return Status::KeyError("Field named " , name, " not found in schema" ); |
1475 | } |
1476 | |
1477 | return ReadArray(pool, json_array, result->type(), array); |
1478 | } |
1479 | |
1480 | } // namespace json |
1481 | } // namespace internal |
1482 | } // namespace ipc |
1483 | } // namespace arrow |
1484 | |