1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "parquet/deprecated_io.h" |
19 | |
20 | #include <cstdint> |
21 | #include <utility> |
22 | |
23 | #include "parquet/exception.h" |
24 | |
25 | namespace parquet { |
26 | |
27 | ParquetInputWrapper::ParquetInputWrapper(std::unique_ptr<RandomAccessSource> source) |
28 | : ParquetInputWrapper(source.get()) { |
29 | owned_source_ = std::move(source); |
30 | } |
31 | |
32 | ParquetInputWrapper::ParquetInputWrapper(RandomAccessSource* source) |
33 | : source_(source), closed_(false) {} |
34 | |
35 | ParquetInputWrapper::~ParquetInputWrapper() { |
36 | if (!closed_) { |
37 | try { |
38 | source_->Close(); |
39 | } catch (...) { |
40 | } |
41 | closed_ = true; |
42 | } |
43 | } |
44 | |
45 | ::arrow::Status ParquetInputWrapper::Close() { |
46 | PARQUET_CATCH_NOT_OK(source_->Close()); |
47 | closed_ = true; |
48 | return ::arrow::Status::OK(); |
49 | } |
50 | |
51 | ::arrow::Status ParquetInputWrapper::Tell(int64_t* position) const { |
52 | PARQUET_CATCH_NOT_OK(*position = source_->Tell()); |
53 | return ::arrow::Status::OK(); |
54 | } |
55 | |
56 | bool ParquetInputWrapper::closed() const { return closed_; } |
57 | |
58 | ::arrow::Status ParquetInputWrapper::Seek(int64_t position) { |
59 | return ::arrow::Status::NotImplemented("Seek" ); |
60 | } |
61 | |
62 | ::arrow::Status ParquetInputWrapper::Read(int64_t nbytes, int64_t* bytes_read, |
63 | void* out) { |
64 | PARQUET_CATCH_NOT_OK(*bytes_read = |
65 | source_->Read(nbytes, reinterpret_cast<uint8_t*>(out))); |
66 | return ::arrow::Status::OK(); |
67 | } |
68 | |
69 | ::arrow::Status ParquetInputWrapper::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) { |
70 | PARQUET_CATCH_NOT_OK(*out = source_->Read(nbytes)); |
71 | return ::arrow::Status::OK(); |
72 | } |
73 | |
74 | ::arrow::Status ParquetInputWrapper::ReadAt(int64_t position, int64_t nbytes, |
75 | std::shared_ptr<Buffer>* out) { |
76 | PARQUET_CATCH_NOT_OK(*out = source_->ReadAt(position, nbytes)); |
77 | return ::arrow::Status::OK(); |
78 | } |
79 | |
80 | ::arrow::Status ParquetInputWrapper::GetSize(int64_t* size) { |
81 | PARQUET_CATCH_NOT_OK(*size = source_->Size()); |
82 | return ::arrow::Status::OK(); |
83 | } |
84 | |
85 | ParquetOutputWrapper::ParquetOutputWrapper(std::unique_ptr<::parquet::OutputStream> sink) |
86 | : ParquetOutputWrapper(sink.get()) { |
87 | owned_sink_ = std::move(sink); |
88 | } |
89 | |
90 | ParquetOutputWrapper::ParquetOutputWrapper( |
91 | const std::shared_ptr<::parquet::OutputStream>& sink) |
92 | : ParquetOutputWrapper(sink.get()) { |
93 | shared_sink_ = sink; |
94 | } |
95 | |
96 | ParquetOutputWrapper::ParquetOutputWrapper(::parquet::OutputStream* sink) |
97 | : sink_(sink), closed_(false) {} |
98 | |
99 | ParquetOutputWrapper::~ParquetOutputWrapper() { |
100 | if (!closed_) { |
101 | try { |
102 | sink_->Close(); |
103 | } catch (...) { |
104 | } |
105 | closed_ = true; |
106 | } |
107 | } |
108 | |
109 | ::arrow::Status ParquetOutputWrapper::Close() { |
110 | PARQUET_CATCH_NOT_OK(sink_->Close()); |
111 | closed_ = true; |
112 | return ::arrow::Status::OK(); |
113 | } |
114 | |
115 | ::arrow::Status ParquetOutputWrapper::Tell(int64_t* position) const { |
116 | PARQUET_CATCH_NOT_OK(*position = sink_->Tell()); |
117 | return ::arrow::Status::OK(); |
118 | } |
119 | |
120 | bool ParquetOutputWrapper::closed() const { return closed_; } |
121 | |
122 | ::arrow::Status ParquetOutputWrapper::Write(const void* data, int64_t nbytes) { |
123 | PARQUET_CATCH_NOT_OK(sink_->Write(reinterpret_cast<const uint8_t*>(data), nbytes)); |
124 | return ::arrow::Status::OK(); |
125 | } |
126 | |
127 | } // namespace parquet |
128 | |