1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | // ARROW-3837: Depending on how gflags was built, this might be set to |
19 | // 1 or 0 by default, making it so that we cannot statically link |
20 | // gflags on Windows if it is set to 1 |
21 | #define GFLAGS_IS_A_DLL 0 |
22 | |
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <fstream>  // IWYU pragma: keep
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
31 | |
32 | #include <gflags/gflags.h> |
33 | #include <gtest/gtest.h> |
34 | |
35 | #include <boost/filesystem.hpp> // NOLINT |
36 | |
37 | #include "arrow/io/file.h" |
38 | #include "arrow/ipc/json.h" |
39 | #include "arrow/ipc/reader.h" |
40 | #include "arrow/ipc/writer.h" |
41 | #include "arrow/pretty_print.h" |
42 | #include "arrow/record_batch.h" |
43 | #include "arrow/status.h" |
44 | #include "arrow/test-util.h" |
45 | #include "arrow/type.h" |
46 | |
// Command-line flags.
//
// With --integration, main() performs a single RunCommand() using --json,
// --arrow and --mode. Otherwise it runs the gtest suite in this file.
// --verbose (on by default) makes the conversion/validation routines print the
// schemas they find to stdout.
DEFINE_string(arrow, "", "Arrow file name");
DEFINE_string(json, "", "JSON file name");
DEFINE_string(
    mode, "VALIDATE",
    "Mode of integration testing tool (ARROW_TO_JSON, JSON_TO_ARROW, VALIDATE)");
DEFINE_bool(integration, false, "Run in integration test mode");
DEFINE_bool(verbose, true, "Verbose output");

