1/* Copyright (c) 2018 BlackBerry Limited
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6http://www.apache.org/licenses/LICENSE-2.0
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License. */
12
13#pragma once
14
15#include <Poco/Condition.h>
16#include <DataTypes/DataTypesNumber.h>
17#include <DataTypes/DataTypeString.h>
18#include <Columns/ColumnString.h>
19#include <Columns/ColumnsNumber.h>
20#include <DataStreams/OneBlockInputStream.h>
21#include <DataStreams/IBlockInputStream.h>
22#include <Storages/LiveView/StorageLiveView.h>
23
24
25namespace DB
26{
27
/** Implements LIVE VIEW table WATCH EVENTS input stream.
 * Keeps stream alive by outputting blocks with no rows
 * based on period specified by the heartbeat interval.
 */
32class LiveViewEventsBlockInputStream : public IBlockInputStream
33{
34
35using NonBlockingResult = std::pair<Block, bool>;
36
37public:
38 ~LiveViewEventsBlockInputStream() override
39 {
40 /// Start storage no users thread
41 /// if we are the last active user
42 if (!storage->is_dropped && blocks_ptr.use_count() < 3)
43 storage->startNoUsersThread(temporary_live_view_timeout_sec);
44 }
45 /// length default -2 because we want LIMIT to specify number of updates so that LIMIT 1 waits for 1 update
46 /// and LIMIT 0 just returns data without waiting for any updates
47 LiveViewEventsBlockInputStream(std::shared_ptr<StorageLiveView> storage_,
48 std::shared_ptr<BlocksPtr> blocks_ptr_,
49 std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr_,
50 std::shared_ptr<bool> active_ptr_,
51 const bool has_limit_, const UInt64 limit_,
52 const UInt64 heartbeat_interval_sec_,
53 const UInt64 temporary_live_view_timeout_sec_)
54 : storage(std::move(storage_)), blocks_ptr(std::move(blocks_ptr_)),
55 blocks_metadata_ptr(std::move(blocks_metadata_ptr_)),
56 active_ptr(std::move(active_ptr_)), has_limit(has_limit_),
57 limit(limit_),
58 heartbeat_interval_usec(heartbeat_interval_sec_ * 1000000),
59 temporary_live_view_timeout_sec(temporary_live_view_timeout_sec_)
60 {
61 /// grab active pointer
62 active = active_ptr.lock();
63 }
64
65 String getName() const override { return "LiveViewEventsBlockInputStream"; }
66
67 void cancel(bool kill) override
68 {
69 if (isCancelled() || storage->is_dropped)
70 return;
71 IBlockInputStream::cancel(kill);
72 std::lock_guard lock(storage->mutex);
73 storage->condition.notify_all();
74 }
75
76 Block getHeader() const override
77 {
78 return {ColumnWithTypeAndName(ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "version")};
79 }
80
81 void refresh()
82 {
83 if (active && blocks && it == end)
84 it = blocks->begin();
85 }
86
87 void suspend()
88 {
89 active.reset();
90 }
91
92 void resume()
93 {
94 active = active_ptr.lock();
95 {
96 if (!blocks || blocks.get() != (*blocks_ptr).get())
97 {
98 blocks = (*blocks_ptr);
99 blocks_metadata = (*blocks_metadata_ptr);
100 }
101 }
102 it = blocks->begin();
103 begin = blocks->begin();
104 end = blocks->end();
105 }
106
107 NonBlockingResult tryRead()
108 {
109 return tryReadImpl(false);
110 }
111
112 Block getEventBlock()
113 {
114 Block res{
115 ColumnWithTypeAndName(
116 DataTypeUInt64().createColumnConst(1, blocks_metadata->version)->convertToFullColumnIfConst(),
117 std::make_shared<DataTypeUInt64>(),
118 "version")
119 };
120 return res;
121 }
122protected:
123 Block readImpl() override
124 {
125 /// try reading
126 return tryReadImpl(true).first;
127 }
128
129 /** tryRead method attempts to read a block in either blocking
130 * or non-blocking mode. If blocking is set to false
131 * then method return empty block with flag set to false
132 * to indicate that method would block to get the next block.
133 */
134 NonBlockingResult tryReadImpl(bool blocking)
135 {
136 if (has_limit && num_updates == static_cast<Int64>(limit))
137 {
138 return { Block(), true };
139 }
140 /// If blocks were never assigned get blocks
141 if (!blocks)
142 {
143 std::lock_guard lock(storage->mutex);
144 if (!active)
145 return { Block(), false };
146 blocks = (*blocks_ptr);
147 blocks_metadata = (*blocks_metadata_ptr);
148 it = blocks->begin();
149 begin = blocks->begin();
150 end = blocks->end();
151 }
152
153 if (isCancelled() || storage->is_dropped)
154 {
155 return { Block(), true };
156 }
157
158 if (it == end)
159 {
160 {
161 std::unique_lock lock(storage->mutex);
162 if (!active)
163 return { Block(), false };
164 /// If we are done iterating over our blocks
165 /// and there are new blocks availble then get them
166 if (blocks.get() != (*blocks_ptr).get())
167 {
168 blocks = (*blocks_ptr);
169 blocks_metadata = (*blocks_metadata_ptr);
170 it = blocks->begin();
171 begin = blocks->begin();
172 end = blocks->end();
173 }
174 /// No new blocks available wait for new ones
175 else
176 {
177 if (!blocking)
178 {
179 return { Block(), false };
180 }
181 if (!end_of_blocks)
182 {
183 end_of_blocks = true;
184 return { getHeader(), true };
185 }
186 while (true)
187 {
188 UInt64 timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
189
190 /// Or spurious wakeup.
191 bool signaled = std::cv_status::no_timeout == storage->condition.wait_for(lock,
192 std::chrono::microseconds(std::max(UInt64(0), heartbeat_interval_usec - (timestamp_usec - last_event_timestamp_usec))));
193
194 if (isCancelled() || storage->is_dropped)
195 {
196 return { Block(), true };
197 }
198 if (signaled)
199 {
200 break;
201 }
202 else
203 {
204 // repeat the event block as a heartbeat
205 last_event_timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
206 return { getHeader(), true };
207 }
208 }
209 }
210 }
211 return tryReadImpl(blocking);
212 }
213
214 // move right to the end
215 it = end;
216 end_of_blocks = false;
217 num_updates += 1;
218
219 last_event_timestamp_usec = static_cast<UInt64>(timestamp.epochMicroseconds());
220
221 return { getEventBlock(), true };
222 }
223
224private:
225 std::shared_ptr<StorageLiveView> storage;
226 std::shared_ptr<BlocksPtr> blocks_ptr;
227 std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
228 std::weak_ptr<bool> active_ptr;
229 std::shared_ptr<bool> active;
230 BlocksPtr blocks;
231 BlocksMetadataPtr blocks_metadata;
232 Blocks::iterator it;
233 Blocks::iterator end;
234 Blocks::iterator begin;
235 const bool has_limit;
236 const UInt64 limit;
237 Int64 num_updates = -1;
238 bool end_of_blocks = false;
239 UInt64 heartbeat_interval_usec;
240 UInt64 temporary_live_view_timeout_sec;
241 UInt64 last_event_timestamp_usec = 0;
242 Poco::Timestamp timestamp;
243};
244
245}
246