1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include <gtest/gtest.h> |
19 | |
20 | #include <cstdint> |
21 | #include <limits> |
22 | #include <memory> |
23 | #include <random> |
24 | #include <string> |
25 | #include <vector> |
26 | |
27 | #include "arrow/buffer.h" |
28 | #include "arrow/io/file.h" |
29 | #include "arrow/status.h" |
30 | |
31 | #include "parquet/bloom_filter.h" |
32 | #include "parquet/exception.h" |
33 | #include "parquet/murmur3.h" |
34 | #include "parquet/types.h" |
35 | #include "parquet/util/memory.h" |
36 | #include "parquet/util/test-common.h" |
37 | |
38 | namespace parquet { |
39 | namespace test { |
40 | TEST(Murmur3Test, TestBloomFilter) { |
41 | uint64_t result; |
42 | const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7}; |
43 | ByteArray byteArray(8, bitset); |
44 | MurmurHash3 murmur3; |
45 | result = murmur3.Hash(&byteArray); |
46 | EXPECT_EQ(result, UINT64_C(913737700387071329)); |
47 | } |
48 | |
49 | TEST(ConstructorTest, TestBloomFilter) { |
50 | BlockSplitBloomFilter bloom_filter; |
51 | EXPECT_NO_THROW(bloom_filter.Init(1000)); |
52 | |
53 | // It throws because the length cannot be zero |
54 | std::unique_ptr<uint8_t[]> bitset1(new uint8_t[1024]()); |
55 | EXPECT_THROW(bloom_filter.Init(bitset1.get(), 0), ParquetException); |
56 | |
57 | // It throws because the number of bytes of Bloom filter bitset must be a power of 2. |
58 | std::unique_ptr<uint8_t[]> bitset2(new uint8_t[1024]()); |
59 | EXPECT_THROW(bloom_filter.Init(bitset2.get(), 1023), ParquetException); |
60 | } |
61 | |
62 | // The BasicTest is used to test basic operations including InsertHash, FindHash and |
63 | // serializing and de-serializing. |
64 | TEST(BasicTest, TestBloomFilter) { |
65 | BlockSplitBloomFilter bloom_filter; |
66 | bloom_filter.Init(1024); |
67 | |
68 | for (int i = 0; i < 10; i++) { |
69 | bloom_filter.InsertHash(bloom_filter.Hash(i)); |
70 | } |
71 | |
72 | for (int i = 0; i < 10; i++) { |
73 | EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(i))); |
74 | } |
75 | |
76 | // Serialize Bloom filter to memory output stream |
77 | InMemoryOutputStream sink; |
78 | bloom_filter.WriteTo(&sink); |
79 | |
80 | // Deserialize Bloom filter from memory |
81 | InMemoryInputStream source(sink.GetBuffer()); |
82 | |
83 | BlockSplitBloomFilter de_bloom = BlockSplitBloomFilter::Deserialize(&source); |
84 | |
85 | for (int i = 0; i < 10; i++) { |
86 | EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(i))); |
87 | } |
88 | } |
89 | |
90 | // Helper function to generate random string. |
91 | std::string GetRandomString(uint32_t length) { |
92 | // Character set used to generate random string |
93 | const std::string charset = |
94 | "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" ; |
95 | |
96 | std::default_random_engine gen(42); |
97 | std::uniform_int_distribution<uint32_t> dist(0, static_cast<int>(charset.size() - 1)); |
98 | std::string ret(length, 'x'); |
99 | |
100 | for (uint32_t i = 0; i < length; i++) { |
101 | ret[i] = charset[dist(gen)]; |
102 | } |
103 | return ret; |
104 | } |
105 | |
106 | TEST(FPPTest, TestBloomFilter) { |
107 | // It counts the number of times FindHash returns true. |
108 | int exist = 0; |
109 | |
110 | // Total count of elements that will be used |
111 | #ifdef PARQUET_VALGRIND |
112 | const int total_count = 5000; |
113 | #else |
114 | const int total_count = 100000; |
115 | #endif |
116 | |
117 | // Bloom filter fpp parameter |
118 | const double fpp = 0.01; |
119 | |
120 | std::vector<std::string> members; |
121 | BlockSplitBloomFilter bloom_filter; |
122 | bloom_filter.Init(BlockSplitBloomFilter::OptimalNumOfBits(total_count, fpp)); |
123 | |
124 | // Insert elements into the Bloom filter |
125 | for (int i = 0; i < total_count; i++) { |
126 | // Insert random string which length is 8 |
127 | std::string tmp = GetRandomString(8); |
128 | const ByteArray byte_array(8, reinterpret_cast<const uint8_t*>(tmp.c_str())); |
129 | members.push_back(tmp); |
130 | bloom_filter.InsertHash(bloom_filter.Hash(&byte_array)); |
131 | } |
132 | |
133 | for (int i = 0; i < total_count; i++) { |
134 | const ByteArray byte_array1(8, reinterpret_cast<const uint8_t*>(members[i].c_str())); |
135 | ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(&byte_array1))); |
136 | std::string tmp = GetRandomString(7); |
137 | const ByteArray byte_array2(7, reinterpret_cast<const uint8_t*>(tmp.c_str())); |
138 | |
139 | if (bloom_filter.FindHash(bloom_filter.Hash(&byte_array2))) { |
140 | exist++; |
141 | } |
142 | } |
143 | |
144 | // The exist should be probably less than 1000 according default FPP 0.01. |
145 | EXPECT_LT(exist, total_count * fpp); |
146 | } |
147 | |
148 | // The CompatibilityTest is used to test cross compatibility with parquet-mr, it reads |
149 | // the Bloom filter binary generated by the Bloom filter class in the parquet-mr project |
150 | // and tests whether the values inserted before could be filtered or not. |
151 | |
152 | // The Bloom filter binary is generated by three steps in from Parquet-mr. |
153 | // Step 1: Construct a Bloom filter with 1024 bytes bitset. |
154 | // Step 2: Insert "hello", "parquet", "bloom", "filter" to Bloom filter. |
155 | // Step 3: Call writeTo API to write to File. |
156 | TEST(CompatibilityTest, TestBloomFilter) { |
157 | const std::string test_string[4] = {"hello" , "parquet" , "bloom" , "filter" }; |
158 | const std::string bloom_filter_test_binary = |
159 | std::string(test::get_data_dir()) + "/bloom_filter.bin" ; |
160 | std::shared_ptr<::arrow::io::ReadableFile> handle; |
161 | |
162 | int64_t size; |
163 | PARQUET_THROW_NOT_OK( |
164 | ::arrow::io::ReadableFile::Open(bloom_filter_test_binary, &handle)); |
165 | PARQUET_THROW_NOT_OK(handle->GetSize(&size)); |
166 | |
167 | // 1024 bytes (bitset) + 4 bytes (hash) + 4 bytes (algorithm) + 4 bytes (length) |
168 | EXPECT_EQ(size, 1036); |
169 | |
170 | std::unique_ptr<uint8_t[]> bitset(new uint8_t[size]()); |
171 | std::shared_ptr<Buffer> buffer(new Buffer(bitset.get(), size)); |
172 | PARQUET_THROW_NOT_OK(handle->Read(size, &buffer)); |
173 | |
174 | InMemoryInputStream source(buffer); |
175 | BlockSplitBloomFilter bloom_filter1 = BlockSplitBloomFilter::Deserialize(&source); |
176 | |
177 | for (int i = 0; i < 4; i++) { |
178 | const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()), |
179 | reinterpret_cast<const uint8_t*>(test_string[i].c_str())); |
180 | EXPECT_TRUE(bloom_filter1.FindHash(bloom_filter1.Hash(&tmp))); |
181 | } |
182 | |
183 | // The following is used to check whether the new created Bloom filter in parquet-cpp is |
184 | // byte-for-byte identical to file at bloom_data_path which is created from parquet-mr |
185 | // with same inserted hashes. |
186 | BlockSplitBloomFilter bloom_filter2; |
187 | bloom_filter2.Init(bloom_filter1.GetBitsetSize()); |
188 | for (int i = 0; i < 4; i++) { |
189 | const ByteArray byte_array(static_cast<uint32_t>(test_string[i].length()), |
190 | reinterpret_cast<const uint8_t*>(test_string[i].c_str())); |
191 | bloom_filter2.InsertHash(bloom_filter2.Hash(&byte_array)); |
192 | } |
193 | |
194 | // Serialize Bloom filter to memory output stream |
195 | InMemoryOutputStream sink; |
196 | bloom_filter2.WriteTo(&sink); |
197 | std::shared_ptr<Buffer> buffer1 = sink.GetBuffer(); |
198 | |
199 | PARQUET_THROW_NOT_OK(handle->Seek(0)); |
200 | PARQUET_THROW_NOT_OK(handle->GetSize(&size)); |
201 | std::shared_ptr<Buffer> buffer2; |
202 | PARQUET_THROW_NOT_OK(handle->Read(size, &buffer2)); |
203 | |
204 | EXPECT_TRUE((*buffer1).Equals(*buffer2)); |
205 | } |
206 | |
207 | // OptmialValueTest is used to test whether OptimalNumOfBits returns expected |
208 | // numbers according to formula: |
209 | // num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0)) |
210 | // where ndv is the number of distinct values and fpp is the false positive probability. |
211 | // Also it is used to test whether OptimalNumOfBits returns value between |
212 | // [MINIMUM_BLOOM_FILTER_SIZE, MAXIMUM_BLOOM_FILTER_SIZE]. |
213 | TEST(OptimalValueTest, TestBloomFilter) { |
214 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(256, 0.01), UINT32_C(4096)); |
215 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(512, 0.01), UINT32_C(8192)); |
216 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1024, 0.01), UINT32_C(16384)); |
217 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(2048, 0.01), UINT32_C(32768)); |
218 | |
219 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.01), UINT32_C(2048)); |
220 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.01), UINT32_C(4096)); |
221 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.01), UINT32_C(8192)); |
222 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.01), UINT32_C(16384)); |
223 | |
224 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.025), UINT32_C(2048)); |
225 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.025), UINT32_C(4096)); |
226 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.025), UINT32_C(8192)); |
227 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.025), UINT32_C(16384)); |
228 | |
229 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.05), UINT32_C(2048)); |
230 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.05), UINT32_C(4096)); |
231 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.05), UINT32_C(8192)); |
232 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.05), UINT32_C(16384)); |
233 | |
234 | // Boundary check |
235 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.01), UINT32_C(256)); |
236 | EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.25), UINT32_C(256)); |
237 | |
238 | EXPECT_EQ( |
239 | BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.01), |
240 | UINT32_C(1073741824)); |
241 | EXPECT_EQ( |
242 | BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.25), |
243 | UINT32_C(1073741824)); |
244 | } |
245 | |
246 | } // namespace test |
247 | |
248 | } // namespace parquet |
249 | |