1#pragma once
2#include <DataStreams/IBlockInputStream.h>
3#include <Storages/MergeTree/MergeTreeData.h>
4#include <Storages/MergeTree/MergeTreeDataPart.h>
5#include <Core/Block.h>
6
7#include <common/DateLUT.h>
8
9namespace DB
10{
11
12class TTLBlockInputStream : public IBlockInputStream
13{
14public:
15 TTLBlockInputStream(
16 const BlockInputStreamPtr & input_,
17 const MergeTreeData & storage_,
18 const MergeTreeData::MutableDataPartPtr & data_part_,
19 time_t current_time,
20 bool force_
21 );
22
23 String getName() const override { return "TTL"; }
24
25 Block getHeader() const override { return header; }
26
27protected:
28 Block readImpl() override;
29
30 /// Finalizes ttl infos and updates data part
31 void readSuffixImpl() override;
32
33private:
34 const MergeTreeData & storage;
35
36 /// ttl_infos and empty_columns are updating while reading
37 const MergeTreeData::MutableDataPartPtr & data_part;
38
39 time_t current_time;
40 bool force;
41
42 MergeTreeDataPart::TTLInfos old_ttl_infos;
43 MergeTreeDataPart::TTLInfos new_ttl_infos;
44 NameSet empty_columns;
45
46 size_t rows_removed = 0;
47 Logger * log;
48 DateLUTImpl date_lut;
49
50 std::unordered_map<String, String> defaults_result_column;
51 ExpressionActionsPtr defaults_expression;
52
53 Block header;
54private:
55 /// Removes values with expired ttl and computes new_ttl_infos and empty_columns for part
56 void removeValuesWithExpiredColumnTTL(Block & block);
57
58 /// Removes rows with expired table ttl and computes new ttl_infos for part
59 void removeRowsWithExpiredTableTTL(Block & block);
60
61 /// Updates TTL for moves
62 void updateMovesTTL(Block & block);
63
64 UInt32 getTimestampByIndex(const IColumn * column, size_t ind);
65 bool isTTLExpired(time_t ttl);
66};
67
68}
69