1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/ipc/json-internal.h"
19
20#include <cstdint>
21#include <cstdlib>
22#include <memory>
23#include <sstream>
24#include <string>
25#include <type_traits>
26#include <unordered_map>
27#include <utility>
28#include <vector>
29
30#include "arrow/array.h"
31#include "arrow/buffer.h"
32#include "arrow/builder.h" // IWYU pragma: keep
33#include "arrow/ipc/dictionary.h"
34#include "arrow/record_batch.h"
35#include "arrow/status.h"
36#include "arrow/type.h"
37#include "arrow/type_traits.h"
38#include "arrow/util/bit-util.h"
39#include "arrow/util/checked_cast.h"
40#include "arrow/util/decimal.h"
41#include "arrow/util/logging.h"
42#include "arrow/util/string.h"
43#include "arrow/visitor_inline.h"
44
45namespace arrow {
46
47class MemoryPool;
48
49using internal::checked_cast;
50
51namespace ipc {
52namespace internal {
53namespace json {
54
55using ::arrow::ipc::DictionaryMemo;
56using ::arrow::ipc::DictionaryTypeMap;
57
58static std::string GetFloatingPrecisionName(FloatingPoint::Precision precision) {
59 switch (precision) {
60 case FloatingPoint::HALF:
61 return "HALF";
62 case FloatingPoint::SINGLE:
63 return "SINGLE";
64 case FloatingPoint::DOUBLE:
65 return "DOUBLE";
66 default:
67 break;
68 }
69 return "UNKNOWN";
70}
71
72static std::string GetTimeUnitName(TimeUnit::type unit) {
73 switch (unit) {
74 case TimeUnit::SECOND:
75 return "SECOND";
76 case TimeUnit::MILLI:
77 return "MILLISECOND";
78 case TimeUnit::MICRO:
79 return "MICROSECOND";
80 case TimeUnit::NANO:
81 return "NANOSECOND";
82 default:
83 break;
84 }
85 return "UNKNOWN";
86}
87
88class SchemaWriter {
89 public:
90 explicit SchemaWriter(const Schema& schema, RjWriter* writer)
91 : schema_(schema), writer_(writer) {}
92
93 Status Write() {
94 writer_->Key("schema");
95 writer_->StartObject();
96 writer_->Key("fields");
97 writer_->StartArray();
98 for (const std::shared_ptr<Field>& field : schema_.fields()) {
99 RETURN_NOT_OK(VisitField(*field));
100 }
101 writer_->EndArray();
102 writer_->EndObject();
103
104 // Write dictionaries, if any
105 if (dictionary_memo_.size() > 0) {
106 writer_->Key("dictionaries");
107 writer_->StartArray();
108 for (const auto& entry : dictionary_memo_.id_to_dictionary()) {
109 RETURN_NOT_OK(WriteDictionary(entry.first, entry.second));
110 }
111 writer_->EndArray();
112 }
113 return Status::OK();
114 }
115
116 Status WriteDictionary(int64_t id, const std::shared_ptr<Array>& dictionary) {
117 writer_->StartObject();
118 writer_->Key("id");
119 writer_->Int(static_cast<int32_t>(id));
120 writer_->Key("data");
121
122 // Make a dummy record batch. A bit tedious as we have to make a schema
123 auto schema = ::arrow::schema({arrow::field("dictionary", dictionary->type())});
124 auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary});
125 RETURN_NOT_OK(WriteRecordBatch(*batch, writer_));
126 writer_->EndObject();
127 return Status::OK();
128 }
129
130 Status WriteDictionaryMetadata(const DictionaryType& type) {
131 int64_t dictionary_id = dictionary_memo_.GetId(type.dictionary());
132 writer_->Key("dictionary");
133
134 // Emulate DictionaryEncoding from Schema.fbs
135 writer_->StartObject();
136 writer_->Key("id");
137 writer_->Int(static_cast<int32_t>(dictionary_id));
138 writer_->Key("indexType");
139
140 writer_->StartObject();
141 RETURN_NOT_OK(VisitType(*type.index_type()));
142 writer_->EndObject();
143
144 writer_->Key("isOrdered");
145 writer_->Bool(type.ordered());
146 writer_->EndObject();
147
148 return Status::OK();
149 }
150
151 Status VisitField(const Field& field) {
152 writer_->StartObject();
153
154 writer_->Key("name");
155 writer_->String(field.name().c_str());
156
157 writer_->Key("nullable");
158 writer_->Bool(field.nullable());
159
160 const DataType& type = *field.type();
161
162 // Visit the type
163 writer_->Key("type");
164 writer_->StartObject();
165 RETURN_NOT_OK(VisitType(type));
166 writer_->EndObject();
167
168 if (type.id() == Type::DICTIONARY) {
169 const auto& dict_type = checked_cast<const DictionaryType&>(type);
170 RETURN_NOT_OK(WriteDictionaryMetadata(dict_type));
171
172 const DataType& dictionary_type = *dict_type.dictionary()->type();
173 RETURN_NOT_OK(WriteChildren(dictionary_type.children()));
174 } else {
175 RETURN_NOT_OK(WriteChildren(type.children()));
176 }
177
178 writer_->EndObject();
179
180 return Status::OK();
181 }
182
183 Status VisitType(const DataType& type);
184
185 template <typename T>
186 typename std::enable_if<std::is_base_of<NoExtraMeta, T>::value ||
187 std::is_base_of<ListType, T>::value ||
188 std::is_base_of<StructType, T>::value,
189 void>::type
190 WriteTypeMetadata(const T& type) {}
191
192 void WriteTypeMetadata(const Integer& type) {
193 writer_->Key("bitWidth");
194 writer_->Int(type.bit_width());
195 writer_->Key("isSigned");
196 writer_->Bool(type.is_signed());
197 }
198
199 void WriteTypeMetadata(const FloatingPoint& type) {
200 writer_->Key("precision");
201 writer_->String(GetFloatingPrecisionName(type.precision()));
202 }
203
204 void WriteTypeMetadata(const IntervalType& type) {
205 writer_->Key("unit");
206 switch (type.unit()) {
207 case IntervalType::Unit::YEAR_MONTH:
208 writer_->String("YEAR_MONTH");
209 break;
210 case IntervalType::Unit::DAY_TIME:
211 writer_->String("DAY_TIME");
212 break;
213 }
214 }
215
216 void WriteTypeMetadata(const TimestampType& type) {
217 writer_->Key("unit");
218 writer_->String(GetTimeUnitName(type.unit()));
219 if (type.timezone().size() > 0) {
220 writer_->Key("timezone");
221 writer_->String(type.timezone());
222 }
223 }
224
225 void WriteTypeMetadata(const TimeType& type) {
226 writer_->Key("unit");
227 writer_->String(GetTimeUnitName(type.unit()));
228 writer_->Key("bitWidth");
229 writer_->Int(type.bit_width());
230 }
231
232 void WriteTypeMetadata(const DateType& type) {
233 writer_->Key("unit");
234 switch (type.unit()) {
235 case DateUnit::DAY:
236 writer_->String("DAY");
237 break;
238 case DateUnit::MILLI:
239 writer_->String("MILLISECOND");
240 break;
241 }
242 }
243
244 void WriteTypeMetadata(const FixedSizeBinaryType& type) {
245 writer_->Key("byteWidth");
246 writer_->Int(type.byte_width());
247 }
248
249 void WriteTypeMetadata(const Decimal128Type& type) {
250 writer_->Key("precision");
251 writer_->Int(type.precision());
252 writer_->Key("scale");
253 writer_->Int(type.scale());
254 }
255
256 void WriteTypeMetadata(const UnionType& type) {
257 writer_->Key("mode");
258 switch (type.mode()) {
259 case UnionMode::SPARSE:
260 writer_->String("SPARSE");
261 break;
262 case UnionMode::DENSE:
263 writer_->String("DENSE");
264 break;
265 }
266
267 // Write type ids
268 writer_->Key("typeIds");
269 writer_->StartArray();
270 for (size_t i = 0; i < type.type_codes().size(); ++i) {
271 writer_->Uint(type.type_codes()[i]);
272 }
273 writer_->EndArray();
274 }
275
276 // TODO(wesm): Other Type metadata
277
278 template <typename T>
279 void WriteName(const std::string& typeclass, const T& type) {
280 writer_->Key("name");
281 writer_->String(typeclass);
282 WriteTypeMetadata(type);
283 }
284
285 template <typename T>
286 Status WritePrimitive(const std::string& typeclass, const T& type) {
287 WriteName(typeclass, type);
288 return Status::OK();
289 }
290
291 template <typename T>
292 Status WriteVarBytes(const std::string& typeclass, const T& type) {
293 WriteName(typeclass, type);
294 return Status::OK();
295 }
296
297 Status WriteChildren(const std::vector<std::shared_ptr<Field>>& children) {
298 writer_->Key("children");
299 writer_->StartArray();
300 for (const std::shared_ptr<Field>& field : children) {
301 RETURN_NOT_OK(VisitField(*field));
302 }
303 writer_->EndArray();
304 return Status::OK();
305 }
306
307 Status Visit(const NullType& type) { return WritePrimitive("null", type); }
308 Status Visit(const BooleanType& type) { return WritePrimitive("bool", type); }
309 Status Visit(const Integer& type) { return WritePrimitive("int", type); }
310
311 Status Visit(const FloatingPoint& type) {
312 return WritePrimitive("floatingpoint", type);
313 }
314
315 Status Visit(const DateType& type) { return WritePrimitive("date", type); }
316 Status Visit(const TimeType& type) { return WritePrimitive("time", type); }
317 Status Visit(const StringType& type) { return WriteVarBytes("utf8", type); }
318 Status Visit(const BinaryType& type) { return WriteVarBytes("binary", type); }
319 Status Visit(const FixedSizeBinaryType& type) {
320 return WritePrimitive("fixedsizebinary", type);
321 }
322
323 Status Visit(const Decimal128Type& type) { return WritePrimitive("decimal", type); }
324 Status Visit(const TimestampType& type) { return WritePrimitive("timestamp", type); }
325 Status Visit(const IntervalType& type) { return WritePrimitive("interval", type); }
326
327 Status Visit(const ListType& type) {
328 WriteName("list", type);
329 return Status::OK();
330 }
331
332 Status Visit(const StructType& type) {
333 WriteName("struct", type);
334 return Status::OK();
335 }
336
337 Status Visit(const UnionType& type) {
338 WriteName("union", type);
339 return Status::OK();
340 }
341
342 Status Visit(const DictionaryType& type) {
343 return VisitType(*type.dictionary()->type());
344 }
345
346 private:
347 DictionaryMemo dictionary_memo_;
348
349 const Schema& schema_;
350 RjWriter* writer_;
351};
352
353Status SchemaWriter::VisitType(const DataType& type) {
354 return VisitTypeInline(type, this);
355}
356
357class ArrayWriter {
358 public:
359 ArrayWriter(const std::string& name, const Array& array, RjWriter* writer)
360 : name_(name), array_(array), writer_(writer) {}
361
362 Status Write() { return VisitArray(name_, array_); }
363
364 Status VisitArrayValues(const Array& arr) { return VisitArrayInline(arr, this); }
365
366 Status VisitArray(const std::string& name, const Array& arr) {
367 writer_->StartObject();
368 writer_->Key("name");
369 writer_->String(name);
370
371 writer_->Key("count");
372 writer_->Int(static_cast<int32_t>(arr.length()));
373
374 RETURN_NOT_OK(VisitArrayValues(arr));
375
376 writer_->EndObject();
377 return Status::OK();
378 }
379
380 template <typename T>
381 typename std::enable_if<IsSignedInt<T>::value, void>::type WriteDataValues(
382 const T& arr) {
383 static const char null_string[] = "0";
384 const auto data = arr.raw_values();
385 for (int64_t i = 0; i < arr.length(); ++i) {
386 if (arr.IsValid(i)) {
387 writer_->Int64(data[i]);
388 } else {
389 writer_->RawNumber(null_string, sizeof(null_string));
390 }
391 }
392 }
393
394 template <typename T>
395 typename std::enable_if<IsUnsignedInt<T>::value, void>::type WriteDataValues(
396 const T& arr) {
397 static const char null_string[] = "0";
398 const auto data = arr.raw_values();
399 for (int64_t i = 0; i < arr.length(); ++i) {
400 if (arr.IsValid(i)) {
401 writer_->Uint64(data[i]);
402 } else {
403 writer_->RawNumber(null_string, sizeof(null_string));
404 }
405 }
406 }
407
408 template <typename T>
409 typename std::enable_if<IsFloatingPoint<T>::value, void>::type WriteDataValues(
410 const T& arr) {
411 static const char null_string[] = "0.";
412 const auto data = arr.raw_values();
413 for (int64_t i = 0; i < arr.length(); ++i) {
414 if (arr.IsValid(i)) {
415 writer_->Double(data[i]);
416 } else {
417 writer_->RawNumber(null_string, sizeof(null_string));
418 }
419 }
420 }
421
422 // Binary, encode to hexadecimal. UTF8 string write as is
423 template <typename T>
424 typename std::enable_if<std::is_base_of<BinaryArray, T>::value, void>::type
425 WriteDataValues(const T& arr) {
426 for (int64_t i = 0; i < arr.length(); ++i) {
427 int32_t length;
428 const uint8_t* buf = arr.GetValue(i, &length);
429
430 if (std::is_base_of<StringArray, T>::value) {
431 // Presumed UTF-8
432 writer_->String(reinterpret_cast<const char*>(buf), length);
433 } else {
434 writer_->String(HexEncode(buf, length));
435 }
436 }
437 }
438
439 void WriteDataValues(const FixedSizeBinaryArray& arr) {
440 const int32_t width = arr.byte_width();
441
442 for (int64_t i = 0; i < arr.length(); ++i) {
443 const uint8_t* buf = arr.GetValue(i);
444 std::string encoded = HexEncode(buf, width);
445 writer_->String(encoded);
446 }
447 }
448
449 void WriteDataValues(const Decimal128Array& arr) {
450 static const char null_string[] = "0";
451 for (int64_t i = 0; i < arr.length(); ++i) {
452 if (arr.IsValid(i)) {
453 const Decimal128 value(arr.GetValue(i));
454 writer_->String(value.ToIntegerString());
455 } else {
456 writer_->String(null_string, sizeof(null_string));
457 }
458 }
459 }
460
461 void WriteDataValues(const BooleanArray& arr) {
462 for (int64_t i = 0; i < arr.length(); ++i) {
463 if (arr.IsValid(i)) {
464 writer_->Bool(arr.Value(i));
465 } else {
466 writer_->Bool(false);
467 }
468 }
469 }
470
471 template <typename T>
472 void WriteDataField(const T& arr) {
473 writer_->Key("DATA");
474 writer_->StartArray();
475 WriteDataValues(arr);
476 writer_->EndArray();
477 }
478
479 template <typename T>
480 void WriteIntegerField(const char* name, const T* values, int64_t length) {
481 writer_->Key(name);
482 writer_->StartArray();
483 for (int i = 0; i < length; ++i) {
484 writer_->Int64(values[i]);
485 }
486 writer_->EndArray();
487 }
488
489 void WriteValidityField(const Array& arr) {
490 writer_->Key("VALIDITY");
491 writer_->StartArray();
492 if (arr.null_count() > 0) {
493 for (int i = 0; i < arr.length(); ++i) {
494 writer_->Int(arr.IsNull(i) ? 0 : 1);
495 }
496 } else {
497 for (int i = 0; i < arr.length(); ++i) {
498 writer_->Int(1);
499 }
500 }
501 writer_->EndArray();
502 }
503
504 void SetNoChildren() {
505 writer_->Key("children");
506 writer_->StartArray();
507 writer_->EndArray();
508 }
509
510 Status WriteChildren(const std::vector<std::shared_ptr<Field>>& fields,
511 const std::vector<std::shared_ptr<Array>>& arrays) {
512 writer_->Key("children");
513 writer_->StartArray();
514 for (size_t i = 0; i < fields.size(); ++i) {
515 RETURN_NOT_OK(VisitArray(fields[i]->name(), *arrays[i]));
516 }
517 writer_->EndArray();
518 return Status::OK();
519 }
520
521 Status Visit(const NullArray& array) {
522 SetNoChildren();
523 return Status::OK();
524 }
525
526 template <typename T>
527 typename std::enable_if<std::is_base_of<PrimitiveArray, T>::value, Status>::type Visit(
528 const T& array) {
529 WriteValidityField(array);
530 WriteDataField(array);
531 SetNoChildren();
532 return Status::OK();
533 }
534
535 template <typename T>
536 typename std::enable_if<std::is_base_of<BinaryArray, T>::value, Status>::type Visit(
537 const T& array) {
538 WriteValidityField(array);
539 WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
540 WriteDataField(array);
541 SetNoChildren();
542 return Status::OK();
543 }
544
545 Status Visit(const DictionaryArray& array) {
546 return VisitArrayValues(*array.indices());
547 }
548
549 Status Visit(const ListArray& array) {
550 WriteValidityField(array);
551 WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
552 const auto& type = checked_cast<const ListType&>(*array.type());
553 return WriteChildren(type.children(), {array.values()});
554 }
555
556 Status Visit(const StructArray& array) {
557 WriteValidityField(array);
558 const auto& type = checked_cast<const StructType&>(*array.type());
559 std::vector<std::shared_ptr<Array>> children;
560 children.reserve(array.num_fields());
561 for (int i = 0; i < array.num_fields(); ++i) {
562 children.emplace_back(array.field(i));
563 }
564 return WriteChildren(type.children(), children);
565 }
566
567 Status Visit(const UnionArray& array) {
568 WriteValidityField(array);
569 const auto& type = checked_cast<const UnionType&>(*array.type());
570
571 WriteIntegerField("TYPE_ID", array.raw_type_ids(), array.length());
572 if (type.mode() == UnionMode::DENSE) {
573 WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length());
574 }
575 std::vector<std::shared_ptr<Array>> children;
576 children.reserve(array.num_fields());
577 for (int i = 0; i < array.num_fields(); ++i) {
578 children.emplace_back(array.child(i));
579 }
580 return WriteChildren(type.children(), children);
581 }
582
583 private:
584 const std::string& name_;
585 const Array& array_;
586 RjWriter* writer_;
587};
588
589static Status GetObjectInt(const RjObject& obj, const std::string& key, int* out) {
590 const auto& it = obj.FindMember(key);
591 RETURN_NOT_INT(key, it, obj);
592 *out = it->value.GetInt();
593 return Status::OK();
594}
595
596static Status GetObjectBool(const RjObject& obj, const std::string& key, bool* out) {
597 const auto& it = obj.FindMember(key);
598 RETURN_NOT_BOOL(key, it, obj);
599 *out = it->value.GetBool();
600 return Status::OK();
601}
602
603static Status GetObjectString(const RjObject& obj, const std::string& key,
604 std::string* out) {
605 const auto& it = obj.FindMember(key);
606 RETURN_NOT_STRING(key, it, obj);
607 *out = it->value.GetString();
608 return Status::OK();
609}
610
611static Status GetInteger(const rj::Value::ConstObject& json_type,
612 std::shared_ptr<DataType>* type) {
613 const auto& it_bit_width = json_type.FindMember("bitWidth");
614 RETURN_NOT_INT("bitWidth", it_bit_width, json_type);
615
616 const auto& it_is_signed = json_type.FindMember("isSigned");
617 RETURN_NOT_BOOL("isSigned", it_is_signed, json_type);
618
619 bool is_signed = it_is_signed->value.GetBool();
620 int bit_width = it_bit_width->value.GetInt();
621
622 switch (bit_width) {
623 case 8:
624 *type = is_signed ? int8() : uint8();
625 break;
626 case 16:
627 *type = is_signed ? int16() : uint16();
628 break;
629 case 32:
630 *type = is_signed ? int32() : uint32();
631 break;
632 case 64:
633 *type = is_signed ? int64() : uint64();
634 break;
635 default:
636 return Status::Invalid("Invalid bit width: ", bit_width);
637 }
638 return Status::OK();
639}
640
641static Status GetFloatingPoint(const RjObject& json_type,
642 std::shared_ptr<DataType>* type) {
643 const auto& it_precision = json_type.FindMember("precision");
644 RETURN_NOT_STRING("precision", it_precision, json_type);
645
646 std::string precision = it_precision->value.GetString();
647
648 if (precision == "DOUBLE") {
649 *type = float64();
650 } else if (precision == "SINGLE") {
651 *type = float32();
652 } else if (precision == "HALF") {
653 *type = float16();
654 } else {
655 return Status::Invalid("Invalid precision: ", precision);
656 }
657 return Status::OK();
658}
659
660static Status GetFixedSizeBinary(const RjObject& json_type,
661 std::shared_ptr<DataType>* type) {
662 const auto& it_byte_width = json_type.FindMember("byteWidth");
663 RETURN_NOT_INT("byteWidth", it_byte_width, json_type);
664
665 int32_t byte_width = it_byte_width->value.GetInt();
666 *type = fixed_size_binary(byte_width);
667 return Status::OK();
668}
669
670static Status GetDecimal(const RjObject& json_type, std::shared_ptr<DataType>* type) {
671 const auto& it_precision = json_type.FindMember("precision");
672 const auto& it_scale = json_type.FindMember("scale");
673
674 RETURN_NOT_INT("precision", it_precision, json_type);
675 RETURN_NOT_INT("scale", it_scale, json_type);
676
677 *type = decimal(it_precision->value.GetInt(), it_scale->value.GetInt());
678 return Status::OK();
679}
680
681static Status GetDate(const RjObject& json_type, std::shared_ptr<DataType>* type) {
682 const auto& it_unit = json_type.FindMember("unit");
683 RETURN_NOT_STRING("unit", it_unit, json_type);
684
685 std::string unit_str = it_unit->value.GetString();
686
687 if (unit_str == "DAY") {
688 *type = date32();
689 } else if (unit_str == "MILLISECOND") {
690 *type = date64();
691 } else {
692 return Status::Invalid("Invalid date unit: ", unit_str);
693 }
694 return Status::OK();
695}
696
697static Status GetTime(const RjObject& json_type, std::shared_ptr<DataType>* type) {
698 const auto& it_unit = json_type.FindMember("unit");
699 RETURN_NOT_STRING("unit", it_unit, json_type);
700
701 const auto& it_bit_width = json_type.FindMember("bitWidth");
702 RETURN_NOT_INT("bitWidth", it_bit_width, json_type);
703
704 std::string unit_str = it_unit->value.GetString();
705
706 if (unit_str == "SECOND") {
707 *type = time32(TimeUnit::SECOND);
708 } else if (unit_str == "MILLISECOND") {
709 *type = time32(TimeUnit::MILLI);
710 } else if (unit_str == "MICROSECOND") {
711 *type = time64(TimeUnit::MICRO);
712 } else if (unit_str == "NANOSECOND") {
713 *type = time64(TimeUnit::NANO);
714 } else {
715 return Status::Invalid("Invalid time unit: ", unit_str);
716 }
717
718 const auto& fw_type = checked_cast<const FixedWidthType&>(**type);
719
720 int bit_width = it_bit_width->value.GetInt();
721 if (bit_width != fw_type.bit_width()) {
722 return Status::Invalid("Indicated bit width does not match unit");
723 }
724
725 return Status::OK();
726}
727
728static Status GetTimestamp(const RjObject& json_type, std::shared_ptr<DataType>* type) {
729 const auto& it_unit = json_type.FindMember("unit");
730 RETURN_NOT_STRING("unit", it_unit, json_type);
731
732 std::string unit_str = it_unit->value.GetString();
733
734 TimeUnit::type unit;
735 if (unit_str == "SECOND") {
736 unit = TimeUnit::SECOND;
737 } else if (unit_str == "MILLISECOND") {
738 unit = TimeUnit::MILLI;
739 } else if (unit_str == "MICROSECOND") {
740 unit = TimeUnit::MICRO;
741 } else if (unit_str == "NANOSECOND") {
742 unit = TimeUnit::NANO;
743 } else {
744 return Status::Invalid("Invalid time unit: ", unit_str);
745 }
746
747 const auto& it_tz = json_type.FindMember("timezone");
748 if (it_tz == json_type.MemberEnd()) {
749 *type = timestamp(unit);
750 } else {
751 *type = timestamp(unit, it_tz->value.GetString());
752 }
753
754 return Status::OK();
755}
756
757static Status GetUnion(const RjObject& json_type,
758 const std::vector<std::shared_ptr<Field>>& children,
759 std::shared_ptr<DataType>* type) {
760 const auto& it_mode = json_type.FindMember("mode");
761 RETURN_NOT_STRING("mode", it_mode, json_type);
762
763 std::string mode_str = it_mode->value.GetString();
764 UnionMode::type mode;
765
766 if (mode_str == "SPARSE") {
767 mode = UnionMode::SPARSE;
768 } else if (mode_str == "DENSE") {
769 mode = UnionMode::DENSE;
770 } else {
771 return Status::Invalid("Invalid union mode: ", mode_str);
772 }
773
774 const auto& it_type_codes = json_type.FindMember("typeIds");
775 RETURN_NOT_ARRAY("typeIds", it_type_codes, json_type);
776
777 std::vector<uint8_t> type_codes;
778 const auto& id_array = it_type_codes->value.GetArray();
779 for (const rj::Value& val : id_array) {
780 DCHECK(val.IsUint());
781 type_codes.push_back(static_cast<uint8_t>(val.GetUint()));
782 }
783
784 *type = union_(children, type_codes, mode);
785
786 return Status::OK();
787}
788
789static Status GetType(const RjObject& json_type,
790 const std::vector<std::shared_ptr<Field>>& children,
791 std::shared_ptr<DataType>* type) {
792 const auto& it_type_name = json_type.FindMember("name");
793 RETURN_NOT_STRING("name", it_type_name, json_type);
794
795 std::string type_name = it_type_name->value.GetString();
796
797 if (type_name == "int") {
798 return GetInteger(json_type, type);
799 } else if (type_name == "floatingpoint") {
800 return GetFloatingPoint(json_type, type);
801 } else if (type_name == "bool") {
802 *type = boolean();
803 } else if (type_name == "utf8") {
804 *type = utf8();
805 } else if (type_name == "binary") {
806 *type = binary();
807 } else if (type_name == "fixedsizebinary") {
808 return GetFixedSizeBinary(json_type, type);
809 } else if (type_name == "decimal") {
810 return GetDecimal(json_type, type);
811 } else if (type_name == "null") {
812 *type = null();
813 } else if (type_name == "date") {
814 return GetDate(json_type, type);
815 } else if (type_name == "time") {
816 return GetTime(json_type, type);
817 } else if (type_name == "timestamp") {
818 return GetTimestamp(json_type, type);
819 } else if (type_name == "list") {
820 if (children.size() != 1) {
821 return Status::Invalid("List must have exactly one child");
822 }
823 *type = list(children[0]);
824 } else if (type_name == "struct") {
825 *type = struct_(children);
826 } else if (type_name == "union") {
827 return GetUnion(json_type, children, type);
828 } else {
829 return Status::Invalid("Unrecognized type name: ", type_name);
830 }
831 return Status::OK();
832}
833
// Forward declaration: GetField and GetFieldsFromArray are mutually recursive,
// since a field's "children" array is itself parsed back into fields.
static Status GetField(const rj::Value& obj, const DictionaryMemo* dictionary_memo,
                       std::shared_ptr<Field>* field);
836
837static Status GetFieldsFromArray(const rj::Value& obj,
838 const DictionaryMemo* dictionary_memo,
839 std::vector<std::shared_ptr<Field>>* fields) {
840 const auto& values = obj.GetArray();
841
842 fields->resize(values.Size());
843 for (rj::SizeType i = 0; i < fields->size(); ++i) {
844 RETURN_NOT_OK(GetField(values[i], dictionary_memo, &(*fields)[i]));
845 }
846 return Status::OK();
847}
848
849static Status ParseDictionary(const RjObject& obj, int64_t* id, bool* is_ordered,
850 std::shared_ptr<DataType>* index_type) {
851 int32_t int32_id;
852 RETURN_NOT_OK(GetObjectInt(obj, "id", &int32_id));
853 *id = int32_id;
854
855 RETURN_NOT_OK(GetObjectBool(obj, "isOrdered", is_ordered));
856
857 const auto& it_index_type = obj.FindMember("indexType");
858 RETURN_NOT_OBJECT("indexType", it_index_type, obj);
859
860 const auto& json_index_type = it_index_type->value.GetObject();
861
862 std::string type_name;
863 RETURN_NOT_OK(GetObjectString(json_index_type, "name", &type_name));
864 if (type_name != "int") {
865 return Status::Invalid("Dictionary indices can only be integers");
866 }
867 return GetInteger(json_index_type, index_type);
868}
869
870static Status GetField(const rj::Value& obj, const DictionaryMemo* dictionary_memo,
871 std::shared_ptr<Field>* field) {
872 if (!obj.IsObject()) {
873 return Status::Invalid("Field was not a JSON object");
874 }
875 const auto& json_field = obj.GetObject();
876
877 std::string name;
878 bool nullable;
879 RETURN_NOT_OK(GetObjectString(json_field, "name", &name));
880 RETURN_NOT_OK(GetObjectBool(json_field, "nullable", &nullable));
881
882 std::shared_ptr<DataType> type;
883
884 const auto& it_dictionary = json_field.FindMember("dictionary");
885 if (dictionary_memo != nullptr && it_dictionary != json_field.MemberEnd()) {
886 // Field is dictionary encoded. We must have already
887 RETURN_NOT_OBJECT("dictionary", it_dictionary, json_field);
888 int64_t dictionary_id = -1;
889 bool is_ordered;
890 std::shared_ptr<DataType> index_type;
891 RETURN_NOT_OK(ParseDictionary(it_dictionary->value.GetObject(), &dictionary_id,
892 &is_ordered, &index_type));
893
894 std::shared_ptr<Array> dictionary;
895 RETURN_NOT_OK(dictionary_memo->GetDictionary(dictionary_id, &dictionary));
896
897 type = std::make_shared<DictionaryType>(index_type, dictionary, is_ordered);
898 } else {
899 // If the dictionary_memo was not passed, or if the field is not dictionary
900 // encoded, we are interested in the complete type including all children
901
902 const auto& it_type = json_field.FindMember("type");
903 RETURN_NOT_OBJECT("type", it_type, json_field);
904
905 const auto& it_children = json_field.FindMember("children");
906 RETURN_NOT_ARRAY("children", it_children, json_field);
907
908 std::vector<std::shared_ptr<Field>> children;
909 RETURN_NOT_OK(GetFieldsFromArray(it_children->value, dictionary_memo, &children));
910 RETURN_NOT_OK(GetType(it_type->value.GetObject(), children, &type));
911 }
912
913 *field = std::make_shared<Field>(name, type, nullable);
914 return Status::OK();
915}
916
917template <typename T>
918inline typename std::enable_if<IsSignedInt<T>::value, typename T::c_type>::type
919UnboxValue(const rj::Value& val) {
920 DCHECK(val.IsInt64());
921 return static_cast<typename T::c_type>(val.GetInt64());
922}
923
924template <typename T>
925inline typename std::enable_if<IsUnsignedInt<T>::value, typename T::c_type>::type
926UnboxValue(const rj::Value& val) {
927 DCHECK(val.IsUint());
928 return static_cast<typename T::c_type>(val.GetUint64());
929}
930
931template <typename T>
932inline typename std::enable_if<IsFloatingPoint<T>::value, typename T::c_type>::type
933UnboxValue(const rj::Value& val) {
934 DCHECK(val.IsFloat());
935 return static_cast<typename T::c_type>(val.GetDouble());
936}
937
938template <typename T>
939inline typename std::enable_if<std::is_base_of<BooleanType, T>::value, bool>::type
940UnboxValue(const rj::Value& val) {
941 DCHECK(val.IsBool());
942 return val.GetBool();
943}
944
945class ArrayReader {
946 public:
  /// \param[in] json_array JSON value describing one array (presumably the object
  ///   shape ArrayWriter emits; stored as a member whose declaration is not
  ///   visible here, so whether it must outlive the reader is TODO confirm)
  /// \param[in] type the Arrow type to decode the JSON as
  /// \param[in] pool memory pool used for builders and allocated buffers
  explicit ArrayReader(const rj::Value& json_array, const std::shared_ptr<DataType>& type,
                       MemoryPool* pool)
      : json_array_(json_array), type_(type), pool_(pool) {}

  // Defined out of line (outside this view).
  Status ParseTypeValues(const DataType& type);
952
953 Status GetValidityBuffer(const std::vector<bool>& is_valid, int32_t* null_count,
954 std::shared_ptr<Buffer>* validity_buffer) {
955 int length = static_cast<int>(is_valid.size());
956
957 std::shared_ptr<Buffer> out_buffer;
958 RETURN_NOT_OK(AllocateEmptyBitmap(pool_, length, &out_buffer));
959 uint8_t* bitmap = out_buffer->mutable_data();
960
961 *null_count = 0;
962 for (int i = 0; i < length; ++i) {
963 if (!is_valid[i]) {
964 ++(*null_count);
965 continue;
966 }
967 BitUtil::SetBit(bitmap, i);
968 }
969
970 *validity_buffer = out_buffer;
971 return Status::OK();
972 }
973
974 template <typename T>
975 typename std::enable_if<
976 std::is_base_of<PrimitiveCType, T>::value || std::is_base_of<DateType, T>::value ||
977 std::is_base_of<TimestampType, T>::value ||
978 std::is_base_of<TimeType, T>::value || std::is_base_of<BooleanType, T>::value,
979 Status>::type
980 Visit(const T& type) {
981 typename TypeTraits<T>::BuilderType builder(type_, pool_);
982
983 const auto& json_data = obj_->FindMember("DATA");
984 RETURN_NOT_ARRAY("DATA", json_data, *obj_);
985
986 const auto& json_data_arr = json_data->value.GetArray();
987
988 DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length_);
989 for (int i = 0; i < length_; ++i) {
990 if (!is_valid_[i]) {
991 RETURN_NOT_OK(builder.AppendNull());
992 continue;
993 }
994
995 const rj::Value& val = json_data_arr[i];
996 RETURN_NOT_OK(builder.Append(UnboxValue<T>(val)));
997 }
998
999 return builder.Finish(&result_);
1000 }
1001
1002 template <typename T>
1003 typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type Visit(
1004 const T& type) {
1005 typename TypeTraits<T>::BuilderType builder(pool_);
1006
1007 const auto& json_data = obj_->FindMember("DATA");
1008 RETURN_NOT_ARRAY("DATA", json_data, *obj_);
1009
1010 const auto& json_data_arr = json_data->value.GetArray();
1011
1012 DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length_);
1013
1014 for (int i = 0; i < length_; ++i) {
1015 if (!is_valid_[i]) {
1016 RETURN_NOT_OK(builder.AppendNull());
1017 continue;
1018 }
1019
1020 const rj::Value& val = json_data_arr[i];
1021 DCHECK(val.IsString());
1022 if (std::is_base_of<StringType, T>::value) {
1023 RETURN_NOT_OK(builder.Append(val.GetString()));
1024 } else {
1025 std::string hex_string = val.GetString();
1026
1027 DCHECK(hex_string.size() % 2 == 0) << "Expected base16 hex string";
1028 int32_t length = static_cast<int>(hex_string.size()) / 2;
1029
1030 std::shared_ptr<Buffer> byte_buffer;
1031 RETURN_NOT_OK(AllocateBuffer(pool_, length, &byte_buffer));
1032
1033 const char* hex_data = hex_string.c_str();
1034 uint8_t* byte_buffer_data = byte_buffer->mutable_data();
1035 for (int32_t j = 0; j < length; ++j) {
1036 RETURN_NOT_OK(ParseHexValue(hex_data + j * 2, &byte_buffer_data[j]));
1037 }
1038 RETURN_NOT_OK(builder.Append(byte_buffer_data, length));
1039 }
1040 }
1041
1042 return builder.Finish(&result_);
1043 }
1044
1045 template <typename T>
1046 typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T>::value &&
1047 !std::is_base_of<Decimal128Type, T>::value,
1048 Status>::type
1049 Visit(const T& type) {
1050 typename TypeTraits<T>::BuilderType builder(type_, pool_);
1051
1052 const auto& json_data = obj_->FindMember("DATA");
1053 RETURN_NOT_ARRAY("DATA", json_data, *obj_);
1054
1055 const auto& json_data_arr = json_data->value.GetArray();
1056
1057 DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length_);
1058 int32_t byte_width = type.byte_width();
1059
1060 // Allocate space for parsed values
1061 std::shared_ptr<Buffer> byte_buffer;
1062 RETURN_NOT_OK(AllocateBuffer(pool_, byte_width, &byte_buffer));
1063 uint8_t* byte_buffer_data = byte_buffer->mutable_data();
1064
1065 for (int i = 0; i < length_; ++i) {
1066 if (!is_valid_[i]) {
1067 RETURN_NOT_OK(builder.AppendNull());
1068 } else {
1069 const rj::Value& val = json_data_arr[i];
1070 DCHECK(val.IsString())
1071 << "Found non-string JSON value when parsing FixedSizeBinary value";
1072 std::string hex_string = val.GetString();
1073 if (static_cast<int32_t>(hex_string.size()) != byte_width * 2) {
1074 DCHECK(false) << "Expected size: " << byte_width * 2
1075 << " got: " << hex_string.size();
1076 }
1077 const char* hex_data = hex_string.c_str();
1078
1079 for (int32_t j = 0; j < byte_width; ++j) {
1080 RETURN_NOT_OK(ParseHexValue(hex_data + j * 2, &byte_buffer_data[j]));
1081 }
1082 RETURN_NOT_OK(builder.Append(byte_buffer_data));
1083 }
1084 }
1085 return builder.Finish(&result_);
1086 }
1087
1088 template <typename T>
1089 typename std::enable_if<std::is_base_of<Decimal128Type, T>::value, Status>::type Visit(
1090 const T& type) {
1091 typename TypeTraits<T>::BuilderType builder(type_, pool_);
1092
1093 const auto& json_data = obj_->FindMember("DATA");
1094 RETURN_NOT_ARRAY("DATA", json_data, *obj_);
1095
1096 const auto& json_data_arr = json_data->value.GetArray();
1097
1098 DCHECK_EQ(static_cast<int32_t>(json_data_arr.Size()), length_);
1099
1100 for (int i = 0; i < length_; ++i) {
1101 if (!is_valid_[i]) {
1102 RETURN_NOT_OK(builder.AppendNull());
1103 } else {
1104 const rj::Value& val = json_data_arr[i];
1105 DCHECK(val.IsString())
1106 << "Found non-string JSON value when parsing Decimal128 value";
1107 DCHECK_GT(val.GetStringLength(), 0)
1108 << "Empty string found when parsing Decimal128 value";
1109
1110 Decimal128 value;
1111 RETURN_NOT_OK(Decimal128::FromString(val.GetString(), &value));
1112 RETURN_NOT_OK(builder.Append(value));
1113 }
1114 }
1115 return builder.Finish(&result_);
1116 }
1117
1118 template <typename T>
1119 Status GetIntArray(const RjArray& json_array, const int32_t length,
1120 std::shared_ptr<Buffer>* out) {
1121 std::shared_ptr<Buffer> buffer;
1122 RETURN_NOT_OK(AllocateBuffer(pool_, length * sizeof(T), &buffer));
1123
1124 T* values = reinterpret_cast<T*>(buffer->mutable_data());
1125 for (int i = 0; i < length; ++i) {
1126 const rj::Value& val = json_array[i];
1127 DCHECK(val.IsInt());
1128 values[i] = static_cast<T>(val.GetInt());
1129 }
1130
1131 *out = buffer;
1132 return Status::OK();
1133 }
1134
1135 Status Visit(const ListType& type) {
1136 int32_t null_count = 0;
1137 std::shared_ptr<Buffer> validity_buffer;
1138 RETURN_NOT_OK(GetValidityBuffer(is_valid_, &null_count, &validity_buffer));
1139
1140 const auto& json_offsets = obj_->FindMember("OFFSET");
1141 RETURN_NOT_ARRAY("OFFSET", json_offsets, *obj_);
1142 std::shared_ptr<Buffer> offsets_buffer;
1143 RETURN_NOT_OK(GetIntArray<int32_t>(json_offsets->value.GetArray(), length_ + 1,
1144 &offsets_buffer));
1145
1146 std::vector<std::shared_ptr<Array>> children;
1147 RETURN_NOT_OK(GetChildren(*obj_, type, &children));
1148 DCHECK_EQ(children.size(), 1);
1149
1150 result_ = std::make_shared<ListArray>(type_, length_, offsets_buffer, children[0],
1151 validity_buffer, null_count);
1152
1153 return Status::OK();
1154 }
1155
1156 Status Visit(const StructType& type) {
1157 int32_t null_count = 0;
1158 std::shared_ptr<Buffer> validity_buffer;
1159 RETURN_NOT_OK(GetValidityBuffer(is_valid_, &null_count, &validity_buffer));
1160
1161 std::vector<std::shared_ptr<Array>> fields;
1162 RETURN_NOT_OK(GetChildren(*obj_, type, &fields));
1163
1164 result_ = std::make_shared<StructArray>(type_, length_, fields, validity_buffer,
1165 null_count);
1166
1167 return Status::OK();
1168 }
1169
1170 Status Visit(const UnionType& type) {
1171 int32_t null_count = 0;
1172
1173 std::shared_ptr<Buffer> validity_buffer;
1174 std::shared_ptr<Buffer> type_id_buffer;
1175 std::shared_ptr<Buffer> offsets_buffer;
1176
1177 RETURN_NOT_OK(GetValidityBuffer(is_valid_, &null_count, &validity_buffer));
1178
1179 const auto& json_type_ids = obj_->FindMember("TYPE_ID");
1180 RETURN_NOT_ARRAY("TYPE_ID", json_type_ids, *obj_);
1181 RETURN_NOT_OK(
1182 GetIntArray<uint8_t>(json_type_ids->value.GetArray(), length_, &type_id_buffer));
1183
1184 if (type.mode() == UnionMode::DENSE) {
1185 const auto& json_offsets = obj_->FindMember("OFFSET");
1186 RETURN_NOT_ARRAY("OFFSET", json_offsets, *obj_);
1187 RETURN_NOT_OK(
1188 GetIntArray<int32_t>(json_offsets->value.GetArray(), length_, &offsets_buffer));
1189 }
1190
1191 std::vector<std::shared_ptr<Array>> children;
1192 RETURN_NOT_OK(GetChildren(*obj_, type, &children));
1193
1194 result_ = std::make_shared<UnionArray>(type_, length_, children, type_id_buffer,
1195 offsets_buffer, validity_buffer, null_count);
1196
1197 return Status::OK();
1198 }
1199
1200 Status Visit(const NullType& type) {
1201 result_ = std::make_shared<NullArray>(length_);
1202 return Status::OK();
1203 }
1204
1205 Status Visit(const DictionaryType& type) {
1206 // This stores the indices in result_
1207 //
1208 // XXX(wesm): slight hack
1209 auto dict_type = type_;
1210 type_ = type.index_type();
1211 RETURN_NOT_OK(ParseTypeValues(*type_));
1212 type_ = dict_type;
1213 result_ = std::make_shared<DictionaryArray>(type_, result_);
1214 return Status::OK();
1215 }
1216
1217 Status GetChildren(const RjObject& obj, const DataType& type,
1218 std::vector<std::shared_ptr<Array>>* array) {
1219 const auto& json_children = obj.FindMember("children");
1220 RETURN_NOT_ARRAY("children", json_children, obj);
1221 const auto& json_children_arr = json_children->value.GetArray();
1222
1223 if (type.num_children() != static_cast<int>(json_children_arr.Size())) {
1224 return Status::Invalid("Expected ", type.num_children(), " children, but got ",
1225 json_children_arr.Size());
1226 }
1227
1228 for (int i = 0; i < static_cast<int>(json_children_arr.Size()); ++i) {
1229 const rj::Value& json_child = json_children_arr[i];
1230 DCHECK(json_child.IsObject());
1231
1232 std::shared_ptr<Field> child_field = type.child(i);
1233
1234 auto it = json_child.FindMember("name");
1235 RETURN_NOT_STRING("name", it, json_child);
1236
1237 DCHECK_EQ(it->value.GetString(), child_field->name());
1238 std::shared_ptr<Array> child;
1239 RETURN_NOT_OK(ReadArray(pool_, json_children_arr[i], child_field->type(), &child));
1240 array->emplace_back(child);
1241 }
1242
1243 return Status::OK();
1244 }
1245
1246 Status GetArray(std::shared_ptr<Array>* out) {
1247 if (!json_array_.IsObject()) {
1248 return Status::Invalid("Array element was not a JSON object");
1249 }
1250
1251 auto obj = json_array_.GetObject();
1252 obj_ = &obj;
1253
1254 RETURN_NOT_OK(GetObjectInt(obj, "count", &length_));
1255
1256 const auto& json_valid_iter = obj.FindMember("VALIDITY");
1257 RETURN_NOT_ARRAY("VALIDITY", json_valid_iter, obj);
1258
1259 const auto& json_validity = json_valid_iter->value.GetArray();
1260 DCHECK_EQ(static_cast<int>(json_validity.Size()), length_);
1261 for (const rj::Value& val : json_validity) {
1262 DCHECK(val.IsInt());
1263 is_valid_.push_back(val.GetInt() != 0);
1264 }
1265
1266 RETURN_NOT_OK(ParseTypeValues(*type_));
1267 *out = result_;
1268 return Status::OK();
1269 }
1270
 private:
  // JSON value describing the array to decode (GetArray rejects non-objects)
  const rj::Value& json_array_;
  // Object view of json_array_ consulted by the Visit methods; GetArray points it
  // at a local object, so it is only meaningful while GetArray is running
  const RjObject* obj_;
  // Type of the array being built (Visit(DictionaryType) temporarily swaps it for
  // the index type)
  std::shared_ptr<DataType> type_;
  // Pool passed to builders and AllocateBuffer calls
  MemoryPool* pool_;

  // Parsed common attributes
  // Per-slot validity flags read from the "VALIDITY" array (non-zero => valid)
  std::vector<bool> is_valid_;
  // Number of slots, read from the "count" member
  int32_t length_;
  // Output of the Visit overload that handled type_
  std::shared_ptr<Array> result_;
