1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/ipc/json.h"
19
20#include <cstddef>
21#include <memory>
22#include <string>
23
24#include "arrow/buffer.h"
25#include "arrow/ipc/json-internal.h"
26#include "arrow/memory_pool.h"
27#include "arrow/record_batch.h"
28#include "arrow/status.h"
29#include "arrow/type.h"
30#include "arrow/util/logging.h"
31
32using std::size_t;
33
34namespace arrow {
35namespace ipc {
36namespace internal {
37namespace json {
38
39// ----------------------------------------------------------------------
40// Writer implementation
41
42class JsonWriter::JsonWriterImpl {
43 public:
44 explicit JsonWriterImpl(const std::shared_ptr<Schema>& schema) : schema_(schema) {
45 writer_.reset(new RjWriter(string_buffer_));
46 }
47
48 Status Start() {
49 writer_->StartObject();
50 RETURN_NOT_OK(json::WriteSchema(*schema_, writer_.get()));
51
52 // Record batches
53 writer_->Key("batches");
54 writer_->StartArray();
55 return Status::OK();
56 }
57
58 Status Finish(std::string* result) {
59 writer_->EndArray(); // Record batches
60 writer_->EndObject();
61
62 *result = string_buffer_.GetString();
63 return Status::OK();
64 }
65
66 Status WriteRecordBatch(const RecordBatch& batch) {
67 DCHECK_EQ(batch.num_columns(), schema_->num_fields());
68 return json::WriteRecordBatch(batch, writer_.get());
69 }
70
71 private:
72 std::shared_ptr<Schema> schema_;
73
74 rj::StringBuffer string_buffer_;
75 std::unique_ptr<RjWriter> writer_;
76};
77
78JsonWriter::JsonWriter(const std::shared_ptr<Schema>& schema) {
79 impl_.reset(new JsonWriterImpl(schema));
80}
81
82JsonWriter::~JsonWriter() {}
83
84Status JsonWriter::Open(const std::shared_ptr<Schema>& schema,
85 std::unique_ptr<JsonWriter>* writer) {
86 *writer = std::unique_ptr<JsonWriter>(new JsonWriter(schema));
87 return (*writer)->impl_->Start();
88}
89
90Status JsonWriter::Finish(std::string* result) { return impl_->Finish(result); }
91
92Status JsonWriter::WriteRecordBatch(const RecordBatch& batch) {
93 return impl_->WriteRecordBatch(batch);
94}
95
96// ----------------------------------------------------------------------
97// Reader implementation
98
99class JsonReader::JsonReaderImpl {
100 public:
101 JsonReaderImpl(MemoryPool* pool, const std::shared_ptr<Buffer>& data)
102 : pool_(pool), data_(data), record_batches_(nullptr) {}
103
104 Status ParseAndReadSchema() {
105 doc_.Parse(reinterpret_cast<const rj::Document::Ch*>(data_->data()),
106 static_cast<size_t>(data_->size()));
107 if (doc_.HasParseError()) {
108 return Status::IOError("JSON parsing failed");
109 }
110
111 RETURN_NOT_OK(json::ReadSchema(doc_, pool_, &schema_));
112
113 auto it = doc_.FindMember("batches");
114 RETURN_NOT_ARRAY("batches", it, doc_);
115 record_batches_ = &it->value;
116
117 return Status::OK();
118 }
119
120 Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
121 DCHECK_GE(i, 0) << "i out of bounds";
122 DCHECK_LT(i, static_cast<int>(record_batches_->GetArray().Size()))
123 << "i out of bounds";
124
125 return json::ReadRecordBatch(record_batches_->GetArray()[i], schema_, pool_, batch);
126 }
127
128 std::shared_ptr<Schema> schema() const { return schema_; }
129
130 int num_record_batches() const {
131 return static_cast<int>(record_batches_->GetArray().Size());
132 }
133
134 private:
135 MemoryPool* pool_;
136 std::shared_ptr<Buffer> data_;
137 rj::Document doc_;
138
139 const rj::Value* record_batches_;
140 std::shared_ptr<Schema> schema_;
141};
142
143JsonReader::JsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data) {
144 impl_.reset(new JsonReaderImpl(pool, data));
145}
146
147JsonReader::~JsonReader() {}
148
149Status JsonReader::Open(const std::shared_ptr<Buffer>& data,
150 std::unique_ptr<JsonReader>* reader) {
151 return Open(default_memory_pool(), data, reader);
152}
153
154Status JsonReader::Open(MemoryPool* pool, const std::shared_ptr<Buffer>& data,
155 std::unique_ptr<JsonReader>* reader) {
156 *reader = std::unique_ptr<JsonReader>(new JsonReader(pool, data));
157 return (*reader)->impl_->ParseAndReadSchema();
158}
159
160std::shared_ptr<Schema> JsonReader::schema() const { return impl_->schema(); }
161
162int JsonReader::num_record_batches() const { return impl_->num_record_batches(); }
163
164Status JsonReader::ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const {
165 return impl_->ReadRecordBatch(i, batch);
166}
167
168} // namespace json
169} // namespace internal
170} // namespace ipc
171} // namespace arrow
172