1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #pragma once |
19 | |
20 | #include "arrow/util/windows_compatibility.h" |
21 | |
22 | #include <cstdint> |
23 | // Check if thrift version < 0.11.0 |
24 | // or if FORCE_BOOST_SMART_PTR is defined. Ref: https://thrift.apache.org/lib/cpp |
25 | #if defined(PARQUET_THRIFT_USE_BOOST) || defined(FORCE_BOOST_SMART_PTR) |
26 | #include <boost/shared_ptr.hpp> |
27 | #else |
28 | #include <memory> |
29 | #endif |
30 | #include <string> |
31 | |
32 | // TCompactProtocol requires some #defines to work right. |
33 | #define SIGNED_RIGHT_SHIFT_IS 1 |
34 | #define ARITHMETIC_RIGHT_SHIFT 1 |
35 | #include <thrift/TApplicationException.h> |
36 | #include <thrift/protocol/TCompactProtocol.h> |
37 | #include <thrift/protocol/TDebugProtocol.h> |
38 | |
39 | #include <thrift/protocol/TBinaryProtocol.h> |
40 | #include <thrift/transport/TBufferTransports.h> |
41 | #include <sstream> |
42 | |
43 | #include "arrow/util/logging.h" |
44 | #include "parquet/exception.h" |
45 | #include "parquet/platform.h" |
46 | #include "parquet/statistics.h" |
47 | |
#include "parquet/parquet_types.h"  // IWYU pragma: export
49 | |
50 | namespace parquet { |
51 | |
// Select the shared_ptr implementation that the Thrift-related code in this
// namespace refers to as plain `shared_ptr`:
//   * boost::shared_ptr when PARQUET_THRIFT_USE_BOOST or FORCE_BOOST_SMART_PTR
//     is defined (intended for thrift versions < 0.11.0),
//   * std::shared_ptr otherwise.
// Ref: https://thrift.apache.org/lib/cpp
#if defined(PARQUET_THRIFT_USE_BOOST) || defined(FORCE_BOOST_SMART_PTR)
using ::boost::shared_ptr;
#else
using ::std::shared_ptr;
#endif
59 | |
60 | // ---------------------------------------------------------------------- |
61 | // Convert Thrift enums to / from parquet enums |
62 | |
63 | static inline Type::type FromThrift(format::Type::type type) { |
64 | return static_cast<Type::type>(type); |
65 | } |
66 | |
67 | static inline ConvertedType::type FromThrift(format::ConvertedType::type type) { |
68 | // item 0 is NONE |
69 | return static_cast<ConvertedType::type>(static_cast<int>(type) + 1); |
70 | } |
71 | |
72 | static inline Repetition::type FromThrift(format::FieldRepetitionType::type type) { |
73 | return static_cast<Repetition::type>(type); |
74 | } |
75 | |
76 | static inline Encoding::type FromThrift(format::Encoding::type type) { |
77 | return static_cast<Encoding::type>(type); |
78 | } |
79 | |
80 | static inline format::Type::type ToThrift(Type::type type) { |
81 | return static_cast<format::Type::type>(type); |
82 | } |
83 | |
84 | static inline format::ConvertedType::type ToThrift(ConvertedType::type type) { |
85 | // item 0 is NONE |
86 | DCHECK_NE(type, ConvertedType::NONE); |
87 | return static_cast<format::ConvertedType::type>(static_cast<int>(type) - 1); |
88 | } |
89 | |
90 | static inline format::FieldRepetitionType::type ToThrift(Repetition::type type) { |
91 | return static_cast<format::FieldRepetitionType::type>(type); |
92 | } |
93 | |
94 | static inline format::Encoding::type ToThrift(Encoding::type type) { |
95 | return static_cast<format::Encoding::type>(type); |
96 | } |
97 | |
98 | static inline Compression::type FromThrift(format::CompressionCodec::type type) { |
99 | switch (type) { |
100 | case format::CompressionCodec::UNCOMPRESSED: |
101 | return Compression::UNCOMPRESSED; |
102 | case format::CompressionCodec::SNAPPY: |
103 | return Compression::SNAPPY; |
104 | case format::CompressionCodec::GZIP: |
105 | return Compression::GZIP; |
106 | case format::CompressionCodec::LZO: |
107 | return Compression::LZO; |
108 | case format::CompressionCodec::BROTLI: |
109 | return Compression::BROTLI; |
110 | case format::CompressionCodec::LZ4: |
111 | return Compression::LZ4; |
112 | case format::CompressionCodec::ZSTD: |
113 | return Compression::ZSTD; |
114 | default: |
115 | DCHECK(false) << "Cannot reach here" ; |
116 | return Compression::UNCOMPRESSED; |
117 | } |
118 | } |
119 | |
120 | static inline format::CompressionCodec::type ToThrift(Compression::type type) { |
121 | switch (type) { |
122 | case Compression::UNCOMPRESSED: |
123 | return format::CompressionCodec::UNCOMPRESSED; |
124 | case Compression::SNAPPY: |
125 | return format::CompressionCodec::SNAPPY; |
126 | case Compression::GZIP: |
127 | return format::CompressionCodec::GZIP; |
128 | case Compression::LZO: |
129 | return format::CompressionCodec::LZO; |
130 | case Compression::BROTLI: |
131 | return format::CompressionCodec::BROTLI; |
132 | case Compression::LZ4: |
133 | return format::CompressionCodec::LZ4; |
134 | case Compression::ZSTD: |
135 | return format::CompressionCodec::ZSTD; |
136 | default: |
137 | DCHECK(false) << "Cannot reach here" ; |
138 | return format::CompressionCodec::UNCOMPRESSED; |
139 | } |
140 | } |
141 | |
142 | static inline format::Statistics ToThrift(const EncodedStatistics& stats) { |
143 | format::Statistics statistics; |
144 | if (stats.has_min) { |
145 | statistics.__set_min_value(stats.min()); |
146 | // If the order is SIGNED, then the old min value must be set too. |
147 | // This for backward compatibility |
148 | if (stats.is_signed()) { |
149 | statistics.__set_min(stats.min()); |
150 | } |
151 | } |
152 | if (stats.has_max) { |
153 | statistics.__set_max_value(stats.max()); |
154 | // If the order is SIGNED, then the old max value must be set too. |
155 | // This for backward compatibility |
156 | if (stats.is_signed()) { |
157 | statistics.__set_max(stats.max()); |
158 | } |
159 | } |
160 | if (stats.has_null_count) { |
161 | statistics.__set_null_count(stats.null_count); |
162 | } |
163 | if (stats.has_distinct_count) { |
164 | statistics.__set_distinct_count(stats.distinct_count); |
165 | } |
166 | |
167 | return statistics; |
168 | } |
169 | |
170 | // ---------------------------------------------------------------------- |
171 | // Thrift struct serialization / deserialization utilities |
172 | |
// Shorthand for Thrift's TMemoryBuffer transport, which the serialization and
// deserialization helpers in this file read from and write to.
using ThriftBuffer = apache::thrift::transport::TMemoryBuffer;
174 | |
175 | // Deserialize a thrift message from buf/len. buf/len must at least contain |
176 | // all the bytes needed to store the thrift message. On return, len will be |
177 | // set to the actual length of the header. |
178 | template <class T> |
179 | inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) { |
180 | // Deserialize msg bytes into c++ thrift msg using memory transport. |
181 | shared_ptr<ThriftBuffer> tmem_transport( |
182 | new ThriftBuffer(const_cast<uint8_t*>(buf), *len)); |
183 | apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory; |
184 | shared_ptr<apache::thrift::protocol::TProtocol> tproto = // |
185 | tproto_factory.getProtocol(tmem_transport); |
186 | try { |
187 | deserialized_msg->read(tproto.get()); |
188 | } catch (std::exception& e) { |
189 | std::stringstream ss; |
190 | ss << "Couldn't deserialize thrift: " << e.what() << "\n" ; |
191 | throw ParquetException(ss.str()); |
192 | } |
193 | uint32_t bytes_left = tmem_transport->available_read(); |
194 | *len = *len - bytes_left; |
195 | } |
196 | |
197 | /// Utility class to serialize thrift objects to a binary format. This object |
198 | /// should be reused if possible to reuse the underlying memory. |
199 | /// Note: thrift will encode NULLs into the serialized buffer so it is not valid |
200 | /// to treat it as a string. |
201 | class ThriftSerializer { |
202 | public: |
203 | explicit ThriftSerializer(int initial_buffer_size = 1024) |
204 | : mem_buffer_(new ThriftBuffer(initial_buffer_size)) { |
205 | apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> factory; |
206 | protocol_ = factory.getProtocol(mem_buffer_); |
207 | } |
208 | |
209 | /// Serialize obj into a memory buffer. The result is returned in buffer/len. The |
210 | /// memory returned is owned by this object and will be invalid when another object |
211 | /// is serialized. |
212 | template <class T> |
213 | void SerializeToBuffer(const T* obj, uint32_t* len, uint8_t** buffer) { |
214 | SerializeObject(obj); |
215 | mem_buffer_->getBuffer(buffer, len); |
216 | } |
217 | |
218 | template <class T> |
219 | void SerializeToString(const T* obj, std::string* result) { |
220 | SerializeObject(obj); |
221 | *result = mem_buffer_->getBufferAsString(); |
222 | } |
223 | |
224 | template <class T> |
225 | int64_t Serialize(const T* obj, ArrowOutputStream* out) { |
226 | uint8_t* out_buffer; |
227 | uint32_t out_length; |
228 | SerializeToBuffer(obj, &out_length, &out_buffer); |
229 | PARQUET_THROW_NOT_OK(out->Write(out_buffer, out_length)); |
230 | return static_cast<int64_t>(out_length); |
231 | } |
232 | |
233 | private: |
234 | template <class T> |
235 | void SerializeObject(const T* obj) { |
236 | try { |
237 | mem_buffer_->resetBuffer(); |
238 | obj->write(protocol_.get()); |
239 | } catch (std::exception& e) { |
240 | std::stringstream ss; |
241 | ss << "Couldn't serialize thrift: " << e.what() << "\n" ; |
242 | throw ParquetException(ss.str()); |
243 | } |
244 | } |
245 | |
246 | shared_ptr<ThriftBuffer> mem_buffer_; |
247 | shared_ptr<apache::thrift::protocol::TProtocol> protocol_; |
248 | }; |
249 | |
250 | } // namespace parquet |
251 | |