1#pragma once
2
3#include <DataTypes/DataTypesNumber.h>
4#include <DataStreams/IBlockOutputStream.h>
5#include <Storages/LiveView/StorageLiveView.h>
6
7
8namespace DB
9{
10
11class LiveViewBlockOutputStream : public IBlockOutputStream
12{
13public:
14 explicit LiveViewBlockOutputStream(StorageLiveView & storage_) : storage(storage_) {}
15
16 void writePrefix() override
17 {
18 new_blocks = std::make_shared<Blocks>();
19 new_blocks_metadata = std::make_shared<BlocksMetadata>();
20 new_hash = std::make_shared<SipHash>();
21 }
22
23 void writeSuffix() override
24 {
25 UInt128 key;
26 String key_str;
27
28 new_hash->get128(key.low, key.high);
29 key_str = key.toHexString();
30
31 std::lock_guard lock(storage.mutex);
32
33 if (storage.getBlocksHashKey() != key_str)
34 {
35 new_blocks_metadata->hash = key_str;
36 new_blocks_metadata->version = storage.getBlocksVersion() + 1;
37
38 for (auto & block : *new_blocks)
39 {
40 block.insert({DataTypeUInt64().createColumnConst(
41 block.rows(), new_blocks_metadata->version)->convertToFullColumnIfConst(),
42 std::make_shared<DataTypeUInt64>(),
43 "_version"});
44 }
45
46 (*storage.blocks_ptr) = new_blocks;
47 (*storage.blocks_metadata_ptr) = new_blocks_metadata;
48
49 storage.condition.notify_all();
50 }
51
52 new_blocks.reset();
53 new_blocks_metadata.reset();
54 new_hash.reset();
55 }
56
57 void write(const Block & block) override
58 {
59 new_blocks->push_back(block);
60 block.updateHash(*new_hash);
61 }
62
63 Block getHeader() const override { return storage.getHeader(); }
64
65private:
66 using SipHashPtr = std::shared_ptr<SipHash>;
67
68 BlocksPtr new_blocks;
69 BlocksMetadataPtr new_blocks_metadata;
70 SipHashPtr new_hash;
71 StorageLiveView & storage;
72};
73
74}
75