1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <cstdint> |
19 | #include <cstdio> |
20 | #include <cstring> |
21 | #include <memory> |
22 | #include <string> |
23 | #include <vector> |
24 | |
25 | #include <gtest/gtest.h> |
26 | |
27 | #include "arrow/array.h" |
28 | #include "arrow/buffer.h" |
29 | #include "arrow/builder.h" |
30 | #include "arrow/ipc/json-internal.h" |
31 | #include "arrow/ipc/json.h" |
32 | #include "arrow/ipc/test-common.h" |
33 | #include "arrow/memory_pool.h" |
34 | #include "arrow/record_batch.h" |
35 | #include "arrow/test-util.h" |
36 | #include "arrow/type.h" |
37 | #include "arrow/type_traits.h" |
38 | |
39 | namespace arrow { |
40 | namespace ipc { |
41 | namespace internal { |
42 | namespace json { |
43 | |
44 | void TestSchemaRoundTrip(const Schema& schema) { |
45 | rj::StringBuffer sb; |
46 | rj::Writer<rj::StringBuffer> writer(sb); |
47 | |
48 | writer.StartObject(); |
49 | ASSERT_OK(WriteSchema(schema, &writer)); |
50 | writer.EndObject(); |
51 | |
52 | std::string json_schema = sb.GetString(); |
53 | |
54 | rj::Document d; |
55 | d.Parse(json_schema); |
56 | |
57 | std::shared_ptr<Schema> out; |
58 | if (!ReadSchema(d, default_memory_pool(), &out).ok()) { |
59 | FAIL() << "Unable to read JSON schema: " << json_schema; |
60 | } |
61 | |
62 | if (!schema.Equals(*out)) { |
63 | FAIL() << "In schema: " << schema.ToString() << "\nOut schema: " << out->ToString(); |
64 | } |
65 | } |
66 | |
67 | void TestArrayRoundTrip(const Array& array) { |
68 | static std::string name = "dummy" ; |
69 | |
70 | rj::StringBuffer sb; |
71 | rj::Writer<rj::StringBuffer> writer(sb); |
72 | |
73 | ASSERT_OK(WriteArray(name, array, &writer)); |
74 | |
75 | std::string array_as_json = sb.GetString(); |
76 | |
77 | rj::Document d; |
78 | d.Parse(array_as_json); |
79 | |
80 | if (d.HasParseError()) { |
81 | FAIL() << "JSON parsing failed" ; |
82 | } |
83 | |
84 | std::shared_ptr<Array> out; |
85 | ASSERT_OK(ReadArray(default_memory_pool(), d, array.type(), &out)); |
86 | |
87 | // std::cout << array_as_json << std::endl; |
88 | CompareArraysDetailed(0, *out, array); |
89 | } |
90 | |
91 | template <typename T, typename ValueType> |
92 | void CheckPrimitive(const std::shared_ptr<DataType>& type, |
93 | const std::vector<bool>& is_valid, |
94 | const std::vector<ValueType>& values) { |
95 | MemoryPool* pool = default_memory_pool(); |
96 | typename TypeTraits<T>::BuilderType builder(pool); |
97 | |
98 | for (size_t i = 0; i < values.size(); ++i) { |
99 | if (is_valid[i]) { |
100 | ASSERT_OK(builder.Append(values[i])); |
101 | } else { |
102 | ASSERT_OK(builder.AppendNull()); |
103 | } |
104 | } |
105 | |
106 | std::shared_ptr<Array> array; |
107 | ASSERT_OK(builder.Finish(&array)); |
108 | TestArrayRoundTrip(*array.get()); |
109 | } |
110 | |
111 | TEST(TestJsonSchemaWriter, FlatTypes) { |
112 | // TODO |
113 | // field("f14", date32()) |
114 | std::vector<std::shared_ptr<Field>> fields = { |
115 | field("f0" , int8()), |
116 | field("f1" , int16(), false), |
117 | field("f2" , int32()), |
118 | field("f3" , int64(), false), |
119 | field("f4" , uint8()), |
120 | field("f5" , uint16()), |
121 | field("f6" , uint32()), |
122 | field("f7" , uint64()), |
123 | field("f8" , float32()), |
124 | field("f9" , float64()), |
125 | field("f10" , utf8()), |
126 | field("f11" , binary()), |
127 | field("f12" , list(int32())), |
128 | field("f13" , struct_({field("s1" , int32()), field("s2" , utf8())})), |
129 | field("f15" , date64()), |
130 | field("f16" , timestamp(TimeUnit::NANO)), |
131 | field("f17" , time64(TimeUnit::MICRO)), |
132 | field("f18" , union_({field("u1" , int8()), field("u2" , time32(TimeUnit::MILLI))}, |
133 | {0, 1}, UnionMode::DENSE))}; |
134 | |
135 | Schema schema(fields); |
136 | TestSchemaRoundTrip(schema); |
137 | } |
138 | |
139 | template <typename T> |
140 | void PrimitiveTypesCheckOne() { |
141 | using c_type = typename T::c_type; |
142 | |
143 | std::vector<bool> is_valid = {true, false, true, true, true, false, true, true}; |
144 | std::vector<c_type> values = {0, 1, 2, 3, 4, 5, 6, 7}; |
145 | CheckPrimitive<T, c_type>(std::make_shared<T>(), is_valid, values); |
146 | } |
147 | |
148 | TEST(TestJsonArrayWriter, PrimitiveTypes) { |
149 | PrimitiveTypesCheckOne<Int8Type>(); |
150 | PrimitiveTypesCheckOne<Int16Type>(); |
151 | PrimitiveTypesCheckOne<Int32Type>(); |
152 | PrimitiveTypesCheckOne<Int64Type>(); |
153 | PrimitiveTypesCheckOne<UInt8Type>(); |
154 | PrimitiveTypesCheckOne<UInt16Type>(); |
155 | PrimitiveTypesCheckOne<UInt32Type>(); |
156 | PrimitiveTypesCheckOne<UInt64Type>(); |
157 | PrimitiveTypesCheckOne<FloatType>(); |
158 | PrimitiveTypesCheckOne<DoubleType>(); |
159 | |
160 | std::vector<bool> is_valid = {true, false, true, true, true, false, true, true}; |
161 | std::vector<std::string> values = {"foo" , "bar" , "" , "baz" , "qux" , "foo" , "a" , "1" }; |
162 | |
163 | CheckPrimitive<StringType, std::string>(utf8(), is_valid, values); |
164 | CheckPrimitive<BinaryType, std::string>(binary(), is_valid, values); |
165 | } |
166 | |
167 | TEST(TestJsonArrayWriter, NestedTypes) { |
168 | auto value_type = int32(); |
169 | |
170 | std::vector<bool> values_is_valid = {true, false, true, true, false, true, true}; |
171 | |
172 | std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6}; |
173 | std::shared_ptr<Array> values_array; |
174 | ArrayFromVector<Int32Type, int32_t>(values_is_valid, values, &values_array); |
175 | |
176 | std::vector<int16_t> i16_values = {0, 1, 2, 3, 4, 5, 6}; |
177 | std::shared_ptr<Array> i16_values_array; |
178 | ArrayFromVector<Int16Type, int16_t>(values_is_valid, i16_values, &i16_values_array); |
179 | |
180 | // List |
181 | std::vector<bool> list_is_valid = {true, false, true, true, true}; |
182 | std::vector<int32_t> offsets = {0, 0, 0, 1, 4, 7}; |
183 | |
184 | std::shared_ptr<Buffer> list_bitmap; |
185 | ASSERT_OK(GetBitmapFromVector(list_is_valid, &list_bitmap)); |
186 | std::shared_ptr<Buffer> offsets_buffer = Buffer::Wrap(offsets); |
187 | |
188 | ListArray list_array(list(value_type), 5, offsets_buffer, values_array, list_bitmap, 1); |
189 | |
190 | TestArrayRoundTrip(list_array); |
191 | |
192 | // Struct |
193 | std::vector<bool> struct_is_valid = {true, false, true, true, true, false, true}; |
194 | std::shared_ptr<Buffer> struct_bitmap; |
195 | ASSERT_OK(GetBitmapFromVector(struct_is_valid, &struct_bitmap)); |
196 | |
197 | auto struct_type = |
198 | struct_({field("f1" , int32()), field("f2" , int32()), field("f3" , int32())}); |
199 | |
200 | std::vector<std::shared_ptr<Array>> fields = {values_array, values_array, values_array}; |
201 | StructArray struct_array(struct_type, static_cast<int>(struct_is_valid.size()), fields, |
202 | struct_bitmap, 2); |
203 | TestArrayRoundTrip(struct_array); |
204 | } |
205 | |
206 | TEST(TestJsonArrayWriter, Unions) { |
207 | std::shared_ptr<RecordBatch> batch; |
208 | ASSERT_OK(MakeUnion(&batch)); |
209 | |
210 | for (int i = 0; i < batch->num_columns(); ++i) { |
211 | std::shared_ptr<Array> col = batch->column(i); |
212 | TestArrayRoundTrip(*col.get()); |
213 | } |
214 | } |
215 | |
216 | // Data generation for test case below |
217 | void MakeBatchArrays(const std::shared_ptr<Schema>& schema, const int num_rows, |
218 | std::vector<std::shared_ptr<Array>>* arrays) { |
219 | std::vector<bool> is_valid; |
220 | random_is_valid(num_rows, 0.25, &is_valid); |
221 | |
222 | std::vector<int8_t> v1_values; |
223 | std::vector<int32_t> v2_values; |
224 | |
225 | randint(num_rows, 0, 100, &v1_values); |
226 | randint(num_rows, 0, 100, &v2_values); |
227 | |
228 | std::shared_ptr<Array> v1; |
229 | ArrayFromVector<Int8Type, int8_t>(is_valid, v1_values, &v1); |
230 | |
231 | std::shared_ptr<Array> v2; |
232 | ArrayFromVector<Int32Type, int32_t>(is_valid, v2_values, &v2); |
233 | |
234 | static const int kBufferSize = 10; |
235 | static uint8_t buffer[kBufferSize]; |
236 | static uint32_t seed = 0; |
237 | StringBuilder string_builder; |
238 | for (int i = 0; i < num_rows; ++i) { |
239 | if (!is_valid[i]) { |
240 | ASSERT_OK(string_builder.AppendNull()); |
241 | } else { |
242 | random_ascii(kBufferSize, seed++, buffer); |
243 | ASSERT_OK(string_builder.Append(buffer, kBufferSize)); |
244 | } |
245 | } |
246 | std::shared_ptr<Array> v3; |
247 | ASSERT_OK(string_builder.Finish(&v3)); |
248 | |
249 | arrays->emplace_back(v1); |
250 | arrays->emplace_back(v2); |
251 | arrays->emplace_back(v3); |
252 | } |
253 | |
254 | TEST(TestJsonFileReadWrite, BasicRoundTrip) { |
255 | auto v1_type = int8(); |
256 | auto v2_type = int32(); |
257 | auto v3_type = utf8(); |
258 | |
259 | auto schema = |
260 | ::arrow::schema({field("f1" , v1_type), field("f2" , v2_type), field("f3" , v3_type)}); |
261 | |
262 | std::unique_ptr<JsonWriter> writer; |
263 | ASSERT_OK(JsonWriter::Open(schema, &writer)); |
264 | |
265 | const int nbatches = 3; |
266 | std::vector<std::shared_ptr<RecordBatch>> batches; |
267 | for (int i = 0; i < nbatches; ++i) { |
268 | int num_rows = 5 + i * 5; |
269 | std::vector<std::shared_ptr<Array>> arrays; |
270 | |
271 | MakeBatchArrays(schema, num_rows, &arrays); |
272 | auto batch = RecordBatch::Make(schema, num_rows, arrays); |
273 | batches.push_back(batch); |
274 | ASSERT_OK(writer->WriteRecordBatch(*batch)); |
275 | } |
276 | |
277 | std::string result; |
278 | ASSERT_OK(writer->Finish(&result)); |
279 | |
280 | std::unique_ptr<JsonReader> reader; |
281 | |
282 | auto buffer = std::make_shared<Buffer>(result); |
283 | |
284 | ASSERT_OK(JsonReader::Open(buffer, &reader)); |
285 | ASSERT_TRUE(reader->schema()->Equals(*schema)); |
286 | |
287 | ASSERT_EQ(nbatches, reader->num_record_batches()); |
288 | |
289 | for (int i = 0; i < nbatches; ++i) { |
290 | std::shared_ptr<RecordBatch> batch; |
291 | ASSERT_OK(reader->ReadRecordBatch(i, &batch)); |
292 | ASSERT_TRUE(batch->Equals(*batches[i])); |
293 | } |
294 | } |
295 | |
296 | TEST(TestJsonFileReadWrite, MinimalFormatExample) { |
297 | static const char* example = R"example( |
298 | { |
299 | "schema": { |
300 | "fields": [ |
301 | { |
302 | "name": "foo", |
303 | "type": {"name": "int", "isSigned": true, "bitWidth": 32}, |
304 | "nullable": true, "children": [], |
305 | "typeLayout": { |
306 | "vectors": [ |
307 | {"type": "VALIDITY", "typeBitWidth": 1}, |
308 | {"type": "DATA", "typeBitWidth": 32} |
309 | ] |
310 | } |
311 | }, |
312 | { |
313 | "name": "bar", |
314 | "type": {"name": "floatingpoint", "precision": "DOUBLE"}, |
315 | "nullable": true, "children": [], |
316 | "typeLayout": { |
317 | "vectors": [ |
318 | {"type": "VALIDITY", "typeBitWidth": 1}, |
319 | {"type": "DATA", "typeBitWidth": 64} |
320 | ] |
321 | } |
322 | } |
323 | ] |
324 | }, |
325 | "batches": [ |
326 | { |
327 | "count": 5, |
328 | "columns": [ |
329 | { |
330 | "name": "foo", |
331 | "count": 5, |
332 | "DATA": [1, 2, 3, 4, 5], |
333 | "VALIDITY": [1, 0, 1, 1, 1] |
334 | }, |
335 | { |
336 | "name": "bar", |
337 | "count": 5, |
338 | "DATA": [1.0, 2.0, 3.0, 4.0, 5.0], |
339 | "VALIDITY": [1, 0, 0, 1, 1] |
340 | } |
341 | ] |
342 | } |
343 | ] |
344 | } |
345 | )example" ; |
346 | |
347 | auto buffer = Buffer::Wrap(example, strlen(example)); |
348 | |
349 | std::unique_ptr<JsonReader> reader; |
350 | ASSERT_OK(JsonReader::Open(buffer, &reader)); |
351 | |
352 | Schema ex_schema({field("foo" , int32()), field("bar" , float64())}); |
353 | |
354 | ASSERT_TRUE(reader->schema()->Equals(ex_schema)); |
355 | ASSERT_EQ(1, reader->num_record_batches()); |
356 | |
357 | std::shared_ptr<RecordBatch> batch; |
358 | ASSERT_OK(reader->ReadRecordBatch(0, &batch)); |
359 | |
360 | std::vector<bool> foo_valid = {true, false, true, true, true}; |
361 | std::vector<int32_t> foo_values = {1, 2, 3, 4, 5}; |
362 | std::shared_ptr<Array> foo; |
363 | ArrayFromVector<Int32Type, int32_t>(foo_valid, foo_values, &foo); |
364 | ASSERT_TRUE(batch->column(0)->Equals(foo)); |
365 | |
366 | std::vector<bool> bar_valid = {true, false, false, true, true}; |
367 | std::vector<double> bar_values = {1, 2, 3, 4, 5}; |
368 | std::shared_ptr<Array> bar; |
369 | ArrayFromVector<DoubleType, double>(bar_valid, bar_values, &bar); |
370 | ASSERT_TRUE(batch->column(1)->Equals(bar)); |
371 | } |
372 | |
// Parameter generator listing the record-batch factory functions that the
// TestJsonRoundTrip suite is instantiated with.
//
// The expansion is a plain expression and deliberately carries no trailing
// semicolon: it is passed as an argument to another macro, and the use site
// provides its own statement terminator.
#define BATCH_CASES()                                                                   \
  ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
                    &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList,                  \
                    &MakeStringTypesRecordBatchWithNulls, &MakeStruct, &MakeUnion,      \
                    &MakeDates, &MakeTimestamps, &MakeTimes, &MakeFWBinary,             \
                    &MakeDecimal, &MakeDictionary)
379 | |
380 | class TestJsonRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*> { |
381 | public: |
382 | void SetUp() {} |
383 | void TearDown() {} |
384 | }; |
385 | |
386 | void CheckRoundtrip(const RecordBatch& batch) { |
387 | TestSchemaRoundTrip(*batch.schema()); |
388 | |
389 | std::unique_ptr<JsonWriter> writer; |
390 | ASSERT_OK(JsonWriter::Open(batch.schema(), &writer)); |
391 | ASSERT_OK(writer->WriteRecordBatch(batch)); |
392 | |
393 | std::string result; |
394 | ASSERT_OK(writer->Finish(&result)); |
395 | |
396 | auto buffer = std::make_shared<Buffer>(result); |
397 | |
398 | std::unique_ptr<JsonReader> reader; |
399 | ASSERT_OK(JsonReader::Open(buffer, &reader)); |
400 | |
401 | std::shared_ptr<RecordBatch> result_batch; |
402 | ASSERT_OK(reader->ReadRecordBatch(0, &result_batch)); |
403 | |
404 | CompareBatch(batch, *result_batch); |
405 | } |
406 | |
407 | TEST_P(TestJsonRoundTrip, RoundTrip) { |
408 | std::shared_ptr<RecordBatch> batch; |
409 | ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue |
410 | |
411 | CheckRoundtrip(*batch); |
412 | } |
413 | |
414 | INSTANTIATE_TEST_CASE_P(TestJsonRoundTrip, TestJsonRoundTrip, BATCH_CASES()); |
415 | |
416 | } // namespace json |
417 | } // namespace internal |
418 | } // namespace ipc |
419 | } // namespace arrow |
420 | |