1#include <Storages/MergeTree/MergeTreeIndexGranuleBloomFilter.h>
2#include <Columns/ColumnArray.h>
3#include <Columns/ColumnString.h>
4#include <Columns/ColumnNullable.h>
5#include <Columns/ColumnFixedString.h>
6#include <DataTypes/DataTypeNullable.h>
7#include <Common/HashTable/Hash.h>
8#include <ext/bit_cast.h>
9#include <Interpreters/BloomFilterHash.h>
10
11
12namespace DB
13{
14
15MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(size_t bits_per_row_, size_t hash_functions_, size_t index_columns_)
16 : bits_per_row(bits_per_row_), hash_functions(hash_functions_)
17{
18 total_rows = 0;
19 bloom_filters.resize(index_columns_);
20}
21
22MergeTreeIndexGranuleBloomFilter::MergeTreeIndexGranuleBloomFilter(
23 size_t bits_per_row_, size_t hash_functions_, size_t total_rows_, const Blocks & granule_index_blocks_)
24 : total_rows(total_rows_), bits_per_row(bits_per_row_), hash_functions(hash_functions_)
25{
26 if (granule_index_blocks_.empty() || !total_rows)
27 throw Exception("LOGICAL ERROR: granule_index_blocks empty or total_rows is zero.", ErrorCodes::LOGICAL_ERROR);
28
29 assertGranuleBlocksStructure(granule_index_blocks_);
30
31 for (size_t index = 0; index < granule_index_blocks_.size(); ++index)
32 {
33 Block granule_index_block = granule_index_blocks_[index];
34
35 if (unlikely(!granule_index_block || !granule_index_block.rows()))
36 throw Exception("LOGICAL ERROR: granule_index_block is empty.", ErrorCodes::LOGICAL_ERROR);
37
38 if (index == 0)
39 {
40 static size_t atom_size = 8;
41
42 for (size_t column = 0, columns = granule_index_block.columns(); column < columns; ++column)
43 {
44 size_t total_items = total_rows;
45
46 if (const auto * array_col = typeid_cast<const ColumnArray *>(granule_index_block.getByPosition(column).column.get()))
47 {
48 const IColumn * nested_col = array_col->getDataPtr().get();
49 total_items = nested_col->size();
50 }
51
52 size_t bytes_size = (bits_per_row * total_items + atom_size - 1) / atom_size;
53 bloom_filters.emplace_back(std::make_shared<BloomFilter>(bytes_size, hash_functions, 0));
54 }
55 }
56
57 for (size_t column = 0, columns = granule_index_block.columns(); column < columns; ++column)
58 fillingBloomFilter(bloom_filters[column], granule_index_block, column);
59 }
60}
61
62bool MergeTreeIndexGranuleBloomFilter::empty() const
63{
64 return !total_rows;
65}
66
67void MergeTreeIndexGranuleBloomFilter::deserializeBinary(ReadBuffer & istr)
68{
69 if (!empty())
70 throw Exception("Cannot read data to a non-empty bloom filter index.", ErrorCodes::LOGICAL_ERROR);
71
72 readVarUInt(total_rows, istr);
73 for (size_t index = 0; index < bloom_filters.size(); ++index)
74 {
75 static size_t atom_size = 8;
76 size_t bytes_size = (bits_per_row * total_rows + atom_size - 1) / atom_size;
77 bloom_filters[index] = std::make_shared<BloomFilter>(bytes_size, hash_functions, 0);
78 istr.read(reinterpret_cast<char *>(bloom_filters[index]->getFilter().data()), bytes_size);
79 }
80}
81
82void MergeTreeIndexGranuleBloomFilter::serializeBinary(WriteBuffer & ostr) const
83{
84 if (empty())
85 throw Exception("Attempt to write empty bloom filter index.", ErrorCodes::LOGICAL_ERROR);
86
87 static size_t atom_size = 8;
88 writeVarUInt(total_rows, ostr);
89 size_t bytes_size = (bits_per_row * total_rows + atom_size - 1) / atom_size;
90 for (const auto & bloom_filter : bloom_filters)
91 ostr.write(reinterpret_cast<const char *>(bloom_filter->getFilter().data()), bytes_size);
92}
93
94void MergeTreeIndexGranuleBloomFilter::assertGranuleBlocksStructure(const Blocks & granule_index_blocks) const
95{
96 Block prev_block;
97 for (size_t index = 0; index < granule_index_blocks.size(); ++index)
98 {
99 Block granule_index_block = granule_index_blocks[index];
100
101 if (index != 0)
102 assertBlocksHaveEqualStructure(prev_block, granule_index_block, "Granule blocks of bloom filter has difference structure.");
103
104 prev_block = granule_index_block;
105 }
106}
107
108void MergeTreeIndexGranuleBloomFilter::fillingBloomFilter(BloomFilterPtr & bf, const Block & granule_index_block, size_t index_hash_column)
109{
110 const auto & column = granule_index_block.getByPosition(index_hash_column);
111
112 if (const auto hash_column = typeid_cast<const ColumnUInt64 *>(column.column.get()))
113 {
114 const auto & hash_column_vec = hash_column->getData();
115
116 for (size_t index = 0, size = hash_column_vec.size(); index < size; ++index)
117 {
118 const UInt64 & bf_base_hash = hash_column_vec[index];
119
120 for (size_t i = 0; i < hash_functions; ++i)
121 bf->addHashWithSeed(bf_base_hash, BloomFilterHash::bf_hash_seed[i]);
122 }
123 }
124}
125
126}
127