1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include <gtest/gtest.h>
19
20#include <cstdint>
21#include <limits>
22#include <memory>
23#include <random>
24#include <string>
25#include <vector>
26
27#include "arrow/buffer.h"
28#include "arrow/io/file.h"
29#include "arrow/status.h"
30
31#include "parquet/bloom_filter.h"
32#include "parquet/exception.h"
33#include "parquet/murmur3.h"
34#include "parquet/types.h"
35#include "parquet/util/memory.h"
36#include "parquet/util/test-common.h"
37
38namespace parquet {
39namespace test {
40TEST(Murmur3Test, TestBloomFilter) {
41 uint64_t result;
42 const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7};
43 ByteArray byteArray(8, bitset);
44 MurmurHash3 murmur3;
45 result = murmur3.Hash(&byteArray);
46 EXPECT_EQ(result, UINT64_C(913737700387071329));
47}
48
49TEST(ConstructorTest, TestBloomFilter) {
50 BlockSplitBloomFilter bloom_filter;
51 EXPECT_NO_THROW(bloom_filter.Init(1000));
52
53 // It throws because the length cannot be zero
54 std::unique_ptr<uint8_t[]> bitset1(new uint8_t[1024]());
55 EXPECT_THROW(bloom_filter.Init(bitset1.get(), 0), ParquetException);
56
57 // It throws because the number of bytes of Bloom filter bitset must be a power of 2.
58 std::unique_ptr<uint8_t[]> bitset2(new uint8_t[1024]());
59 EXPECT_THROW(bloom_filter.Init(bitset2.get(), 1023), ParquetException);
60}
61
62// The BasicTest is used to test basic operations including InsertHash, FindHash and
63// serializing and de-serializing.
64TEST(BasicTest, TestBloomFilter) {
65 BlockSplitBloomFilter bloom_filter;
66 bloom_filter.Init(1024);
67
68 for (int i = 0; i < 10; i++) {
69 bloom_filter.InsertHash(bloom_filter.Hash(i));
70 }
71
72 for (int i = 0; i < 10; i++) {
73 EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(i)));
74 }
75
76 // Serialize Bloom filter to memory output stream
77 InMemoryOutputStream sink;
78 bloom_filter.WriteTo(&sink);
79
80 // Deserialize Bloom filter from memory
81 InMemoryInputStream source(sink.GetBuffer());
82
83 BlockSplitBloomFilter de_bloom = BlockSplitBloomFilter::Deserialize(&source);
84
85 for (int i = 0; i < 10; i++) {
86 EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(i)));
87 }
88}
89
// Helper function to generate a pseudo-random alphanumeric string.
//
// Returns a string of exactly `length` characters drawn uniformly from
// [0-9A-Za-z]; `length == 0` yields an empty string.
//
// The engine is seeded with a fixed value so that test runs are reproducible, but
// it is a function-local static so that successive calls continue the sequence.
// Constructing and seeding the engine on every call would restart the sequence
// each time and return the same string for every call with the same length.
//
// The shared engine is not synchronized; concurrent callers would race on it.
std::string GetRandomString(uint32_t length) {
  // Character set used to generate random string
  static const std::string charset =
      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
  // Fixed seed: deterministic across runs, state preserved across calls.
  static std::default_random_engine gen(42);

  std::uniform_int_distribution<uint32_t> dist(
      0, static_cast<uint32_t>(charset.size() - 1));

  std::string ret(length, 'x');
  for (char& ch : ret) {
    ch = charset[dist(gen)];
  }
  return ret;
}
105
106TEST(FPPTest, TestBloomFilter) {
107 // It counts the number of times FindHash returns true.
108 int exist = 0;
109
110 // Total count of elements that will be used
111#ifdef PARQUET_VALGRIND
112 const int total_count = 5000;
113#else
114 const int total_count = 100000;
115#endif
116
117 // Bloom filter fpp parameter
118 const double fpp = 0.01;
119
120 std::vector<std::string> members;
121 BlockSplitBloomFilter bloom_filter;
122 bloom_filter.Init(BlockSplitBloomFilter::OptimalNumOfBits(total_count, fpp));
123
124 // Insert elements into the Bloom filter
125 for (int i = 0; i < total_count; i++) {
126 // Insert random string which length is 8
127 std::string tmp = GetRandomString(8);
128 const ByteArray byte_array(8, reinterpret_cast<const uint8_t*>(tmp.c_str()));
129 members.push_back(tmp);
130 bloom_filter.InsertHash(bloom_filter.Hash(&byte_array));
131 }
132
133 for (int i = 0; i < total_count; i++) {
134 const ByteArray byte_array1(8, reinterpret_cast<const uint8_t*>(members[i].c_str()));
135 ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(&byte_array1)));
136 std::string tmp = GetRandomString(7);
137 const ByteArray byte_array2(7, reinterpret_cast<const uint8_t*>(tmp.c_str()));
138
139 if (bloom_filter.FindHash(bloom_filter.Hash(&byte_array2))) {
140 exist++;
141 }
142 }
143
144 // The exist should be probably less than 1000 according default FPP 0.01.
145 EXPECT_LT(exist, total_count * fpp);
146}
147
148// The CompatibilityTest is used to test cross compatibility with parquet-mr, it reads
149// the Bloom filter binary generated by the Bloom filter class in the parquet-mr project
150// and tests whether the values inserted before could be filtered or not.
151
152// The Bloom filter binary is generated by three steps in from Parquet-mr.
153// Step 1: Construct a Bloom filter with 1024 bytes bitset.
154// Step 2: Insert "hello", "parquet", "bloom", "filter" to Bloom filter.
155// Step 3: Call writeTo API to write to File.
156TEST(CompatibilityTest, TestBloomFilter) {
157 const std::string test_string[4] = {"hello", "parquet", "bloom", "filter"};
158 const std::string bloom_filter_test_binary =
159 std::string(test::get_data_dir()) + "/bloom_filter.bin";
160 std::shared_ptr<::arrow::io::ReadableFile> handle;
161
162 int64_t size;
163 PARQUET_THROW_NOT_OK(
164 ::arrow::io::ReadableFile::Open(bloom_filter_test_binary, &handle));
165 PARQUET_THROW_NOT_OK(handle->GetSize(&size));
166
167 // 1024 bytes (bitset) + 4 bytes (hash) + 4 bytes (algorithm) + 4 bytes (length)
168 EXPECT_EQ(size, 1036);
169
170 std::unique_ptr<uint8_t[]> bitset(new uint8_t[size]());
171 std::shared_ptr<Buffer> buffer(new Buffer(bitset.get(), size));
172 PARQUET_THROW_NOT_OK(handle->Read(size, &buffer));
173
174 InMemoryInputStream source(buffer);
175 BlockSplitBloomFilter bloom_filter1 = BlockSplitBloomFilter::Deserialize(&source);
176
177 for (int i = 0; i < 4; i++) {
178 const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()),
179 reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
180 EXPECT_TRUE(bloom_filter1.FindHash(bloom_filter1.Hash(&tmp)));
181 }
182
183 // The following is used to check whether the new created Bloom filter in parquet-cpp is
184 // byte-for-byte identical to file at bloom_data_path which is created from parquet-mr
185 // with same inserted hashes.
186 BlockSplitBloomFilter bloom_filter2;
187 bloom_filter2.Init(bloom_filter1.GetBitsetSize());
188 for (int i = 0; i < 4; i++) {
189 const ByteArray byte_array(static_cast<uint32_t>(test_string[i].length()),
190 reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
191 bloom_filter2.InsertHash(bloom_filter2.Hash(&byte_array));
192 }
193
194 // Serialize Bloom filter to memory output stream
195 InMemoryOutputStream sink;
196 bloom_filter2.WriteTo(&sink);
197 std::shared_ptr<Buffer> buffer1 = sink.GetBuffer();
198
199 PARQUET_THROW_NOT_OK(handle->Seek(0));
200 PARQUET_THROW_NOT_OK(handle->GetSize(&size));
201 std::shared_ptr<Buffer> buffer2;
202 PARQUET_THROW_NOT_OK(handle->Read(size, &buffer2));
203
204 EXPECT_TRUE((*buffer1).Equals(*buffer2));
205}
206
207// OptmialValueTest is used to test whether OptimalNumOfBits returns expected
208// numbers according to formula:
209// num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0))
210// where ndv is the number of distinct values and fpp is the false positive probability.
211// Also it is used to test whether OptimalNumOfBits returns value between
212// [MINIMUM_BLOOM_FILTER_SIZE, MAXIMUM_BLOOM_FILTER_SIZE].
213TEST(OptimalValueTest, TestBloomFilter) {
214 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(256, 0.01), UINT32_C(4096));
215 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(512, 0.01), UINT32_C(8192));
216 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1024, 0.01), UINT32_C(16384));
217 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(2048, 0.01), UINT32_C(32768));
218
219 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.01), UINT32_C(2048));
220 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.01), UINT32_C(4096));
221 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.01), UINT32_C(8192));
222 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.01), UINT32_C(16384));
223
224 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.025), UINT32_C(2048));
225 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.025), UINT32_C(4096));
226 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.025), UINT32_C(8192));
227 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.025), UINT32_C(16384));
228
229 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.05), UINT32_C(2048));
230 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.05), UINT32_C(4096));
231 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.05), UINT32_C(8192));
232 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.05), UINT32_C(16384));
233
234 // Boundary check
235 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.01), UINT32_C(256));
236 EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.25), UINT32_C(256));
237
238 EXPECT_EQ(
239 BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.01),
240 UINT32_C(1073741824));
241 EXPECT_EQ(
242 BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.25),
243 UINT32_C(1073741824));
244}
245
246} // namespace test
247
248} // namespace parquet
249