1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "parquet/deprecated_io.h"
19
20#include <cstdint>
21#include <utility>
22
23#include "parquet/exception.h"
24
25namespace parquet {
26
27ParquetInputWrapper::ParquetInputWrapper(std::unique_ptr<RandomAccessSource> source)
28 : ParquetInputWrapper(source.get()) {
29 owned_source_ = std::move(source);
30}
31
32ParquetInputWrapper::ParquetInputWrapper(RandomAccessSource* source)
33 : source_(source), closed_(false) {}
34
35ParquetInputWrapper::~ParquetInputWrapper() {
36 if (!closed_) {
37 try {
38 source_->Close();
39 } catch (...) {
40 }
41 closed_ = true;
42 }
43}
44
45::arrow::Status ParquetInputWrapper::Close() {
46 PARQUET_CATCH_NOT_OK(source_->Close());
47 closed_ = true;
48 return ::arrow::Status::OK();
49}
50
51::arrow::Status ParquetInputWrapper::Tell(int64_t* position) const {
52 PARQUET_CATCH_NOT_OK(*position = source_->Tell());
53 return ::arrow::Status::OK();
54}
55
56bool ParquetInputWrapper::closed() const { return closed_; }
57
58::arrow::Status ParquetInputWrapper::Seek(int64_t position) {
59 return ::arrow::Status::NotImplemented("Seek");
60}
61
62::arrow::Status ParquetInputWrapper::Read(int64_t nbytes, int64_t* bytes_read,
63 void* out) {
64 PARQUET_CATCH_NOT_OK(*bytes_read =
65 source_->Read(nbytes, reinterpret_cast<uint8_t*>(out)));
66 return ::arrow::Status::OK();
67}
68
69::arrow::Status ParquetInputWrapper::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
70 PARQUET_CATCH_NOT_OK(*out = source_->Read(nbytes));
71 return ::arrow::Status::OK();
72}
73
74::arrow::Status ParquetInputWrapper::ReadAt(int64_t position, int64_t nbytes,
75 std::shared_ptr<Buffer>* out) {
76 PARQUET_CATCH_NOT_OK(*out = source_->ReadAt(position, nbytes));
77 return ::arrow::Status::OK();
78}
79
80::arrow::Status ParquetInputWrapper::GetSize(int64_t* size) {
81 PARQUET_CATCH_NOT_OK(*size = source_->Size());
82 return ::arrow::Status::OK();
83}
84
85ParquetOutputWrapper::ParquetOutputWrapper(std::unique_ptr<::parquet::OutputStream> sink)
86 : ParquetOutputWrapper(sink.get()) {
87 owned_sink_ = std::move(sink);
88}
89
90ParquetOutputWrapper::ParquetOutputWrapper(
91 const std::shared_ptr<::parquet::OutputStream>& sink)
92 : ParquetOutputWrapper(sink.get()) {
93 shared_sink_ = sink;
94}
95
96ParquetOutputWrapper::ParquetOutputWrapper(::parquet::OutputStream* sink)
97 : sink_(sink), closed_(false) {}
98
99ParquetOutputWrapper::~ParquetOutputWrapper() {
100 if (!closed_) {
101 try {
102 sink_->Close();
103 } catch (...) {
104 }
105 closed_ = true;
106 }
107}
108
109::arrow::Status ParquetOutputWrapper::Close() {
110 PARQUET_CATCH_NOT_OK(sink_->Close());
111 closed_ = true;
112 return ::arrow::Status::OK();
113}
114
115::arrow::Status ParquetOutputWrapper::Tell(int64_t* position) const {
116 PARQUET_CATCH_NOT_OK(*position = sink_->Tell());
117 return ::arrow::Status::OK();
118}
119
120bool ParquetOutputWrapper::closed() const { return closed_; }
121
122::arrow::Status ParquetOutputWrapper::Write(const void* data, int64_t nbytes) {
123 PARQUET_CATCH_NOT_OK(sink_->Write(reinterpret_cast<const uint8_t*>(data), nbytes));
124 return ::arrow::Status::OK();
125}
126
127} // namespace parquet
128