| 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
| 18 | #pragma once |
| 19 | |
| 20 | #include "arrow/util/windows_compatibility.h" |
| 21 | |
| 22 | #include <cstdint> |
// Use Boost smart pointers when Thrift is older than 0.11.0 (signalled by
// PARQUET_THRIFT_USE_BOOST) or when FORCE_BOOST_SMART_PTR is defined. Ref: https://thrift.apache.org/lib/cpp
| 25 | #if defined(PARQUET_THRIFT_USE_BOOST) || defined(FORCE_BOOST_SMART_PTR) |
| 26 | #include <boost/shared_ptr.hpp> |
| 27 | #else |
| 28 | #include <memory> |
| 29 | #endif |
| 30 | #include <string> |
| 31 | |
| 32 | // TCompactProtocol requires some #defines to work right. |
| 33 | #define SIGNED_RIGHT_SHIFT_IS 1 |
| 34 | #define ARITHMETIC_RIGHT_SHIFT 1 |
| 35 | #include <thrift/TApplicationException.h> |
| 36 | #include <thrift/protocol/TCompactProtocol.h> |
| 37 | #include <thrift/protocol/TDebugProtocol.h> |
| 38 | |
| 39 | #include <thrift/protocol/TBinaryProtocol.h> |
| 40 | #include <thrift/transport/TBufferTransports.h> |
| 41 | #include <sstream> |
| 42 | |
| 43 | #include "arrow/util/logging.h" |
| 44 | #include "parquet/exception.h" |
| 45 | #include "parquet/platform.h" |
| 46 | #include "parquet/statistics.h" |
| 47 | |
#include "parquet/parquet_types.h"  // IWYU pragma: export
| 49 | |
| 50 | namespace parquet { |
| 51 | |
// Thrift's C++ API hands out Boost smart pointers when Thrift is older than
// 0.11.0 (PARQUET_THRIFT_USE_BOOST) or when FORCE_BOOST_SMART_PTR is defined,
// and std smart pointers otherwise. Import the matching `shared_ptr` so the
// transport/protocol handles in this header type-check against Thrift's API.
// Ref: https://thrift.apache.org/lib/cpp
#if !defined(PARQUET_THRIFT_USE_BOOST) && !defined(FORCE_BOOST_SMART_PTR)
using ::std::shared_ptr;
#else
using ::boost::shared_ptr;
#endif
| 59 | |
| 60 | // ---------------------------------------------------------------------- |
| 61 | // Convert Thrift enums to / from parquet enums |
| 62 | |
| 63 | static inline Type::type FromThrift(format::Type::type type) { |
| 64 | return static_cast<Type::type>(type); |
| 65 | } |
| 66 | |
| 67 | static inline ConvertedType::type FromThrift(format::ConvertedType::type type) { |
| 68 | // item 0 is NONE |
| 69 | return static_cast<ConvertedType::type>(static_cast<int>(type) + 1); |
| 70 | } |
| 71 | |
| 72 | static inline Repetition::type FromThrift(format::FieldRepetitionType::type type) { |
| 73 | return static_cast<Repetition::type>(type); |
| 74 | } |
| 75 | |
| 76 | static inline Encoding::type FromThrift(format::Encoding::type type) { |
| 77 | return static_cast<Encoding::type>(type); |
| 78 | } |
| 79 | |
| 80 | static inline format::Type::type ToThrift(Type::type type) { |
| 81 | return static_cast<format::Type::type>(type); |
| 82 | } |
| 83 | |
| 84 | static inline format::ConvertedType::type ToThrift(ConvertedType::type type) { |
| 85 | // item 0 is NONE |
| 86 | DCHECK_NE(type, ConvertedType::NONE); |
| 87 | return static_cast<format::ConvertedType::type>(static_cast<int>(type) - 1); |
| 88 | } |
| 89 | |
| 90 | static inline format::FieldRepetitionType::type ToThrift(Repetition::type type) { |
| 91 | return static_cast<format::FieldRepetitionType::type>(type); |
| 92 | } |
| 93 | |
| 94 | static inline format::Encoding::type ToThrift(Encoding::type type) { |
| 95 | return static_cast<format::Encoding::type>(type); |
| 96 | } |
| 97 | |
| 98 | static inline Compression::type FromThrift(format::CompressionCodec::type type) { |
| 99 | switch (type) { |
| 100 | case format::CompressionCodec::UNCOMPRESSED: |
| 101 | return Compression::UNCOMPRESSED; |
| 102 | case format::CompressionCodec::SNAPPY: |
| 103 | return Compression::SNAPPY; |
| 104 | case format::CompressionCodec::GZIP: |
| 105 | return Compression::GZIP; |
| 106 | case format::CompressionCodec::LZO: |
| 107 | return Compression::LZO; |
| 108 | case format::CompressionCodec::BROTLI: |
| 109 | return Compression::BROTLI; |
| 110 | case format::CompressionCodec::LZ4: |
| 111 | return Compression::LZ4; |
| 112 | case format::CompressionCodec::ZSTD: |
| 113 | return Compression::ZSTD; |
| 114 | default: |
| 115 | DCHECK(false) << "Cannot reach here" ; |
| 116 | return Compression::UNCOMPRESSED; |
| 117 | } |
| 118 | } |
| 119 | |
| 120 | static inline format::CompressionCodec::type ToThrift(Compression::type type) { |
| 121 | switch (type) { |
| 122 | case Compression::UNCOMPRESSED: |
| 123 | return format::CompressionCodec::UNCOMPRESSED; |
| 124 | case Compression::SNAPPY: |
| 125 | return format::CompressionCodec::SNAPPY; |
| 126 | case Compression::GZIP: |
| 127 | return format::CompressionCodec::GZIP; |
| 128 | case Compression::LZO: |
| 129 | return format::CompressionCodec::LZO; |
| 130 | case Compression::BROTLI: |
| 131 | return format::CompressionCodec::BROTLI; |
| 132 | case Compression::LZ4: |
| 133 | return format::CompressionCodec::LZ4; |
| 134 | case Compression::ZSTD: |
| 135 | return format::CompressionCodec::ZSTD; |
| 136 | default: |
| 137 | DCHECK(false) << "Cannot reach here" ; |
| 138 | return format::CompressionCodec::UNCOMPRESSED; |
| 139 | } |
| 140 | } |
| 141 | |
| 142 | static inline format::Statistics ToThrift(const EncodedStatistics& stats) { |
| 143 | format::Statistics statistics; |
| 144 | if (stats.has_min) { |
| 145 | statistics.__set_min_value(stats.min()); |
| 146 | // If the order is SIGNED, then the old min value must be set too. |
| 147 | // This for backward compatibility |
| 148 | if (stats.is_signed()) { |
| 149 | statistics.__set_min(stats.min()); |
| 150 | } |
| 151 | } |
| 152 | if (stats.has_max) { |
| 153 | statistics.__set_max_value(stats.max()); |
| 154 | // If the order is SIGNED, then the old max value must be set too. |
| 155 | // This for backward compatibility |
| 156 | if (stats.is_signed()) { |
| 157 | statistics.__set_max(stats.max()); |
| 158 | } |
| 159 | } |
| 160 | if (stats.has_null_count) { |
| 161 | statistics.__set_null_count(stats.null_count); |
| 162 | } |
| 163 | if (stats.has_distinct_count) { |
| 164 | statistics.__set_distinct_count(stats.distinct_count); |
| 165 | } |
| 166 | |
| 167 | return statistics; |
| 168 | } |
| 169 | |
| 170 | // ---------------------------------------------------------------------- |
| 171 | // Thrift struct serialization / deserialization utilities |
| 172 | |
| 173 | using ThriftBuffer = apache::thrift::transport::TMemoryBuffer; |
| 174 | |
| 175 | // Deserialize a thrift message from buf/len. buf/len must at least contain |
| 176 | // all the bytes needed to store the thrift message. On return, len will be |
| 177 | // set to the actual length of the header. |
| 178 | template <class T> |
| 179 | inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) { |
| 180 | // Deserialize msg bytes into c++ thrift msg using memory transport. |
| 181 | shared_ptr<ThriftBuffer> tmem_transport( |
| 182 | new ThriftBuffer(const_cast<uint8_t*>(buf), *len)); |
| 183 | apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory; |
| 184 | shared_ptr<apache::thrift::protocol::TProtocol> tproto = // |
| 185 | tproto_factory.getProtocol(tmem_transport); |
| 186 | try { |
| 187 | deserialized_msg->read(tproto.get()); |
| 188 | } catch (std::exception& e) { |
| 189 | std::stringstream ss; |
| 190 | ss << "Couldn't deserialize thrift: " << e.what() << "\n" ; |
| 191 | throw ParquetException(ss.str()); |
| 192 | } |
| 193 | uint32_t bytes_left = tmem_transport->available_read(); |
| 194 | *len = *len - bytes_left; |
| 195 | } |
| 196 | |
| 197 | /// Utility class to serialize thrift objects to a binary format. This object |
| 198 | /// should be reused if possible to reuse the underlying memory. |
| 199 | /// Note: thrift will encode NULLs into the serialized buffer so it is not valid |
| 200 | /// to treat it as a string. |
| 201 | class ThriftSerializer { |
| 202 | public: |
| 203 | explicit ThriftSerializer(int initial_buffer_size = 1024) |
| 204 | : mem_buffer_(new ThriftBuffer(initial_buffer_size)) { |
| 205 | apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> factory; |
| 206 | protocol_ = factory.getProtocol(mem_buffer_); |
| 207 | } |
| 208 | |
| 209 | /// Serialize obj into a memory buffer. The result is returned in buffer/len. The |
| 210 | /// memory returned is owned by this object and will be invalid when another object |
| 211 | /// is serialized. |
| 212 | template <class T> |
| 213 | void SerializeToBuffer(const T* obj, uint32_t* len, uint8_t** buffer) { |
| 214 | SerializeObject(obj); |
| 215 | mem_buffer_->getBuffer(buffer, len); |
| 216 | } |
| 217 | |
| 218 | template <class T> |
| 219 | void SerializeToString(const T* obj, std::string* result) { |
| 220 | SerializeObject(obj); |
| 221 | *result = mem_buffer_->getBufferAsString(); |
| 222 | } |
| 223 | |
| 224 | template <class T> |
| 225 | int64_t Serialize(const T* obj, ArrowOutputStream* out) { |
| 226 | uint8_t* out_buffer; |
| 227 | uint32_t out_length; |
| 228 | SerializeToBuffer(obj, &out_length, &out_buffer); |
| 229 | PARQUET_THROW_NOT_OK(out->Write(out_buffer, out_length)); |
| 230 | return static_cast<int64_t>(out_length); |
| 231 | } |
| 232 | |
| 233 | private: |
| 234 | template <class T> |
| 235 | void SerializeObject(const T* obj) { |
| 236 | try { |
| 237 | mem_buffer_->resetBuffer(); |
| 238 | obj->write(protocol_.get()); |
| 239 | } catch (std::exception& e) { |
| 240 | std::stringstream ss; |
| 241 | ss << "Couldn't serialize thrift: " << e.what() << "\n" ; |
| 242 | throw ParquetException(ss.str()); |
| 243 | } |
| 244 | } |
| 245 | |
| 246 | shared_ptr<ThriftBuffer> mem_buffer_; |
| 247 | shared_ptr<apache::thrift::protocol::TProtocol> protocol_; |
| 248 | }; |
| 249 | |
| 250 | } // namespace parquet |
| 251 | |