1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#pragma once
19
20#include "arrow/util/windows_compatibility.h"
21
22#include <cstdint>
23// Check if thrift version < 0.11.0
24// or if FORCE_BOOST_SMART_PTR is defined. Ref: https://thrift.apache.org/lib/cpp
25#if defined(PARQUET_THRIFT_USE_BOOST) || defined(FORCE_BOOST_SMART_PTR)
26#include <boost/shared_ptr.hpp>
27#else
28#include <memory>
29#endif
30#include <string>
31
32// TCompactProtocol requires some #defines to work right.
33#define SIGNED_RIGHT_SHIFT_IS 1
34#define ARITHMETIC_RIGHT_SHIFT 1
35#include <thrift/TApplicationException.h>
36#include <thrift/protocol/TCompactProtocol.h>
37#include <thrift/protocol/TDebugProtocol.h>
38
39#include <thrift/protocol/TBinaryProtocol.h>
40#include <thrift/transport/TBufferTransports.h>
41#include <sstream>
42
43#include "arrow/util/logging.h"
44#include "parquet/exception.h"
45#include "parquet/platform.h"
46#include "parquet/statistics.h"
47
#include "parquet/parquet_types.h"  // IWYU pragma: export
49
50namespace parquet {
51
52// Check if thrift version < 0.11.0
53// or if FORCE_BOOST_SMART_PTR is defined. Ref: https://thrift.apache.org/lib/cpp
54#if defined(PARQUET_THRIFT_USE_BOOST) || defined(FORCE_BOOST_SMART_PTR)
55using ::boost::shared_ptr;
56#else
57using ::std::shared_ptr;
58#endif
59
60// ----------------------------------------------------------------------
61// Convert Thrift enums to / from parquet enums
62
63static inline Type::type FromThrift(format::Type::type type) {
64 return static_cast<Type::type>(type);
65}
66
67static inline ConvertedType::type FromThrift(format::ConvertedType::type type) {
68 // item 0 is NONE
69 return static_cast<ConvertedType::type>(static_cast<int>(type) + 1);
70}
71
72static inline Repetition::type FromThrift(format::FieldRepetitionType::type type) {
73 return static_cast<Repetition::type>(type);
74}
75
76static inline Encoding::type FromThrift(format::Encoding::type type) {
77 return static_cast<Encoding::type>(type);
78}
79
80static inline format::Type::type ToThrift(Type::type type) {
81 return static_cast<format::Type::type>(type);
82}
83
84static inline format::ConvertedType::type ToThrift(ConvertedType::type type) {
85 // item 0 is NONE
86 DCHECK_NE(type, ConvertedType::NONE);
87 return static_cast<format::ConvertedType::type>(static_cast<int>(type) - 1);
88}
89
90static inline format::FieldRepetitionType::type ToThrift(Repetition::type type) {
91 return static_cast<format::FieldRepetitionType::type>(type);
92}
93
94static inline format::Encoding::type ToThrift(Encoding::type type) {
95 return static_cast<format::Encoding::type>(type);
96}
97
98static inline Compression::type FromThrift(format::CompressionCodec::type type) {
99 switch (type) {
100 case format::CompressionCodec::UNCOMPRESSED:
101 return Compression::UNCOMPRESSED;
102 case format::CompressionCodec::SNAPPY:
103 return Compression::SNAPPY;
104 case format::CompressionCodec::GZIP:
105 return Compression::GZIP;
106 case format::CompressionCodec::LZO:
107 return Compression::LZO;
108 case format::CompressionCodec::BROTLI:
109 return Compression::BROTLI;
110 case format::CompressionCodec::LZ4:
111 return Compression::LZ4;
112 case format::CompressionCodec::ZSTD:
113 return Compression::ZSTD;
114 default:
115 DCHECK(false) << "Cannot reach here";
116 return Compression::UNCOMPRESSED;
117 }
118}
119
120static inline format::CompressionCodec::type ToThrift(Compression::type type) {
121 switch (type) {
122 case Compression::UNCOMPRESSED:
123 return format::CompressionCodec::UNCOMPRESSED;
124 case Compression::SNAPPY:
125 return format::CompressionCodec::SNAPPY;
126 case Compression::GZIP:
127 return format::CompressionCodec::GZIP;
128 case Compression::LZO:
129 return format::CompressionCodec::LZO;
130 case Compression::BROTLI:
131 return format::CompressionCodec::BROTLI;
132 case Compression::LZ4:
133 return format::CompressionCodec::LZ4;
134 case Compression::ZSTD:
135 return format::CompressionCodec::ZSTD;
136 default:
137 DCHECK(false) << "Cannot reach here";
138 return format::CompressionCodec::UNCOMPRESSED;
139 }
140}
141
142static inline format::Statistics ToThrift(const EncodedStatistics& stats) {
143 format::Statistics statistics;
144 if (stats.has_min) {
145 statistics.__set_min_value(stats.min());
146 // If the order is SIGNED, then the old min value must be set too.
147 // This for backward compatibility
148 if (stats.is_signed()) {
149 statistics.__set_min(stats.min());
150 }
151 }
152 if (stats.has_max) {
153 statistics.__set_max_value(stats.max());
154 // If the order is SIGNED, then the old max value must be set too.
155 // This for backward compatibility
156 if (stats.is_signed()) {
157 statistics.__set_max(stats.max());
158 }
159 }
160 if (stats.has_null_count) {
161 statistics.__set_null_count(stats.null_count);
162 }
163 if (stats.has_distinct_count) {
164 statistics.__set_distinct_count(stats.distinct_count);
165 }
166
167 return statistics;
168}
169
170// ----------------------------------------------------------------------
171// Thrift struct serialization / deserialization utilities
172
173using ThriftBuffer = apache::thrift::transport::TMemoryBuffer;
174
175// Deserialize a thrift message from buf/len. buf/len must at least contain
176// all the bytes needed to store the thrift message. On return, len will be
177// set to the actual length of the header.
178template <class T>
179inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) {
180 // Deserialize msg bytes into c++ thrift msg using memory transport.
181 shared_ptr<ThriftBuffer> tmem_transport(
182 new ThriftBuffer(const_cast<uint8_t*>(buf), *len));
183 apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory;
184 shared_ptr<apache::thrift::protocol::TProtocol> tproto = //
185 tproto_factory.getProtocol(tmem_transport);
186 try {
187 deserialized_msg->read(tproto.get());
188 } catch (std::exception& e) {
189 std::stringstream ss;
190 ss << "Couldn't deserialize thrift: " << e.what() << "\n";
191 throw ParquetException(ss.str());
192 }
193 uint32_t bytes_left = tmem_transport->available_read();
194 *len = *len - bytes_left;
195}
196
197/// Utility class to serialize thrift objects to a binary format. This object
198/// should be reused if possible to reuse the underlying memory.
199/// Note: thrift will encode NULLs into the serialized buffer so it is not valid
200/// to treat it as a string.
201class ThriftSerializer {
202 public:
203 explicit ThriftSerializer(int initial_buffer_size = 1024)
204 : mem_buffer_(new ThriftBuffer(initial_buffer_size)) {
205 apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> factory;
206 protocol_ = factory.getProtocol(mem_buffer_);
207 }
208
209 /// Serialize obj into a memory buffer. The result is returned in buffer/len. The
210 /// memory returned is owned by this object and will be invalid when another object
211 /// is serialized.
212 template <class T>
213 void SerializeToBuffer(const T* obj, uint32_t* len, uint8_t** buffer) {
214 SerializeObject(obj);
215 mem_buffer_->getBuffer(buffer, len);
216 }
217
218 template <class T>
219 void SerializeToString(const T* obj, std::string* result) {
220 SerializeObject(obj);
221 *result = mem_buffer_->getBufferAsString();
222 }
223
224 template <class T>
225 int64_t Serialize(const T* obj, ArrowOutputStream* out) {
226 uint8_t* out_buffer;
227 uint32_t out_length;
228 SerializeToBuffer(obj, &out_length, &out_buffer);
229 PARQUET_THROW_NOT_OK(out->Write(out_buffer, out_length));
230 return static_cast<int64_t>(out_length);
231 }
232
233 private:
234 template <class T>
235 void SerializeObject(const T* obj) {
236 try {
237 mem_buffer_->resetBuffer();
238 obj->write(protocol_.get());
239 } catch (std::exception& e) {
240 std::stringstream ss;
241 ss << "Couldn't serialize thrift: " << e.what() << "\n";
242 throw ParquetException(ss.str());
243 }
244 }
245
246 shared_ptr<ThriftBuffer> mem_buffer_;
247 shared_ptr<apache::thrift::protocol::TProtocol> protocol_;
248};
249
250} // namespace parquet
251