1#include <Interpreters/RowRefs.h>
2
3#include <Core/Block.h>
4#include <Core/Types.h>
5#include <Common/typeid_cast.h>
6#include <Common/ColumnsHashing.h>
7#include <Columns/IColumn.h>
8#include <Columns/ColumnVector.h>
9#include <Columns/ColumnDecimal.h>
10
11
12namespace DB
13{
14
15namespace
16{
17
18/// maps enum values to types
19template <typename F>
20void callWithType(AsofRowRefs::Type which, F && f)
21{
22 switch (which)
23 {
24 case AsofRowRefs::Type::keyu32: return f(UInt32());
25 case AsofRowRefs::Type::keyu64: return f(UInt64());
26 case AsofRowRefs::Type::keyi32: return f(Int32());
27 case AsofRowRefs::Type::keyi64: return f(Int64());
28 case AsofRowRefs::Type::keyf32: return f(Float32());
29 case AsofRowRefs::Type::keyf64: return f(Float64());
30 case AsofRowRefs::Type::keyDecimal32: return f(Decimal32());
31 case AsofRowRefs::Type::keyDecimal64: return f(Decimal64());
32 case AsofRowRefs::Type::keyDecimal128: return f(Decimal128());
33 }
34
35 __builtin_unreachable();
36}
37
38}
39
40
41AsofRowRefs::AsofRowRefs(Type type)
42{
43 auto call = [&](const auto & t)
44 {
45 using T = std::decay_t<decltype(t)>;
46 using LookupType = typename Entry<T>::LookupType;
47 lookups = std::make_unique<LookupType>();
48 };
49
50 callWithType(type, call);
51}
52
53void AsofRowRefs::insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num)
54{
55 auto call = [&](const auto & t)
56 {
57 using T = std::decay_t<decltype(t)>;
58 using LookupPtr = typename Entry<T>::LookupPtr;
59
60 using ColumnType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
61 auto * column = typeid_cast<const ColumnType *>(asof_column);
62
63 T key = column->getElement(row_num);
64 auto entry = Entry<T>(key, RowRef(block, row_num));
65 std::get<LookupPtr>(lookups)->insert(entry);
66 };
67
68 callWithType(type, call);
69}
70
71const RowRef * AsofRowRefs::findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const
72{
73 const RowRef * out = nullptr;
74
75 bool ascending = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::LessOrEquals);
76 bool is_strict = (inequality == ASOF::Inequality::Less) || (inequality == ASOF::Inequality::Greater);
77
78 auto call = [&](const auto & t)
79 {
80 using T = std::decay_t<decltype(t)>;
81 using EntryType = Entry<T>;
82 using LookupPtr = typename EntryType::LookupPtr;
83
84 using ColumnType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
85 auto * column = typeid_cast<const ColumnType *>(asof_column);
86 T key = column->getElement(row_num);
87 auto & typed_lookup = std::get<LookupPtr>(lookups);
88
89 if (is_strict)
90 out = typed_lookup->upperBound(EntryType(key), ascending);
91 else
92 out = typed_lookup->lowerBound(EntryType(key), ascending);
93 };
94
95 callWithType(type, call);
96 return out;
97}
98
99std::optional<AsofRowRefs::Type> AsofRowRefs::getTypeSize(const IColumn * asof_column, size_t & size)
100{
101 if (typeid_cast<const ColumnVector<UInt32> *>(asof_column))
102 {
103 size = sizeof(UInt32);
104 return Type::keyu32;
105 }
106 else if (typeid_cast<const ColumnVector<UInt64> *>(asof_column))
107 {
108 size = sizeof(UInt64);
109 return Type::keyu64;
110 }
111 else if (typeid_cast<const ColumnVector<Int32> *>(asof_column))
112 {
113 size = sizeof(Int32);
114 return Type::keyi32;
115 }
116 else if (typeid_cast<const ColumnVector<Int64> *>(asof_column))
117 {
118 size = sizeof(Int64);
119 return Type::keyi64;
120 }
121 else if (typeid_cast<const ColumnVector<Float32> *>(asof_column))
122 {
123 size = sizeof(Float32);
124 return Type::keyf32;
125 }
126 else if (typeid_cast<const ColumnVector<Float64> *>(asof_column))
127 {
128 size = sizeof(Float64);
129 return Type::keyf64;
130 }
131 else if (typeid_cast<const ColumnDecimal<Decimal32> *>(asof_column))
132 {
133 size = sizeof(Decimal32);
134 return Type::keyDecimal32;
135 }
136 else if (typeid_cast<const ColumnDecimal<Decimal64> *>(asof_column))
137 {
138 size = sizeof(Decimal64);
139 return Type::keyDecimal64;
140 }
141 else if (typeid_cast<const ColumnDecimal<Decimal128> *>(asof_column))
142 {
143 size = sizeof(Decimal128);
144 return Type::keyDecimal128;
145 }
146
147 size = 0;
148 return {};
149}
150
151}
152