// Short alias for Boost.Filesystem, used when building temporary paths.
namespace fs = boost::filesystem;
56 | |
57 | namespace arrow { |
58 | |
59 | class Buffer; |
60 | |
61 | namespace ipc { |
62 | |
// Report whether `path` can be opened for reading.
//
// This probes readability by opening an input stream, so it is not a pure
// existence check: a file that exists but cannot be opened is reported as
// missing.
bool file_exists(const char* path) {
  return std::ifstream(path).good();
}
67 | |
68 | // Convert JSON file to IPC binary format |
69 | static Status ConvertJsonToArrow(const std::string& json_path, |
70 | const std::string& arrow_path) { |
71 | std::shared_ptr<io::ReadableFile> in_file; |
72 | std::shared_ptr<io::FileOutputStream> out_file; |
73 | |
74 | RETURN_NOT_OK(io::ReadableFile::Open(json_path, &in_file)); |
75 | RETURN_NOT_OK(io::FileOutputStream::Open(arrow_path, &out_file)); |
76 | |
77 | int64_t file_size = 0; |
78 | RETURN_NOT_OK(in_file->GetSize(&file_size)); |
79 | |
80 | std::shared_ptr<Buffer> json_buffer; |
81 | RETURN_NOT_OK(in_file->Read(file_size, &json_buffer)); |
82 | |
83 | std::unique_ptr<internal::json::JsonReader> reader; |
84 | RETURN_NOT_OK(internal::json::JsonReader::Open(json_buffer, &reader)); |
85 | |
86 | if (FLAGS_verbose) { |
87 | std::cout << "Found schema: " << reader->schema()->ToString() << std::endl; |
88 | } |
89 | |
90 | std::shared_ptr<RecordBatchWriter> writer; |
91 | RETURN_NOT_OK(RecordBatchFileWriter::Open(out_file.get(), reader->schema(), &writer)); |
92 | |
93 | for (int i = 0; i < reader->num_record_batches(); ++i) { |
94 | std::shared_ptr<RecordBatch> batch; |
95 | RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch)); |
96 | RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); |
97 | } |
98 | return writer->Close(); |
99 | } |
100 | |
101 | // Convert IPC binary format to JSON |
102 | static Status ConvertArrowToJson(const std::string& arrow_path, |
103 | const std::string& json_path) { |
104 | std::shared_ptr<io::ReadableFile> in_file; |
105 | std::shared_ptr<io::FileOutputStream> out_file; |
106 | |
107 | RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &in_file)); |
108 | RETURN_NOT_OK(io::FileOutputStream::Open(json_path, &out_file)); |
109 | |
110 | std::shared_ptr<RecordBatchFileReader> reader; |
111 | RETURN_NOT_OK(RecordBatchFileReader::Open(in_file.get(), &reader)); |
112 | |
113 | if (FLAGS_verbose) { |
114 | std::cout << "Found schema: " << reader->schema()->ToString() << std::endl; |
115 | } |
116 | |
117 | std::unique_ptr<internal::json::JsonWriter> writer; |
118 | RETURN_NOT_OK(internal::json::JsonWriter::Open(reader->schema(), &writer)); |
119 | |
120 | for (int i = 0; i < reader->num_record_batches(); ++i) { |
121 | std::shared_ptr<RecordBatch> batch; |
122 | RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch)); |
123 | RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); |
124 | } |
125 | |
126 | std::string result; |
127 | RETURN_NOT_OK(writer->Finish(&result)); |
128 | return out_file->Write(result.c_str(), static_cast<int64_t>(result.size())); |
129 | } |
130 | |
131 | static Status ValidateArrowVsJson(const std::string& arrow_path, |
132 | const std::string& json_path) { |
133 | // Construct JSON reader |
134 | std::shared_ptr<io::ReadableFile> json_file; |
135 | RETURN_NOT_OK(io::ReadableFile::Open(json_path, &json_file)); |
136 | |
137 | int64_t file_size = 0; |
138 | RETURN_NOT_OK(json_file->GetSize(&file_size)); |
139 | |
140 | std::shared_ptr<Buffer> json_buffer; |
141 | RETURN_NOT_OK(json_file->Read(file_size, &json_buffer)); |
142 | |
143 | std::unique_ptr<internal::json::JsonReader> json_reader; |
144 | RETURN_NOT_OK(internal::json::JsonReader::Open(json_buffer, &json_reader)); |
145 | |
146 | // Construct Arrow reader |
147 | std::shared_ptr<io::ReadableFile> arrow_file; |
148 | RETURN_NOT_OK(io::ReadableFile::Open(arrow_path, &arrow_file)); |
149 | |
150 | std::shared_ptr<RecordBatchFileReader> arrow_reader; |
151 | RETURN_NOT_OK(RecordBatchFileReader::Open(arrow_file.get(), &arrow_reader)); |
152 | |
153 | auto json_schema = json_reader->schema(); |
154 | auto arrow_schema = arrow_reader->schema(); |
155 | |
156 | if (!json_schema->Equals(*arrow_schema)) { |
157 | std::stringstream ss; |
158 | ss << "JSON schema: \n" |
159 | << json_schema->ToString() << "\n" |
160 | << "Arrow schema: \n" |
161 | << arrow_schema->ToString(); |
162 | |
163 | if (FLAGS_verbose) { |
164 | std::cout << ss.str() << std::endl; |
165 | } |
166 | return Status::Invalid("Schemas did not match" ); |
167 | } |
168 | |
169 | const int json_nbatches = json_reader->num_record_batches(); |
170 | const int arrow_nbatches = arrow_reader->num_record_batches(); |
171 | |
172 | if (json_nbatches != arrow_nbatches) { |
173 | return Status::Invalid("Different number of record batches: " , json_nbatches, |
174 | " (JSON) vs " , arrow_nbatches, " (Arrow)" ); |
175 | } |
176 | |
177 | std::shared_ptr<RecordBatch> arrow_batch; |
178 | std::shared_ptr<RecordBatch> json_batch; |
179 | for (int i = 0; i < json_nbatches; ++i) { |
180 | RETURN_NOT_OK(json_reader->ReadRecordBatch(i, &json_batch)); |
181 | RETURN_NOT_OK(arrow_reader->ReadRecordBatch(i, &arrow_batch)); |
182 | |
183 | if (!json_batch->ApproxEquals(*arrow_batch)) { |
184 | std::stringstream ss; |
185 | ss << "Record batch " << i << " did not match" ; |
186 | |
187 | ss << "\nJSON:\n" ; |
188 | RETURN_NOT_OK(PrettyPrint(*json_batch, 0, &ss)); |
189 | |
190 | ss << "\nArrow:\n" ; |
191 | RETURN_NOT_OK(PrettyPrint(*arrow_batch, 0, &ss)); |
192 | return Status::Invalid(ss.str()); |
193 | } |
194 | } |
195 | |
196 | return Status::OK(); |
197 | } |
198 | |
199 | Status RunCommand(const std::string& json_path, const std::string& arrow_path, |
200 | const std::string& command) { |
201 | if (json_path == "" ) { |
202 | return Status::Invalid("Must specify json file name" ); |
203 | } |
204 | |
205 | if (arrow_path == "" ) { |
206 | return Status::Invalid("Must specify arrow file name" ); |
207 | } |
208 | |
209 | if (command == "ARROW_TO_JSON" ) { |
210 | if (!file_exists(arrow_path.c_str())) { |
211 | return Status::Invalid("Input file does not exist" ); |
212 | } |
213 | |
214 | return ConvertArrowToJson(arrow_path, json_path); |
215 | } else if (command == "JSON_TO_ARROW" ) { |
216 | if (!file_exists(json_path.c_str())) { |
217 | return Status::Invalid("Input file does not exist" ); |
218 | } |
219 | |
220 | return ConvertJsonToArrow(json_path, arrow_path); |
221 | } else if (command == "VALIDATE" ) { |
222 | if (!file_exists(json_path.c_str())) { |
223 | return Status::Invalid("JSON file does not exist" ); |
224 | } |
225 | |
226 | if (!file_exists(arrow_path.c_str())) { |
227 | return Status::Invalid("Arrow file does not exist" ); |
228 | } |
229 | |
230 | return ValidateArrowVsJson(arrow_path, json_path); |
231 | } else { |
232 | return Status::Invalid("Unknown command: " , command); |
233 | } |
234 | } |
235 | |
236 | static std::string temp_path() { |
237 | return (fs::temp_directory_path() / fs::unique_path()).string(); |
238 | } |
239 | |
240 | class TestJSONIntegration : public ::testing::Test { |
241 | public: |
242 | void SetUp() {} |
243 | |
244 | std::string mkstemp() { |
245 | auto path = temp_path(); |
246 | tmp_paths_.push_back(path); |
247 | return path; |
248 | } |
249 | |
250 | Status WriteJson(const char* data, const std::string& path) { |
251 | do { |
252 | std::shared_ptr<io::FileOutputStream> out; |
253 | RETURN_NOT_OK(io::FileOutputStream::Open(path, &out)); |
254 | RETURN_NOT_OK(out->Write(data, static_cast<int64_t>(strlen(data)))); |
255 | } while (0); |
256 | return Status::OK(); |
257 | } |
258 | |
259 | void TearDown() { |
260 | for (const std::string path : tmp_paths_) { |
261 | ARROW_UNUSED(std::remove(path.c_str())); |
262 | } |
263 | } |
264 | |
265 | protected: |
266 | std::vector<std::string> tmp_paths_; |
267 | }; |
268 | |
// Test fixture in the JSON integration format. It is written to disk with
// WriteJson and converted via RunCommand/ConvertJsonToArrow.
//
// Schema: two nullable columns, `foo` (signed 32-bit int) and `bar` (double).
// Data: two record batches with 5 and 4 rows. Both columns contain null slots
// (VALIDITY entries of 0).
static const char* JSON_EXAMPLE = R"example(
{
"schema": {
"fields": [
{
"name": "foo",
"type": {"name": "int", "isSigned": true, "bitWidth": 32},
"nullable": true, "children": [],
"typeLayout": {
"vectors": [
{"type": "VALIDITY", "typeBitWidth": 1},
{"type": "DATA", "typeBitWidth": 32}
]
}
},
{
"name": "bar",
"type": {"name": "floatingpoint", "precision": "DOUBLE"},
"nullable": true, "children": [],
"typeLayout": {
"vectors": [
{"type": "VALIDITY", "typeBitWidth": 1},
{"type": "DATA", "typeBitWidth": 64}
]
}
}
]
},
"batches": [
{
"count": 5,
"columns": [
{
"name": "foo",
"count": 5,
"DATA": [1, 2, 3, 4, 5],
"VALIDITY": [1, 0, 1, 1, 1]
},
{
"name": "bar",
"count": 5,
"DATA": [1.0, 2.0, 3.0, 4.0, 5.0],
"VALIDITY": [1, 0, 0, 1, 1]
}
]
},
{
"count": 4,
"columns": [
{
"name": "foo",
"count": 4,
"DATA": [1, 2, 3, 4],
"VALIDITY": [1, 0, 1, 1]
},
{
"name": "bar",
"count": 4,
"DATA": [1.0, 2.0, 3.0, 4.0],
"VALIDITY": [1, 0, 0, 1]
}
]
}
]
}
)example";
335 | |
// Second JSON integration fixture: a single nullable signed 32-bit int column
// `foo` and one 5-row record batch.
//
// Its schema deliberately differs from JSON_EXAMPLE (the `bar` column is
// missing). The ErrorStates test uses it to provoke a schema mismatch in
// ValidateArrowVsJson.
static const char* JSON_EXAMPLE2 = R"example(
{
"schema": {
"fields": [
{
"name": "foo",
"type": {"name": "int", "isSigned": true, "bitWidth": 32},
"nullable": true, "children": [],
"typeLayout": {
"vectors": [
{"type": "VALIDITY", "typeBitWidth": 1},
{"type": "DATA", "typeBitWidth": 32}
]
}
}
]
},
"batches": [
{
"count": 5,
"columns": [
{
"name": "foo",
"count": 5,
"DATA": [1, 2, 3, 4, 5],
"VALIDITY": [1, 0, 1, 1, 1]
}
]
}
]
}
)example";
368 | |
369 | TEST_F(TestJSONIntegration, ConvertAndValidate) { |
370 | std::string json_path = this->mkstemp(); |
371 | std::string arrow_path = this->mkstemp(); |
372 | |
373 | ASSERT_OK(WriteJson(JSON_EXAMPLE, json_path)); |
374 | |
375 | ASSERT_OK(RunCommand(json_path, arrow_path, "JSON_TO_ARROW" )); |
376 | ASSERT_OK(RunCommand(json_path, arrow_path, "VALIDATE" )); |
377 | |
378 | // Convert and overwrite |
379 | ASSERT_OK(RunCommand(json_path, arrow_path, "ARROW_TO_JSON" )); |
380 | |
381 | // Convert back to arrow, and validate |
382 | ASSERT_OK(RunCommand(json_path, arrow_path, "JSON_TO_ARROW" )); |
383 | ASSERT_OK(RunCommand(json_path, arrow_path, "VALIDATE" )); |
384 | } |
385 | |
386 | TEST_F(TestJSONIntegration, ErrorStates) { |
387 | std::string json_path = this->mkstemp(); |
388 | std::string json_path2 = this->mkstemp(); |
389 | std::string arrow_path = this->mkstemp(); |
390 | |
391 | ASSERT_OK(WriteJson(JSON_EXAMPLE, json_path)); |
392 | ASSERT_OK(WriteJson(JSON_EXAMPLE2, json_path2)); |
393 | |
394 | ASSERT_OK(ConvertJsonToArrow(json_path, arrow_path)); |
395 | ASSERT_RAISES(Invalid, ValidateArrowVsJson(arrow_path, json_path2)); |
396 | |
397 | ASSERT_RAISES(IOError, ValidateArrowVsJson("does_not_exist-1234" , json_path2)); |
398 | ASSERT_RAISES(IOError, ValidateArrowVsJson(arrow_path, "does_not_exist-1234" )); |
399 | |
400 | ASSERT_RAISES(Invalid, RunCommand("" , arrow_path, "VALIDATE" )); |
401 | ASSERT_RAISES(Invalid, RunCommand(json_path, "" , "VALIDATE" )); |
402 | } |
403 | |
404 | } // namespace ipc |
405 | } // namespace arrow |
406 | |
407 | int main(int argc, char** argv) { |
408 | gflags::ParseCommandLineFlags(&argc, &argv, true); |
409 | |
410 | int ret = 0; |
411 | |
412 | if (FLAGS_integration) { |
413 | arrow::Status result = arrow::ipc::RunCommand(FLAGS_json, FLAGS_arrow, FLAGS_mode); |
414 | if (!result.ok()) { |
415 | std::cout << "Error message: " << result.ToString() << std::endl; |
416 | ret = 1; |
417 | } |
418 | } else { |
419 | ::testing::InitGoogleTest(&argc, argv); |
420 | ret = RUN_ALL_TESTS(); |
421 | } |
422 | gflags::ShutDownCommandLineFlags(); |
423 | return ret; |
424 | } |
425 | |