1#pragma once
2
3#include <DataStreams/IBlockOutputStream.h>
4#include <Storages/MergeTree/MergeTreeData.h>
5#include <Core/Types.h>
6
7
/// Forward declaration only: this header just stores a pointer to the logger,
/// so the full Poco logging header is not needed here.
namespace Poco
{
    class Logger;
}
9
/// Forward declaration of the ZooKeeper client together with the shared-pointer
/// alias used in the signatures declared further down in this header.
/// NOTE(review): std::shared_ptr is used here without a direct <memory> include in
/// the visible lines - presumably it arrives through the includes above; confirm.
namespace zkutil
{
    class ZooKeeper;

    using ZooKeeperPtr = std::shared_ptr<ZooKeeper>;
}
15
16namespace DB
17{
18
/// Forward declaration: the output stream below only holds a reference to the storage.
class StorageReplicatedMergeTree;
20
21
22class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
23{
24public:
25 ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_,
26 size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_,
27 bool deduplicate_);
28
29 Block getHeader() const override;
30 void writePrefix() override;
31 void write(const Block & block) override;
32
33 /// For ATTACHing existing data on filesystem.
34 void writeExistingPart(MergeTreeData::MutableDataPartPtr & part);
35
36 /// For proper deduplication in MaterializedViews
37 bool lastBlockIsDuplicate() const
38 {
39 return last_block_is_duplicate;
40 }
41
42private:
43 struct QuorumInfo
44 {
45 String status_path;
46 String is_active_node_value;
47 int is_active_node_version = -1;
48 int host_node_version = -1;
49 };
50
51 QuorumInfo quorum_info;
52 void checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper);
53
54 /// Rename temporary part and commit to ZooKeeper.
55 void commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id);
56
57 StorageReplicatedMergeTree & storage;
58 size_t quorum;
59 size_t quorum_timeout_ms;
60 size_t max_parts_per_block;
61
62 bool deduplicate = true;
63 bool last_block_is_duplicate = false;
64
65 using Logger = Poco::Logger;
66 Logger * log;
67};
68
69}
70