#include <DataStreams/TTLBlockInputStream.h>
#include <DataTypes/DataTypeDate.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
6
7namespace DB
8{
9
namespace ErrorCodes
{
    /// Used by getTimestampByIndex() when the TTL expression result column has an unsupported type.
    extern const int LOGICAL_ERROR;
}
14
15
16TTLBlockInputStream::TTLBlockInputStream(
17 const BlockInputStreamPtr & input_,
18 const MergeTreeData & storage_,
19 const MergeTreeData::MutableDataPartPtr & data_part_,
20 time_t current_time_,
21 bool force_)
22 : storage(storage_)
23 , data_part(data_part_)
24 , current_time(current_time_)
25 , force(force_)
26 , old_ttl_infos(data_part->ttl_infos)
27 , log(&Logger::get(storage.getLogName() + " (TTLBlockInputStream)"))
28 , date_lut(DateLUT::instance())
29{
30 children.push_back(input_);
31 header = children.at(0)->getHeader();
32
33 const auto & column_defaults = storage.getColumns().getDefaults();
34 ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
35 for (const auto & [name, ttl_info] : old_ttl_infos.columns_ttl)
36 {
37 if (force || isTTLExpired(ttl_info.min))
38 {
39 new_ttl_infos.columns_ttl.emplace(name, MergeTreeDataPart::TTLInfo{});
40 empty_columns.emplace(name);
41
42 auto it = column_defaults.find(name);
43
44 if (it != column_defaults.end())
45 {
46 auto expression = it->second.expression->clone();
47 default_expr_list->children.emplace_back(setAlias(expression, it->first));
48 }
49 }
50 else
51 new_ttl_infos.columns_ttl.emplace(name, ttl_info);
52 }
53
54 if (!force && !isTTLExpired(old_ttl_infos.table_ttl.min))
55 new_ttl_infos.table_ttl = old_ttl_infos.table_ttl;
56
57 if (!default_expr_list->children.empty())
58 {
59 auto syntax_result = SyntaxAnalyzer(storage.global_context).analyze(
60 default_expr_list, storage.getColumns().getAllPhysical());
61 defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true);
62 }
63}
64
65bool TTLBlockInputStream::isTTLExpired(time_t ttl)
66{
67 return (ttl && (ttl <= current_time));
68}
69
70Block TTLBlockInputStream::readImpl()
71{
72 /// Skip all data if table ttl is expired for part
73 if (storage.hasTableTTL() && isTTLExpired(old_ttl_infos.table_ttl.max))
74 {
75 rows_removed = data_part->rows_count;
76 return {};
77 }
78
79 Block block = children.at(0)->read();
80 if (!block)
81 return block;
82
83 if (storage.hasTableTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
84 removeRowsWithExpiredTableTTL(block);
85
86 removeValuesWithExpiredColumnTTL(block);
87
88 updateMovesTTL(block);
89
90 return block;
91}
92
93void TTLBlockInputStream::readSuffixImpl()
94{
95 for (const auto & elem : new_ttl_infos.columns_ttl)
96 new_ttl_infos.updatePartMinMaxTTL(elem.second.min, elem.second.max);
97
98 new_ttl_infos.updatePartMinMaxTTL(new_ttl_infos.table_ttl.min, new_ttl_infos.table_ttl.max);
99
100 data_part->ttl_infos = std::move(new_ttl_infos);
101 data_part->empty_columns = std::move(empty_columns);
102
103 if (rows_removed)
104 LOG_INFO(log, "Removed " << rows_removed << " rows with expired TTL from part " << data_part->name);
105}
106
107void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
108{
109 storage.ttl_table_entry.expression->execute(block);
110
111 const IColumn * ttl_column =
112 block.getByName(storage.ttl_table_entry.result_column).column.get();
113
114 const auto & column_names = header.getNames();
115 MutableColumns result_columns;
116 result_columns.reserve(column_names.size());
117
118 for (auto it = column_names.begin(); it != column_names.end(); ++it)
119 {
120 const IColumn * values_column = block.getByName(*it).column.get();
121 MutableColumnPtr result_column = values_column->cloneEmpty();
122 result_column->reserve(block.rows());
123
124 for (size_t i = 0; i < block.rows(); ++i)
125 {
126 UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
127 if (!isTTLExpired(cur_ttl))
128 {
129 new_ttl_infos.table_ttl.update(cur_ttl);
130 result_column->insertFrom(*values_column, i);
131 }
132 else if (it == column_names.begin())
133 ++rows_removed;
134 }
135 result_columns.emplace_back(std::move(result_column));
136 }
137
138 block = header.cloneWithColumns(std::move(result_columns));
139}
140
141void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
142{
143 Block block_with_defaults;
144 if (defaults_expression)
145 {
146 block_with_defaults = block;
147 defaults_expression->execute(block_with_defaults);
148 }
149
150 std::vector<String> columns_to_remove;
151 for (const auto & [name, ttl_entry] : storage.column_ttl_entries_by_name)
152 {
153 const auto & old_ttl_info = old_ttl_infos.columns_ttl[name];
154 auto & new_ttl_info = new_ttl_infos.columns_ttl[name];
155
156 /// Nothing to do
157 if (!force && !isTTLExpired(old_ttl_info.min))
158 continue;
159
160 /// Later drop full column
161 if (isTTLExpired(old_ttl_info.max))
162 continue;
163
164 if (!block.has(ttl_entry.result_column))
165 {
166 columns_to_remove.push_back(ttl_entry.result_column);
167 ttl_entry.expression->execute(block);
168 }
169
170 ColumnPtr default_column = nullptr;
171 if (block_with_defaults.has(name))
172 default_column = block_with_defaults.getByName(name).column->convertToFullColumnIfConst();
173
174 auto & column_with_type = block.getByName(name);
175 const IColumn * values_column = column_with_type.column.get();
176 MutableColumnPtr result_column = values_column->cloneEmpty();
177 result_column->reserve(block.rows());
178
179 const IColumn * ttl_column = block.getByName(ttl_entry.result_column).column.get();
180
181 for (size_t i = 0; i < block.rows(); ++i)
182 {
183 UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
184 if (isTTLExpired(cur_ttl))
185 {
186 if (default_column)
187 result_column->insertFrom(*default_column, i);
188 else
189 result_column->insertDefault();
190 }
191 else
192 {
193 new_ttl_info.update(cur_ttl);
194 empty_columns.erase(name);
195 result_column->insertFrom(*values_column, i);
196 }
197 }
198 column_with_type.column = std::move(result_column);
199 }
200
201 for (const String & column : columns_to_remove)
202 block.erase(column);
203}
204
205void TTLBlockInputStream::updateMovesTTL(Block & block)
206{
207 std::vector<String> columns_to_remove;
208 for (const auto & ttl_entry : storage.move_ttl_entries)
209 {
210 auto & new_ttl_info = new_ttl_infos.moves_ttl[ttl_entry.result_column];
211
212 if (!block.has(ttl_entry.result_column))
213 {
214 columns_to_remove.push_back(ttl_entry.result_column);
215 ttl_entry.expression->execute(block);
216 }
217
218 const IColumn * ttl_column = block.getByName(ttl_entry.result_column).column.get();
219
220 for (size_t i = 0; i < block.rows(); ++i)
221 {
222 UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
223 new_ttl_info.update(cur_ttl);
224 }
225 }
226
227 for (const String & column : columns_to_remove)
228 block.erase(column);
229}
230
231UInt32 TTLBlockInputStream::getTimestampByIndex(const IColumn * column, size_t ind)
232{
233 if (const ColumnUInt16 * column_date = typeid_cast<const ColumnUInt16 *>(column))
234 return date_lut.fromDayNum(DayNum(column_date->getData()[ind]));
235 else if (const ColumnUInt32 * column_date_time = typeid_cast<const ColumnUInt32 *>(column))
236 return column_date_time->getData()[ind];
237 else if (const ColumnConst * column_const = typeid_cast<const ColumnConst *>(column))
238 {
239 if (typeid_cast<const ColumnUInt16 *>(&column_const->getDataColumn()))
240 return date_lut.fromDayNum(DayNum(column_const->getValue<UInt16>()));
241 else if (typeid_cast<const ColumnUInt32 *>(&column_const->getDataColumn()))
242 return column_const->getValue<UInt32>();
243 }
244
245 throw Exception("Unexpected type of result TTL column", ErrorCodes::LOGICAL_ERROR);
246}
247
248}
249