1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <cstdint>
19#include <cstdio>
20#include <cstring>
21#include <memory>
22#include <string>
23#include <vector>
24
25#include <gtest/gtest.h>
26
27#include "arrow/array.h"
28#include "arrow/buffer.h"
29#include "arrow/builder.h"
30#include "arrow/ipc/json-internal.h"
31#include "arrow/ipc/json.h"
32#include "arrow/ipc/test-common.h"
33#include "arrow/memory_pool.h"
34#include "arrow/record_batch.h"
35#include "arrow/test-util.h"
36#include "arrow/type.h"
37#include "arrow/type_traits.h"
38
39namespace arrow {
40namespace ipc {
41namespace internal {
42namespace json {
43
44void TestSchemaRoundTrip(const Schema& schema) {
45 rj::StringBuffer sb;
46 rj::Writer<rj::StringBuffer> writer(sb);
47
48 writer.StartObject();
49 ASSERT_OK(WriteSchema(schema, &writer));
50 writer.EndObject();
51
52 std::string json_schema = sb.GetString();
53
54 rj::Document d;
55 d.Parse(json_schema);
56
57 std::shared_ptr<Schema> out;
58 if (!ReadSchema(d, default_memory_pool(), &out).ok()) {
59 FAIL() << "Unable to read JSON schema: " << json_schema;
60 }
61
62 if (!schema.Equals(*out)) {
63 FAIL() << "In schema: " << schema.ToString() << "\nOut schema: " << out->ToString();
64 }
65}
66
67void TestArrayRoundTrip(const Array& array) {
68 static std::string name = "dummy";
69
70 rj::StringBuffer sb;
71 rj::Writer<rj::StringBuffer> writer(sb);
72
73 ASSERT_OK(WriteArray(name, array, &writer));
74
75 std::string array_as_json = sb.GetString();
76
77 rj::Document d;
78 d.Parse(array_as_json);
79
80 if (d.HasParseError()) {
81 FAIL() << "JSON parsing failed";
82 }
83
84 std::shared_ptr<Array> out;
85 ASSERT_OK(ReadArray(default_memory_pool(), d, array.type(), &out));
86
87 // std::cout << array_as_json << std::endl;
88 CompareArraysDetailed(0, *out, array);
89}
90
91template <typename T, typename ValueType>
92void CheckPrimitive(const std::shared_ptr<DataType>& type,
93 const std::vector<bool>& is_valid,
94 const std::vector<ValueType>& values) {
95 MemoryPool* pool = default_memory_pool();
96 typename TypeTraits<T>::BuilderType builder(pool);
97
98 for (size_t i = 0; i < values.size(); ++i) {
99 if (is_valid[i]) {
100 ASSERT_OK(builder.Append(values[i]));
101 } else {
102 ASSERT_OK(builder.AppendNull());
103 }
104 }
105
106 std::shared_ptr<Array> array;
107 ASSERT_OK(builder.Finish(&array));
108 TestArrayRoundTrip(*array.get());
109}
110
111TEST(TestJsonSchemaWriter, FlatTypes) {
112 // TODO
113 // field("f14", date32())
114 std::vector<std::shared_ptr<Field>> fields = {
115 field("f0", int8()),
116 field("f1", int16(), false),
117 field("f2", int32()),
118 field("f3", int64(), false),
119 field("f4", uint8()),
120 field("f5", uint16()),
121 field("f6", uint32()),
122 field("f7", uint64()),
123 field("f8", float32()),
124 field("f9", float64()),
125 field("f10", utf8()),
126 field("f11", binary()),
127 field("f12", list(int32())),
128 field("f13", struct_({field("s1", int32()), field("s2", utf8())})),
129 field("f15", date64()),
130 field("f16", timestamp(TimeUnit::NANO)),
131 field("f17", time64(TimeUnit::MICRO)),
132 field("f18", union_({field("u1", int8()), field("u2", time32(TimeUnit::MILLI))},
133 {0, 1}, UnionMode::DENSE))};
134
135 Schema schema(fields);
136 TestSchemaRoundTrip(schema);
137}
138
139template <typename T>
140void PrimitiveTypesCheckOne() {
141 using c_type = typename T::c_type;
142
143 std::vector<bool> is_valid = {true, false, true, true, true, false, true, true};
144 std::vector<c_type> values = {0, 1, 2, 3, 4, 5, 6, 7};
145 CheckPrimitive<T, c_type>(std::make_shared<T>(), is_valid, values);
146}
147
148TEST(TestJsonArrayWriter, PrimitiveTypes) {
149 PrimitiveTypesCheckOne<Int8Type>();
150 PrimitiveTypesCheckOne<Int16Type>();
151 PrimitiveTypesCheckOne<Int32Type>();
152 PrimitiveTypesCheckOne<Int64Type>();
153 PrimitiveTypesCheckOne<UInt8Type>();
154 PrimitiveTypesCheckOne<UInt16Type>();
155 PrimitiveTypesCheckOne<UInt32Type>();
156 PrimitiveTypesCheckOne<UInt64Type>();
157 PrimitiveTypesCheckOne<FloatType>();
158 PrimitiveTypesCheckOne<DoubleType>();
159
160 std::vector<bool> is_valid = {true, false, true, true, true, false, true, true};
161 std::vector<std::string> values = {"foo", "bar", "", "baz", "qux", "foo", "a", "1"};
162
163 CheckPrimitive<StringType, std::string>(utf8(), is_valid, values);
164 CheckPrimitive<BinaryType, std::string>(binary(), is_valid, values);
165}
166
167TEST(TestJsonArrayWriter, NestedTypes) {
168 auto value_type = int32();
169
170 std::vector<bool> values_is_valid = {true, false, true, true, false, true, true};
171
172 std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
173 std::shared_ptr<Array> values_array;
174 ArrayFromVector<Int32Type, int32_t>(values_is_valid, values, &values_array);
175
176 std::vector<int16_t> i16_values = {0, 1, 2, 3, 4, 5, 6};
177 std::shared_ptr<Array> i16_values_array;
178 ArrayFromVector<Int16Type, int16_t>(values_is_valid, i16_values, &i16_values_array);
179
180 // List
181 std::vector<bool> list_is_valid = {true, false, true, true, true};
182 std::vector<int32_t> offsets = {0, 0, 0, 1, 4, 7};
183
184 std::shared_ptr<Buffer> list_bitmap;
185 ASSERT_OK(GetBitmapFromVector(list_is_valid, &list_bitmap));
186 std::shared_ptr<Buffer> offsets_buffer = Buffer::Wrap(offsets);
187
188 ListArray list_array(list(value_type), 5, offsets_buffer, values_array, list_bitmap, 1);
189
190 TestArrayRoundTrip(list_array);
191
192 // Struct
193 std::vector<bool> struct_is_valid = {true, false, true, true, true, false, true};
194 std::shared_ptr<Buffer> struct_bitmap;
195 ASSERT_OK(GetBitmapFromVector(struct_is_valid, &struct_bitmap));
196
197 auto struct_type =
198 struct_({field("f1", int32()), field("f2", int32()), field("f3", int32())});
199
200 std::vector<std::shared_ptr<Array>> fields = {values_array, values_array, values_array};
201 StructArray struct_array(struct_type, static_cast<int>(struct_is_valid.size()), fields,
202 struct_bitmap, 2);
203 TestArrayRoundTrip(struct_array);
204}
205
206TEST(TestJsonArrayWriter, Unions) {
207 std::shared_ptr<RecordBatch> batch;
208 ASSERT_OK(MakeUnion(&batch));
209
210 for (int i = 0; i < batch->num_columns(); ++i) {
211 std::shared_ptr<Array> col = batch->column(i);
212 TestArrayRoundTrip(*col.get());
213 }
214}
215
216// Data generation for test case below
217void MakeBatchArrays(const std::shared_ptr<Schema>& schema, const int num_rows,
218 std::vector<std::shared_ptr<Array>>* arrays) {
219 std::vector<bool> is_valid;
220 random_is_valid(num_rows, 0.25, &is_valid);
221
222 std::vector<int8_t> v1_values;
223 std::vector<int32_t> v2_values;
224
225 randint(num_rows, 0, 100, &v1_values);
226 randint(num_rows, 0, 100, &v2_values);
227
228 std::shared_ptr<Array> v1;
229 ArrayFromVector<Int8Type, int8_t>(is_valid, v1_values, &v1);
230
231 std::shared_ptr<Array> v2;
232 ArrayFromVector<Int32Type, int32_t>(is_valid, v2_values, &v2);
233
234 static const int kBufferSize = 10;
235 static uint8_t buffer[kBufferSize];
236 static uint32_t seed = 0;
237 StringBuilder string_builder;
238 for (int i = 0; i < num_rows; ++i) {
239 if (!is_valid[i]) {
240 ASSERT_OK(string_builder.AppendNull());
241 } else {
242 random_ascii(kBufferSize, seed++, buffer);
243 ASSERT_OK(string_builder.Append(buffer, kBufferSize));
244 }
245 }
246 std::shared_ptr<Array> v3;
247 ASSERT_OK(string_builder.Finish(&v3));
248
249 arrays->emplace_back(v1);
250 arrays->emplace_back(v2);
251 arrays->emplace_back(v3);
252}
253
254TEST(TestJsonFileReadWrite, BasicRoundTrip) {
255 auto v1_type = int8();
256 auto v2_type = int32();
257 auto v3_type = utf8();
258
259 auto schema =
260 ::arrow::schema({field("f1", v1_type), field("f2", v2_type), field("f3", v3_type)});
261
262 std::unique_ptr<JsonWriter> writer;
263 ASSERT_OK(JsonWriter::Open(schema, &writer));
264
265 const int nbatches = 3;
266 std::vector<std::shared_ptr<RecordBatch>> batches;
267 for (int i = 0; i < nbatches; ++i) {
268 int num_rows = 5 + i * 5;
269 std::vector<std::shared_ptr<Array>> arrays;
270
271 MakeBatchArrays(schema, num_rows, &arrays);
272 auto batch = RecordBatch::Make(schema, num_rows, arrays);
273 batches.push_back(batch);
274 ASSERT_OK(writer->WriteRecordBatch(*batch));
275 }
276
277 std::string result;
278 ASSERT_OK(writer->Finish(&result));
279
280 std::unique_ptr<JsonReader> reader;
281
282 auto buffer = std::make_shared<Buffer>(result);
283
284 ASSERT_OK(JsonReader::Open(buffer, &reader));
285 ASSERT_TRUE(reader->schema()->Equals(*schema));
286
287 ASSERT_EQ(nbatches, reader->num_record_batches());
288
289 for (int i = 0; i < nbatches; ++i) {
290 std::shared_ptr<RecordBatch> batch;
291 ASSERT_OK(reader->ReadRecordBatch(i, &batch));
292 ASSERT_TRUE(batch->Equals(*batches[i]));
293 }
294}
295
296TEST(TestJsonFileReadWrite, MinimalFormatExample) {
297 static const char* example = R"example(
298{
299 "schema": {
300 "fields": [
301 {
302 "name": "foo",
303 "type": {"name": "int", "isSigned": true, "bitWidth": 32},
304 "nullable": true, "children": [],
305 "typeLayout": {
306 "vectors": [
307 {"type": "VALIDITY", "typeBitWidth": 1},
308 {"type": "DATA", "typeBitWidth": 32}
309 ]
310 }
311 },
312 {
313 "name": "bar",
314 "type": {"name": "floatingpoint", "precision": "DOUBLE"},
315 "nullable": true, "children": [],
316 "typeLayout": {
317 "vectors": [
318 {"type": "VALIDITY", "typeBitWidth": 1},
319 {"type": "DATA", "typeBitWidth": 64}
320 ]
321 }
322 }
323 ]
324 },
325 "batches": [
326 {
327 "count": 5,
328 "columns": [
329 {
330 "name": "foo",
331 "count": 5,
332 "DATA": [1, 2, 3, 4, 5],
333 "VALIDITY": [1, 0, 1, 1, 1]
334 },
335 {
336 "name": "bar",
337 "count": 5,
338 "DATA": [1.0, 2.0, 3.0, 4.0, 5.0],
339 "VALIDITY": [1, 0, 0, 1, 1]
340 }
341 ]
342 }
343 ]
344}
345)example";
346
347 auto buffer = Buffer::Wrap(example, strlen(example));
348
349 std::unique_ptr<JsonReader> reader;
350 ASSERT_OK(JsonReader::Open(buffer, &reader));
351
352 Schema ex_schema({field("foo", int32()), field("bar", float64())});
353
354 ASSERT_TRUE(reader->schema()->Equals(ex_schema));
355 ASSERT_EQ(1, reader->num_record_batches());
356
357 std::shared_ptr<RecordBatch> batch;
358 ASSERT_OK(reader->ReadRecordBatch(0, &batch));
359
360 std::vector<bool> foo_valid = {true, false, true, true, true};
361 std::vector<int32_t> foo_values = {1, 2, 3, 4, 5};
362 std::shared_ptr<Array> foo;
363 ArrayFromVector<Int32Type, int32_t>(foo_valid, foo_values, &foo);
364 ASSERT_TRUE(batch->column(0)->Equals(foo));
365
366 std::vector<bool> bar_valid = {true, false, false, true, true};
367 std::vector<double> bar_values = {1, 2, 3, 4, 5};
368 std::shared_ptr<Array> bar;
369 ArrayFromVector<DoubleType, double>(bar_valid, bar_values, &bar);
370 ASSERT_TRUE(batch->column(1)->Equals(bar));
371}
372
// Parameter generator listing every record-batch factory exercised by the
// parameterized round-trip test. The macro expands to an expression, so it
// carries no trailing semicolon; the statement using it supplies its own.
#define BATCH_CASES()                                                                   \
  ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
                    &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList,                  \
                    &MakeStringTypesRecordBatchWithNulls, &MakeStruct, &MakeUnion,      \
                    &MakeDates, &MakeTimestamps, &MakeTimes, &MakeFWBinary,             \
                    &MakeDecimal, &MakeDictionary)
379
380class TestJsonRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*> {
381 public:
382 void SetUp() {}
383 void TearDown() {}
384};
385
386void CheckRoundtrip(const RecordBatch& batch) {
387 TestSchemaRoundTrip(*batch.schema());
388
389 std::unique_ptr<JsonWriter> writer;
390 ASSERT_OK(JsonWriter::Open(batch.schema(), &writer));
391 ASSERT_OK(writer->WriteRecordBatch(batch));
392
393 std::string result;
394 ASSERT_OK(writer->Finish(&result));
395
396 auto buffer = std::make_shared<Buffer>(result);
397
398 std::unique_ptr<JsonReader> reader;
399 ASSERT_OK(JsonReader::Open(buffer, &reader));
400
401 std::shared_ptr<RecordBatch> result_batch;
402 ASSERT_OK(reader->ReadRecordBatch(0, &result_batch));
403
404 CompareBatch(batch, *result_batch);
405}
406
407TEST_P(TestJsonRoundTrip, RoundTrip) {
408 std::shared_ptr<RecordBatch> batch;
409 ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
410
411 CheckRoundtrip(*batch);
412}
413
414INSTANTIATE_TEST_CASE_P(TestJsonRoundTrip, TestJsonRoundTrip, BATCH_CASES());
415
416} // namespace json
417} // namespace internal
418} // namespace ipc
419} // namespace arrow
420