1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #pragma once |
19 | |
20 | #include "arrow/util/windows_compatibility.h" |
21 | |
22 | #include <cstdint> |
23 | // Check if thrift version < 0.11.0 |
24 | // or if FORCE_BOOST_SMART_PTR is defined. Ref: https://thrift.apache.org/lib/cpp |
25 | #if defined(PARQUET_THRIFT_USE_BOOST) || defined(FORCE_BOOST_SMART_PTR) |
26 | #include <boost/shared_ptr.hpp> |
27 | #else |
28 | #include <memory> |
29 | #endif |
30 | #include <string> |
31 | |
32 | // TCompactProtocol requires some #defines to work right. |
33 | #define SIGNED_RIGHT_SHIFT_IS 1 |
34 | #define ARITHMETIC_RIGHT_SHIFT 1 |
35 | #include <thrift/TApplicationException.h> |
36 | #include <thrift/protocol/TCompactProtocol.h> |
37 | #include <thrift/protocol/TDebugProtocol.h> |
38 | |
39 | #include <thrift/protocol/TBinaryProtocol.h> |
40 | #include <thrift/transport/TBufferTransports.h> |
41 | #include <sstream> |
42 | |
43 | #include "arrow/util/logging.h" |
44 | #include "parquet/exception.h" |
45 | #include "parquet/util/memory.h" |
46 | |
#include "parquet/parquet_types.h"  // IWYU pragma: export
48 | |
49 | namespace parquet { |
50 | |
// Pick the shared_ptr flavour used throughout this header: boost::shared_ptr
// when PARQUET_THRIFT_USE_BOOST or FORCE_BOOST_SMART_PTR is defined, and
// std::shared_ptr otherwise. Per the original note, the boost variant is meant
// for thrift versions < 0.11.0. Ref: https://thrift.apache.org/lib/cpp
#if defined(PARQUET_THRIFT_USE_BOOST) || defined(FORCE_BOOST_SMART_PTR)
using ::boost::shared_ptr;
#else
using ::std::shared_ptr;
#endif
58 | |
59 | // ---------------------------------------------------------------------- |
60 | // Convert Thrift enums to / from parquet enums |
61 | |
62 | static inline Type::type FromThrift(format::Type::type type) { |
63 | return static_cast<Type::type>(type); |
64 | } |
65 | |
66 | static inline LogicalType::type FromThrift(format::ConvertedType::type type) { |
67 | // item 0 is NONE |
68 | return static_cast<LogicalType::type>(static_cast<int>(type) + 1); |
69 | } |
70 | |
71 | static inline Repetition::type FromThrift(format::FieldRepetitionType::type type) { |
72 | return static_cast<Repetition::type>(type); |
73 | } |
74 | |
75 | static inline Encoding::type FromThrift(format::Encoding::type type) { |
76 | return static_cast<Encoding::type>(type); |
77 | } |
78 | |
79 | static inline Compression::type FromThrift(format::CompressionCodec::type type) { |
80 | return static_cast<Compression::type>(type); |
81 | } |
82 | |
83 | static inline format::Type::type ToThrift(Type::type type) { |
84 | return static_cast<format::Type::type>(type); |
85 | } |
86 | |
87 | static inline format::ConvertedType::type ToThrift(LogicalType::type type) { |
88 | // item 0 is NONE |
89 | DCHECK_NE(type, LogicalType::NONE); |
90 | return static_cast<format::ConvertedType::type>(static_cast<int>(type) - 1); |
91 | } |
92 | |
93 | static inline format::FieldRepetitionType::type ToThrift(Repetition::type type) { |
94 | return static_cast<format::FieldRepetitionType::type>(type); |
95 | } |
96 | |
97 | static inline format::Encoding::type ToThrift(Encoding::type type) { |
98 | return static_cast<format::Encoding::type>(type); |
99 | } |
100 | |
101 | static inline format::CompressionCodec::type ToThrift(Compression::type type) { |
102 | return static_cast<format::CompressionCodec::type>(type); |
103 | } |
104 | |
105 | // ---------------------------------------------------------------------- |
106 | // Thrift struct serialization / deserialization utilities |
107 | |
// Shorthand for Thrift's TMemoryBuffer transport, which the helpers in this
// header use as the backing store for (de)serialization.
using ThriftBuffer = apache::thrift::transport::TMemoryBuffer;
109 | |
110 | // Deserialize a thrift message from buf/len. buf/len must at least contain |
111 | // all the bytes needed to store the thrift message. On return, len will be |
112 | // set to the actual length of the header. |
113 | template <class T> |
114 | inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) { |
115 | // Deserialize msg bytes into c++ thrift msg using memory transport. |
116 | shared_ptr<ThriftBuffer> tmem_transport( |
117 | new ThriftBuffer(const_cast<uint8_t*>(buf), *len)); |
118 | apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory; |
119 | shared_ptr<apache::thrift::protocol::TProtocol> tproto = // |
120 | tproto_factory.getProtocol(tmem_transport); |
121 | try { |
122 | deserialized_msg->read(tproto.get()); |
123 | } catch (std::exception& e) { |
124 | std::stringstream ss; |
125 | ss << "Couldn't deserialize thrift: " << e.what() << "\n" ; |
126 | throw ParquetException(ss.str()); |
127 | } |
128 | uint32_t bytes_left = tmem_transport->available_read(); |
129 | *len = *len - bytes_left; |
130 | } |
131 | |
132 | /// Utility class to serialize thrift objects to a binary format. This object |
133 | /// should be reused if possible to reuse the underlying memory. |
134 | /// Note: thrift will encode NULLs into the serialized buffer so it is not valid |
135 | /// to treat it as a string. |
136 | class ThriftSerializer { |
137 | public: |
138 | explicit ThriftSerializer(int initial_buffer_size = 1024) |
139 | : mem_buffer_(new ThriftBuffer(initial_buffer_size)) { |
140 | apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> factory; |
141 | protocol_ = factory.getProtocol(mem_buffer_); |
142 | } |
143 | |
144 | /// Serialize obj into a memory buffer. The result is returned in buffer/len. The |
145 | /// memory returned is owned by this object and will be invalid when another object |
146 | /// is serialized. |
147 | template <class T> |
148 | void SerializeToBuffer(const T* obj, uint32_t* len, uint8_t** buffer) { |
149 | SerializeObject(obj); |
150 | mem_buffer_->getBuffer(buffer, len); |
151 | } |
152 | |
153 | template <class T> |
154 | void SerializeToString(const T* obj, std::string* result) { |
155 | SerializeObject(obj); |
156 | *result = mem_buffer_->getBufferAsString(); |
157 | } |
158 | |
159 | template <class T> |
160 | int64_t Serialize(const T* obj, OutputStream* out) { |
161 | uint8_t* out_buffer; |
162 | uint32_t out_length; |
163 | SerializeToBuffer(obj, &out_length, &out_buffer); |
164 | out->Write(out_buffer, out_length); |
165 | return static_cast<int64_t>(out_length); |
166 | } |
167 | |
168 | private: |
169 | template <class T> |
170 | void SerializeObject(const T* obj) { |
171 | try { |
172 | mem_buffer_->resetBuffer(); |
173 | obj->write(protocol_.get()); |
174 | } catch (std::exception& e) { |
175 | std::stringstream ss; |
176 | ss << "Couldn't serialize thrift: " << e.what() << "\n" ; |
177 | throw ParquetException(ss.str()); |
178 | } |
179 | } |
180 | |
181 | shared_ptr<ThriftBuffer> mem_buffer_; |
182 | shared_ptr<apache::thrift::protocol::TProtocol> protocol_; |
183 | }; |
184 | |
185 | } // namespace parquet |
186 | |