1281};
1282
1283Status ArrayReader::ParseTypeValues(const DataType& type) {
1284 return VisitTypeInline(type, this);
1285}
1286
1287Status WriteSchema(const Schema& schema, RjWriter* json_writer) {
1288 SchemaWriter converter(schema, json_writer);
1289 return converter.Write();
1290}
1291
1292static Status LookForDictionaries(const rj::Value& obj, DictionaryTypeMap* id_to_field) {
1293 const auto& json_field = obj.GetObject();
1294
1295 const auto& it_dictionary = json_field.FindMember("dictionary");
1296 if (it_dictionary == json_field.MemberEnd()) {
1297 // Not dictionary-encoded
1298 return Status::OK();
1299 }
1300
1301 // Dictionary encoded. Construct the field and set in the type map
1302 std::shared_ptr<Field> dictionary_field;
1303 RETURN_NOT_OK(GetField(obj, nullptr, &dictionary_field));
1304
1305 int id;
1306 RETURN_NOT_OK(GetObjectInt(it_dictionary->value.GetObject(), "id", &id));
1307 (*id_to_field)[id] = dictionary_field;
1308 return Status::OK();
1309}
1310
1311static Status GetDictionaryTypes(const RjArray& fields, DictionaryTypeMap* id_to_field) {
1312 for (rj::SizeType i = 0; i < fields.Size(); ++i) {
1313 RETURN_NOT_OK(LookForDictionaries(fields[i], id_to_field));
1314 }
1315 return Status::OK();
1316}
1317
1318static Status ReadDictionary(const RjObject& obj, const DictionaryTypeMap& id_to_field,
1319 MemoryPool* pool, int64_t* dictionary_id,
1320 std::shared_ptr<Array>* out) {
1321 int id;
1322 RETURN_NOT_OK(GetObjectInt(obj, "id", &id));
1323
1324 const auto& it_data = obj.FindMember("data");
1325 RETURN_NOT_OBJECT("data", it_data, obj);
1326
1327 auto it = id_to_field.find(id);
1328 if (it == id_to_field.end()) {
1329 return Status::Invalid("No dictionary with id ", id);
1330 }
1331 std::vector<std::shared_ptr<Field>> fields = {it->second};
1332
1333 // We need a schema for the record batch
1334 auto dummy_schema = std::make_shared<Schema>(fields);
1335
1336 // The dictionary is embedded in a record batch with a single column
1337 std::shared_ptr<RecordBatch> batch;
1338 RETURN_NOT_OK(ReadRecordBatch(it_data->value, dummy_schema, pool, &batch));
1339
1340 if (batch->num_columns() != 1) {
1341 return Status::Invalid("Dictionary record batch must only contain one field");
1342 }
1343
1344 *dictionary_id = id;
1345 *out = batch->column(0);
1346 return Status::OK();
1347}
1348
1349static Status ReadDictionaries(const rj::Value& doc, const DictionaryTypeMap& id_to_field,
1350 MemoryPool* pool, DictionaryMemo* dictionary_memo) {
1351 auto it = doc.FindMember("dictionaries");
1352 if (it == doc.MemberEnd()) {
1353 // No dictionaries
1354 return Status::OK();
1355 }
1356
1357 RETURN_NOT_ARRAY("dictionaries", it, doc);
1358 const auto& dictionary_array = it->value.GetArray();
1359
1360 for (const rj::Value& val : dictionary_array) {
1361 DCHECK(val.IsObject());
1362 int64_t dictionary_id = -1;
1363 std::shared_ptr<Array> dictionary;
1364 RETURN_NOT_OK(
1365 ReadDictionary(val.GetObject(), id_to_field, pool, &dictionary_id, &dictionary));
1366
1367 RETURN_NOT_OK(dictionary_memo->AddDictionary(dictionary_id, dictionary));
1368 }
1369 return Status::OK();
1370}
1371
1372Status ReadSchema(const rj::Value& json_schema, MemoryPool* pool,
1373 std::shared_ptr<Schema>* schema) {
1374 auto it = json_schema.FindMember("schema");
1375 RETURN_NOT_OBJECT("schema", it, json_schema);
1376 const auto& obj_schema = it->value.GetObject();
1377
1378 const auto& it_fields = obj_schema.FindMember("fields");
1379 RETURN_NOT_ARRAY("fields", it_fields, obj_schema);
1380
1381 // Determine the dictionary types
1382 DictionaryTypeMap dictionary_types;
1383 RETURN_NOT_OK(GetDictionaryTypes(it_fields->value.GetArray(), &dictionary_types));
1384
1385 // Read the dictionaries (if any) and cache in the memo
1386 DictionaryMemo dictionary_memo;
1387 RETURN_NOT_OK(ReadDictionaries(json_schema, dictionary_types, pool, &dictionary_memo));
1388
1389 std::vector<std::shared_ptr<Field>> fields;
1390 RETURN_NOT_OK(GetFieldsFromArray(it_fields->value, &dictionary_memo, &fields));
1391
1392 *schema = std::make_shared<Schema>(fields);
1393 return Status::OK();
1394}
1395
1396Status ReadRecordBatch(const rj::Value& json_obj, const std::shared_ptr<Schema>& schema,
1397 MemoryPool* pool, std::shared_ptr<RecordBatch>* batch) {
1398 DCHECK(json_obj.IsObject());
1399 const auto& batch_obj = json_obj.GetObject();
1400
1401 auto it = batch_obj.FindMember("count");
1402 RETURN_NOT_INT("count", it, batch_obj);
1403 int32_t num_rows = static_cast<int32_t>(it->value.GetInt());
1404
1405 it = batch_obj.FindMember("columns");
1406 RETURN_NOT_ARRAY("columns", it, batch_obj);
1407 const auto& json_columns = it->value.GetArray();
1408
1409 std::vector<std::shared_ptr<Array>> columns(json_columns.Size());
1410 for (int i = 0; i < static_cast<int>(columns.size()); ++i) {
1411 const std::shared_ptr<DataType>& type = schema->field(i)->type();
1412 RETURN_NOT_OK(ReadArray(pool, json_columns[i], type, &columns[i]));
1413 }
1414
1415 *batch = RecordBatch::Make(schema, num_rows, columns);
1416 return Status::OK();
1417}
1418
1419Status WriteRecordBatch(const RecordBatch& batch, RjWriter* writer) {
1420 writer->StartObject();
1421 writer->Key("count");
1422 writer->Int(static_cast<int32_t>(batch.num_rows()));
1423
1424 writer->Key("columns");
1425 writer->StartArray();
1426
1427 for (int i = 0; i < batch.num_columns(); ++i) {
1428 const std::shared_ptr<Array>& column = batch.column(i);
1429
1430 DCHECK_EQ(batch.num_rows(), column->length())
1431 << "Array length did not match record batch length";
1432
1433 RETURN_NOT_OK(WriteArray(batch.column_name(i), *column, writer));
1434 }
1435
1436 writer->EndArray();
1437 writer->EndObject();
1438 return Status::OK();
1439}
1440
1441Status WriteArray(const std::string& name, const Array& array, RjWriter* json_writer) {
1442 ArrayWriter converter(name, array, json_writer);
1443 return converter.Write();
1444}
1445
1446Status ReadArray(MemoryPool* pool, const rj::Value& json_array,
1447 const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
1448 ArrayReader converter(json_array, type, pool);
1449 return converter.GetArray(array);
1450}
1451
1452Status ReadArray(MemoryPool* pool, const rj::Value& json_array, const Schema& schema,
1453 std::shared_ptr<Array>* array) {
1454 if (!json_array.IsObject()) {
1455 return Status::Invalid("Element was not a JSON object");
1456 }
1457
1458 const auto& json_obj = json_array.GetObject();
1459
1460 const auto& it_name = json_obj.FindMember("name");
1461 RETURN_NOT_STRING("name", it_name, json_obj);
1462
1463 std::string name = it_name->value.GetString();
1464
1465 std::shared_ptr<Field> result = nullptr;
1466 for (const std::shared_ptr<Field>& field : schema.fields()) {
1467 if (field->name() == name) {
1468 result = field;
1469 break;
1470 }
1471 }
1472
1473 if (result == nullptr) {
1474 return Status::KeyError("Field named ", name, " not found in schema");
1475 }
1476
1477 return ReadArray(pool, json_array, result->type(), array);
1478}
1479
1480} // namespace json
1481} // namespace internal
1482} // namespace ipc
1483} // namespace arrow
1484