1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstring>

#include "arrow/status.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/logging.h"
#include "parquet/bloom_filter.h"
#include "parquet/exception.h"
#include "parquet/murmur3.h"
#include "parquet/types.h"
29
30namespace parquet {
// Namespace-scope definition of the static constexpr SALT table. SetMask()
// indexes the array at run time, which odr-uses it; before C++17 an odr-used
// static constexpr data member needs an out-of-class definition like this one
// in addition to its in-class declaration.
constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];
32
33BlockSplitBloomFilter::BlockSplitBloomFilter()
34 : pool_(::arrow::default_memory_pool()),
35 hash_strategy_(HashStrategy::MURMUR3_X64_128),
36 algorithm_(Algorithm::BLOCK) {}
37
38void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
39 if (num_bytes < kMinimumBloomFilterBytes) {
40 num_bytes = kMinimumBloomFilterBytes;
41 }
42
43 // Get next power of 2 if it is not power of 2.
44 if ((num_bytes & (num_bytes - 1)) != 0) {
45 num_bytes = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bytes));
46 }
47
48 if (num_bytes > kMaximumBloomFilterBytes) {
49 num_bytes = kMaximumBloomFilterBytes;
50 }
51
52 num_bytes_ = num_bytes;
53 PARQUET_THROW_NOT_OK(::arrow::AllocateBuffer(pool_, num_bytes_, &data_));
54 memset(data_->mutable_data(), 0, num_bytes_);
55
56 this->hasher_.reset(new MurmurHash3());
57}
58
59void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
60 DCHECK(bitset != nullptr);
61
62 if (num_bytes < kMinimumBloomFilterBytes || num_bytes > kMaximumBloomFilterBytes ||
63 (num_bytes & (num_bytes - 1)) != 0) {
64 throw ParquetException("Given length of bitset is illegal");
65 }
66
67 num_bytes_ = num_bytes;
68 PARQUET_THROW_NOT_OK(::arrow::AllocateBuffer(pool_, num_bytes_, &data_));
69 memcpy(data_->mutable_data(), bitset, num_bytes_);
70
71 this->hasher_.reset(new MurmurHash3());
72}
73
74BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(InputStream* input) {
75 int64_t bytes_available;
76
77 const uint8_t* read_buffer = NULL;
78 read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
79 if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
80 throw ParquetException("Failed to deserialize from input stream");
81 }
82 uint32_t len;
83 memcpy(&len, read_buffer, sizeof(uint32_t));
84
85 read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
86 if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
87 throw ParquetException("Failed to deserialize from input stream");
88 }
89 uint32_t hash;
90 memcpy(&hash, read_buffer, sizeof(uint32_t));
91 if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
92 throw ParquetException("Unsupported hash strategy");
93 }
94
95 read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
96 if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
97 throw ParquetException("Failed to deserialize from input stream");
98 }
99 uint32_t algorithm;
100 memcpy(&algorithm, read_buffer, sizeof(uint32_t));
101 if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
102 throw ParquetException("Unsupported Bloom filter algorithm");
103 }
104
105 BlockSplitBloomFilter bloom_filter;
106 bloom_filter.Init(input->Read(len, &bytes_available), len);
107 return bloom_filter;
108}
109
110void BlockSplitBloomFilter::WriteTo(OutputStream* sink) const {
111 DCHECK(sink != nullptr);
112
113 sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_));
114 sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_), sizeof(hash_strategy_));
115 sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_));
116 sink->Write(data_->mutable_data(), num_bytes_);
117}
118
119void BlockSplitBloomFilter::SetMask(uint32_t key, BlockMask& block_mask) const {
120 for (int i = 0; i < kBitsSetPerBlock; ++i) {
121 block_mask.item[i] = key * SALT[i];
122 }
123
124 for (int i = 0; i < kBitsSetPerBlock; ++i) {
125 block_mask.item[i] = block_mask.item[i] >> 27;
126 }
127
128 for (int i = 0; i < kBitsSetPerBlock; ++i) {
129 block_mask.item[i] = UINT32_C(0x1) << block_mask.item[i];
130 }
131}
132
133bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
134 const uint32_t bucket_index =
135 static_cast<uint32_t>((hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1));
136 uint32_t key = static_cast<uint32_t>(hash);
137 uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
138
139 // Calculate mask for bucket.
140 BlockMask block_mask;
141 SetMask(key, block_mask);
142
143 for (int i = 0; i < kBitsSetPerBlock; ++i) {
144 if (0 == (bitset32[kBitsSetPerBlock * bucket_index + i] & block_mask.item[i])) {
145 return false;
146 }
147 }
148 return true;
149}
150
151void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
152 const uint32_t bucket_index =
153 static_cast<uint32_t>(hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1);
154 uint32_t key = static_cast<uint32_t>(hash);
155 uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
156
157 // Calculate mask for bucket.
158 BlockMask block_mask;
159 SetMask(key, block_mask);
160
161 for (int i = 0; i < kBitsSetPerBlock; i++) {
162 bitset32[bucket_index * kBitsSetPerBlock + i] |= block_mask.item[i];
163 }
164}
165
166} // namespace parquet
167