1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// ARROW-3837: Depending on how gflags was built, this might be set to
19// 1 or 0 by default, making it so that we cannot statically link
20// gflags on Windows if it is set to 1
21#define GFLAGS_IS_A_DLL 0
22
23#include <cstdint>
24#include <cstdio>
25#include <cstring>
26#include <fstream> // IWYU pragma: keep
27#include <iostream>
28#include <memory>
29#include <string>
30#include <vector>
31
32#include <gflags/gflags.h>
33#include <gtest/gtest.h>
34
35#include <boost/filesystem.hpp> // NOLINT
36
37#include "arrow/io/file.h"
38#include "arrow/ipc/json.h"
39#include "arrow/ipc/reader.h"
40#include "arrow/ipc/writer.h"
41#include "arrow/pretty_print.h"
42#include "arrow/record_batch.h"
43#include "arrow/status.h"
44#include "arrow/test-util.h"
45#include "arrow/type.h"
46
// Command-line flags (gflags). main() forwards --json, --arrow and --mode to
// RunCommand() when --integration is set; otherwise the unit tests are run.

// Path of the Arrow file.
47DEFINE_string(arrow, "", "Arrow file name");
// Path of the JSON file.
48DEFINE_string(json, "", "JSON file name");
// Which command RunCommand() executes; any other value is rejected there as
// an unknown command.
49DEFINE_string(
50 mode, "VALIDATE",
51 "Mode of integration testing tool (ARROW_TO_JSON, JSON_TO_ARROW, VALIDATE)");
// When true, main() runs a single command instead of the gtest suite.
52DEFINE_bool(integration, false, "Run in integration test mode");
// When true, the helpers print the schemas they find (or compare) to stdout.
53DEFINE_bool(verbose, true, "Verbose output");
54
55namespace fs = boost::filesystem;
56
57namespace arrow {
58
59class Buffer;
60
61namespace ipc {
62
// Report whether `path` can be opened for reading: the stream is opened and
// its state is inspected immediately, without reading any data.
bool file_exists(const char* path) {
  const bool readable = std::ifstream(path).good();
  return readable;
}
67
68// Convert JSON file to IPC binary format
69static Status ConvertJsonToArrow(const std::string& json_path,
70 const std::string& arrow_path) {
71 std::shared_ptr<io::ReadableFile> in_file;
72 std::shared_ptr<io::FileOutputStream> out_file;
73
74 RETURN_NOT_OK(io::ReadableFile::Open(json_path, &in_file));
75 RETURN_NOT_OK(io::FileOutputStream::Open(arrow_path, &out_file));
76
77 int64_t file_size = 0;
78 RETURN_NOT_OK(in_file->GetSize(&file_size));
79
80 std::shared_ptr<Buffer> json_buffer;
81 RETURN_NOT_OK(in_file->Read(file_size, &json_buffer));
82
83 std::unique_ptr<internal::json::JsonReader> reader;
84 RETURN_NOT_OK(internal::json::JsonReader::Open(json_buffer, &reader));
85
86 if (FLAGS_verbose) {
87 std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
88 }
89
90 std::shared_ptr<RecordBatchWriter> writer;
91 RETURN_NOT_OK(RecordBatchFileWriter::Open(out_file.get(), reader->schema(), &writer));
92
93 for (int i = 0; i < reader->num_record_batches(); ++i) {
94 std::shared_ptr<RecordBatch> batch;
95 RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch));
96 RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
97 }
98 return writer->Close();
99}
100
101// Convert IPC binary format to JSON
102static Status ConvertArrowToJson(const std::string& arrow_path,
103 const std::string& json_path) {
104 std::shared_ptr<io::ReadableFile> in_file;
105 std::shared_ptr<io::FileOutputStream> out_file;
106
107 RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &in_file));
108 RETURN_NOT_OK(io::FileOutputStream::Open(json_path, &out_file));
109
110 std::shared_ptr<RecordBatchFileReader> reader;
111 RETURN_NOT_OK(RecordBatchFileReader::Open(in_file.get(), &reader));
112
113 if (FLAGS_verbose) {
114 std::cout << "Found schema: " << reader->schema()->ToString() << std::endl;
115 }
116
117 std::unique_ptr<internal::json::JsonWriter> writer;
118 RETURN_NOT_OK(internal::json::JsonWriter::Open(reader->schema(), &writer));
119
120 for (int i = 0; i < reader->num_record_batches(); ++i) {
121 std::shared_ptr<RecordBatch> batch;
122 RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch));
123 RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
124 }
125
126 std::string result;
127 RETURN_NOT_OK(writer->Finish(&result));
128 return out_file->Write(result.c_str(), static_cast<int64_t>(result.size()));
129}
130
131static Status ValidateArrowVsJson(const std::string& arrow_path,
132 const std::string& json_path) {
133 // Construct JSON reader
134 std::shared_ptr<io::ReadableFile> json_file;
135 RETURN_NOT_OK(io::ReadableFile::Open(json_path, &json_file));
136
137 int64_t file_size = 0;
138 RETURN_NOT_OK(json_file->GetSize(&file_size));
139
140 std::shared_ptr<Buffer> json_buffer;
141 RETURN_NOT_OK(json_file->Read(file_size, &json_buffer));
142
143 std::unique_ptr<internal::json::JsonReader> json_reader;
144 RETURN_NOT_OK(internal::json::JsonReader::Open(json_buffer, &json_reader));
145
146 // Construct Arrow reader
147 std::shared_ptr<io::ReadableFile> arrow_file;
148 RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &arrow_file));
149
150 std::shared_ptr<RecordBatchFileReader> arrow_reader;
151 RETURN_NOT_OK(RecordBatchFileReader::Open(arrow_file.get(), &arrow_reader));
152
153 auto json_schema = json_reader->schema();
154 auto arrow_schema = arrow_reader->schema();
155
156 if (!json_schema->Equals(*arrow_schema)) {
157 std::stringstream ss;
158 ss << "JSON schema: \n"
159 << json_schema->ToString() << "\n"
160 << "Arrow schema: \n"
161 << arrow_schema->ToString();
162
163 if (FLAGS_verbose) {
164 std::cout << ss.str() << std::endl;
165 }
166 return Status::Invalid("Schemas did not match");
167 }
168
169 const int json_nbatches = json_reader->num_record_batches();
170 const int arrow_nbatches = arrow_reader->num_record_batches();
171
172 if (json_nbatches != arrow_nbatches) {
173 return Status::Invalid("Different number of record batches: ", json_nbatches,
174 " (JSON) vs ", arrow_nbatches, " (Arrow)");
175 }
176
177 std::shared_ptr<RecordBatch> arrow_batch;
178 std::shared_ptr<RecordBatch> json_batch;
179 for (int i = 0; i < json_nbatches; ++i) {
180 RETURN_NOT_OK(json_reader->ReadRecordBatch(i, &json_batch));
181 RETURN_NOT_OK(arrow_reader->ReadRecordBatch(i, &arrow_batch));
182
183 if (!json_batch->ApproxEquals(*arrow_batch)) {
184 std::stringstream ss;
185 ss << "Record batch " << i << " did not match";
186
187 ss << "\nJSON:\n";
188 RETURN_NOT_OK(PrettyPrint(*json_batch, 0, &ss));
189
190 ss << "\nArrow:\n";
191 RETURN_NOT_OK(PrettyPrint(*arrow_batch, 0, &ss));
192 return Status::Invalid(ss.str());
193 }
194 }
195
196 return Status::OK();
197}
198
199Status RunCommand(const std::string& json_path, const std::string& arrow_path,
200 const std::string& command) {
201 if (json_path == "") {
202 return Status::Invalid("Must specify json file name");
203 }
204
205 if (arrow_path == "") {
206 return Status::Invalid("Must specify arrow file name");
207 }
208
209 if (command == "ARROW_TO_JSON") {
210 if (!file_exists(arrow_path.c_str())) {
211 return Status::Invalid("Input file does not exist");
212 }
213
214 return ConvertArrowToJson(arrow_path, json_path);
215 } else if (command == "JSON_TO_ARROW") {
216 if (!file_exists(json_path.c_str())) {
217 return Status::Invalid("Input file does not exist");
218 }
219
220 return ConvertJsonToArrow(json_path, arrow_path);
221 } else if (command == "VALIDATE") {
222 if (!file_exists(json_path.c_str())) {
223 return Status::Invalid("JSON file does not exist");
224 }
225
226 if (!file_exists(arrow_path.c_str())) {
227 return Status::Invalid("Arrow file does not exist");
228 }
229
230 return ValidateArrowVsJson(arrow_path, json_path);
231 } else {
232 return Status::Invalid("Unknown command: ", command);
233 }
234}
235
236static std::string temp_path() {
237 return (fs::temp_directory_path() / fs::unique_path()).string();
238}
239
240class TestJSONIntegration : public ::testing::Test {
241 public:
242 void SetUp() {}
243
244 std::string mkstemp() {
245 auto path = temp_path();
246 tmp_paths_.push_back(path);
247 return path;
248 }
249
250 Status WriteJson(const char* data, const std::string& path) {
251 do {
252 std::shared_ptr<io::FileOutputStream> out;
253 RETURN_NOT_OK(io::FileOutputStream::Open(path, &out));
254 RETURN_NOT_OK(out->Write(data, static_cast<int64_t>(strlen(data))));
255 } while (0);
256 return Status::OK();
257 }
258
259 void TearDown() {
260 for (const std::string path : tmp_paths_) {
261 ARROW_UNUSED(std::remove(path.c_str()));
262 }
263 }
264
265 protected:
266 std::vector<std::string> tmp_paths_;
267};
268
// JSON fixture used by the tests below: a schema with two nullable fields,
// "foo" (signed 32-bit int) and "bar" (double-precision floating point),
// followed by two record batches holding 5 and 4 rows respectively.
269static const char* JSON_EXAMPLE = R"example(
270{
271 "schema": {
272 "fields": [
273 {
274 "name": "foo",
275 "type": {"name": "int", "isSigned": true, "bitWidth": 32},
276 "nullable": true, "children": [],
277 "typeLayout": {
278 "vectors": [
279 {"type": "VALIDITY", "typeBitWidth": 1},
280 {"type": "DATA", "typeBitWidth": 32}
281 ]
282 }
283 },
284 {
285 "name": "bar",
286 "type": {"name": "floatingpoint", "precision": "DOUBLE"},
287 "nullable": true, "children": [],
288 "typeLayout": {
289 "vectors": [
290 {"type": "VALIDITY", "typeBitWidth": 1},
291 {"type": "DATA", "typeBitWidth": 64}
292 ]
293 }
294 }
295 ]
296 },
297 "batches": [
298 {
299 "count": 5,
300 "columns": [
301 {
302 "name": "foo",
303 "count": 5,
304 "DATA": [1, 2, 3, 4, 5],
305 "VALIDITY": [1, 0, 1, 1, 1]
306 },
307 {
308 "name": "bar",
309 "count": 5,
310 "DATA": [1.0, 2.0, 3.0, 4.0, 5.0],
311 "VALIDITY": [1, 0, 0, 1, 1]
312 }
313 ]
314 },
315 {
316 "count": 4,
317 "columns": [
318 {
319 "name": "foo",
320 "count": 4,
321 "DATA": [1, 2, 3, 4],
322 "VALIDITY": [1, 0, 1, 1]
323 },
324 {
325 "name": "bar",
326 "count": 4,
327 "DATA": [1.0, 2.0, 3.0, 4.0],
328 "VALIDITY": [1, 0, 0, 1]
329 }
330 ]
331 }
332 ]
333}
334)example";
335
// Second JSON fixture: a schema with only the nullable signed 32-bit int
// field "foo" and a single 5-row record batch. The ErrorStates test validates
// it against an Arrow file converted from JSON_EXAMPLE and expects Invalid.
336static const char* JSON_EXAMPLE2 = R"example(
337{
338 "schema": {
339 "fields": [
340 {
341 "name": "foo",
342 "type": {"name": "int", "isSigned": true, "bitWidth": 32},
343 "nullable": true, "children": [],
344 "typeLayout": {
345 "vectors": [
346 {"type": "VALIDITY", "typeBitWidth": 1},
347 {"type": "DATA", "typeBitWidth": 32}
348 ]
349 }
350 }
351 ]
352 },
353 "batches": [
354 {
355 "count": 5,
356 "columns": [
357 {
358 "name": "foo",
359 "count": 5,
360 "DATA": [1, 2, 3, 4, 5],
361 "VALIDITY": [1, 0, 1, 1, 1]
362 }
363 ]
364 }
365 ]
366}
367)example";
368
369TEST_F(TestJSONIntegration, ConvertAndValidate) {
370 std::string json_path = this->mkstemp();
371 std::string arrow_path = this->mkstemp();
372
373 ASSERT_OK(WriteJson(JSON_EXAMPLE, json_path));
374
375 ASSERT_OK(RunCommand(json_path, arrow_path, "JSON_TO_ARROW"));
376 ASSERT_OK(RunCommand(json_path, arrow_path, "VALIDATE"));
377
378 // Convert and overwrite
379 ASSERT_OK(RunCommand(json_path, arrow_path, "ARROW_TO_JSON"));
380
381 // Convert back to arrow, and validate
382 ASSERT_OK(RunCommand(json_path, arrow_path, "JSON_TO_ARROW"));
383 ASSERT_OK(RunCommand(json_path, arrow_path, "VALIDATE"));
384}
385
386TEST_F(TestJSONIntegration, ErrorStates) {
387 std::string json_path = this->mkstemp();
388 std::string json_path2 = this->mkstemp();
389 std::string arrow_path = this->mkstemp();
390
391 ASSERT_OK(WriteJson(JSON_EXAMPLE, json_path));
392 ASSERT_OK(WriteJson(JSON_EXAMPLE2, json_path2));
393
394 ASSERT_OK(ConvertJsonToArrow(json_path, arrow_path));
395 ASSERT_RAISES(Invalid, ValidateArrowVsJson(arrow_path, json_path2));
396
397 ASSERT_RAISES(IOError, ValidateArrowVsJson("does_not_exist-1234", json_path2));
398 ASSERT_RAISES(IOError, ValidateArrowVsJson(arrow_path, "does_not_exist-1234"));
399
400 ASSERT_RAISES(Invalid, RunCommand("", arrow_path, "VALIDATE"));
401 ASSERT_RAISES(Invalid, RunCommand(json_path, "", "VALIDATE"));
402}
403
404} // namespace ipc
405} // namespace arrow
406
407int main(int argc, char** argv) {
408 gflags::ParseCommandLineFlags(&argc, &argv, true);
409
410 int ret = 0;
411
412 if (FLAGS_integration) {
413 arrow::Status result = arrow::ipc::RunCommand(FLAGS_json, FLAGS_arrow, FLAGS_mode);
414 if (!result.ok()) {
415 std::cout << "Error message: " << result.ToString() << std::endl;
416 ret = 1;
417 }
418 } else {
419 ::testing::InitGoogleTest(&argc, argv);
420 ret = RUN_ALL_TESTS();
421 }
422 gflags::ShutDownCommandLineFlags();
423 return ret;
424}
425