1#pragma once
2
3#include <DataStreams/IBlockInputStream.h>
4
5
6namespace DB
7{
8
/** Implements LIVE VIEW table WATCH input stream.
  * Keeps the stream alive by outputting blocks with no rows
  * at the period specified by the heartbeat interval.
  */
13class LiveViewBlockInputStream : public IBlockInputStream
14{
15
16using NonBlockingResult = std::pair<Block, bool>;
17
18public:
19 ~LiveViewBlockInputStream() override
20 {
21 /// Start storage no users thread
22 /// if we are the last active user
23 if (!storage->is_dropped && blocks_ptr.use_count() < 3)
24 storage->startNoUsersThread(temporary_live_view_timeout_sec);
25 }
26
27 LiveViewBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
28 std::shared_ptr<BlocksPtr> blocks_ptr_,
29 std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
30 std::shared_ptr<bool> active_ptr_,
31 const bool has_limit_, const UInt64 limit_,
32 const UInt64 heartbeat_interval_sec_,
33 const UInt64 temporary_live_view_timeout_sec_)
34 : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
35 blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
36 active_ptr(std::move(active_ptr_)),
37 has_limit(has_limit_), limit(limit_),
38 heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
39 temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
40 {
41 /// grab active pointer
42 active = active_ptr.lock();
43 }
44
45 String getName() const override { return "LiveViewBlockInputStream"; }
46
47 void cancel(bool kill) override
48 {
49 if (isCancelled() || storage->is_dropped)
50 return;
51 IBlockInputStream::cancel(kill);
52 std::lock_guard lock(storage->mutex);
53 storage->condition.notify_all();
54 }
55
56 Block getHeader() const override { return storage->getHeader(); }
57
58 void refresh()
59 {
60 if (active && blocks && it == end)
61 it = blocks->begin();
62 }
63
64 void suspend()
65 {
66 active.reset();
67 }
68
69 void resume()
70 {
71 active = active_ptr.lock();
72 {
73 if (!blocks || blocks.get() != (*blocks_ptr).get())
74 blocks = (*blocks_ptr);
75 }
76 it = blocks->begin();
77 begin = blocks->begin();
78 end = blocks->end();
79 }
80
81 NonBlockingResult tryRead()
82 {
83 return tryReadImpl(false);
84 }
85
86protected:
87 Block readImpl() override
88 {
89 /// try reading
90 return tryReadImpl(true).first;
91 }
92
93 /** tryRead method attempts to read a block in either blocking
94 * or non-blocking mode. If blocking is set to false
95 * then method return empty block with flag set to false
96 * to indicate that method would block to get the next block.
97 */
98 NonBlockingResult tryReadImpl(bool blocking)
99 {
100 Block res;
101
102 if (has_limit && num_updates == static_cast<Int64>(limit))
103 {
104 return { Block(), true };
105 }
106 /// If blocks were never assigned get blocks
107 if (!blocks)
108 {
109 std::lock_guard lock(storage->mutex);
110 if (!active)
111 return { Block(), false };
112 blocks = (*blocks_ptr);
113 it = blocks->begin();
114 begin = blocks->begin();
115 end = blocks->end();
116 }
117
118 if (isCancelled() || storage->is_dropped)
119 {
120 return { Block(), true };
121 }
122
123 if (it == end)
124 {
125 {
126 std::unique_lock lock(storage->mutex);
127 if (!active)
128 return { Block(), false };
129 /// If we are done iterating over our blocks
130 /// and there are new blocks availble then get them
131 if (blocks.get() != (*blocks_ptr).get())
132 {
133 blocks = (*blocks_ptr);
134 it = blocks->begin();
135 begin = blocks->begin();
136 end = blocks->end();
137 }
138 /// No new blocks available wait for new ones
139 else
140 {
141 if (!blocking)
142 {
143 return { Block(), false };
144 }
145 if (!end_of_blocks)
146 {
147 end_of_blocks = true;
148 return { getHeader(), true };
149 }
150 while (true)
151 {
152 UInt64 timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
153
154 /// Or spurious wakeup.
155 bool signaled = std::cv_status::no_timeout == storage->condition.wait_for(lock,
156 std::chrono::microseconds(std::max(UInt64(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec))));
157
158 if (isCancelled() || storage->is_dropped)
159 {
160 return { Block(), true };
161 }
162 if (signaled)
163 {
164 break;
165 }
166 else
167 {
168 // heartbeat
169 last_event_timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
170 return { getHeader(), true };
171 }
172 }
173 }
174 }
175 return tryReadImpl(blocking);
176 }
177
178 res = *it;
179
180 ++it;
181
182 if (it == end)
183 {
184 end_of_blocks = false;
185 num_updates += 1;
186 }
187
188 last_event_timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
189 return { res, true };
190 }
191
192private:
193 std::shared_ptr<StorageLiveView> storage;
194 std::shared_ptr<BlocksPtr> blocks_ptr;
195 std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
196 std::weak_ptr<bool> active_ptr;
197 std::shared_ptr<bool> active;
198 BlocksPtr blocks;
199 BlocksMetadataPtr blocks_metadata;
200 Blocks::iterator it;
201 Blocks::iterator end;
202 Blocks::iterator begin;
203 const bool has_limit;
204 const UInt64 limit;
205 Int64 num_updates = -1;
206 bool end_of_blocks = false;
207 UInt64 heartbeat_interval_usec;
208 UInt64 temporary_live_view_timeout_sec;
209 UInt64 last_event_timestamp_usec = 0;
210 Poco::Timestamp timestamp;
211};
212
213}
214