1 | /* Copyright (c) 2018 BlackBerry Limited |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | http://www.apache.org/licenses/LICENSE-2.0 |
7 | Unless required by applicable law or agreed to in writing, software |
8 | distributed under the License is distributed on an "AS IS" BASIS, |
9 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
10 | See the License for the specific language governing permissions and |
11 | limitations under the License. */ |
12 | #pragma once |
13 | |
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
19 | |
20 | |
21 | namespace DB |
22 | { |
23 | |
24 | struct BlocksMetadata |
25 | { |
26 | String hash; |
27 | UInt64 version; |
28 | }; |
29 | |
30 | class IAST; |
31 | using ASTPtr = std::shared_ptr<IAST>; |
32 | using BlocksMetadataPtr = std::shared_ptr<BlocksMetadata>; |
33 | |
34 | class StorageLiveView : public ext::shared_ptr_helper<StorageLiveView>, public IStorage |
35 | { |
36 | friend struct ext::shared_ptr_helper<StorageLiveView>; |
37 | friend class LiveViewBlockInputStream; |
38 | friend class LiveViewEventsBlockInputStream; |
39 | friend class LiveViewBlockOutputStream; |
40 | |
41 | public: |
42 | ~StorageLiveView() override; |
43 | String getName() const override { return "LiveView" ; } |
44 | String getTableName() const override { return table_name; } |
45 | String getDatabaseName() const override { return database_name; } |
46 | String getSelectDatabaseName() const { return select_database_name; } |
47 | String getSelectTableName() const { return select_table_name; } |
48 | |
49 | NameAndTypePair getColumn(const String & column_name) const override; |
50 | bool hasColumn(const String & column_name) const override; |
51 | |
52 | // const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } |
53 | ASTPtr getInnerQuery() const { return inner_query->clone(); } |
54 | |
55 | /// It is passed inside the query and solved at its level. |
56 | bool supportsSampling() const override { return true; } |
57 | bool supportsFinal() const override { return true; } |
58 | |
59 | bool isTemporary() { return is_temporary; } |
60 | |
61 | /// Check if we have any readers |
62 | /// must be called with mutex locked |
63 | bool hasUsers() |
64 | { |
65 | return blocks_ptr.use_count() > 1; |
66 | } |
67 | |
68 | /// Check we have any active readers |
69 | /// must be called with mutex locked |
70 | bool hasActiveUsers() |
71 | { |
72 | return active_ptr.use_count() > 1; |
73 | } |
74 | /// No users thread mutex, predicate and wake up condition |
75 | void startNoUsersThread(const UInt64 & timeout); |
76 | std::mutex no_users_thread_wakeup_mutex; |
77 | bool no_users_thread_wakeup = false; |
78 | std::condition_variable no_users_thread_condition; |
79 | /// Get blocks hash |
80 | /// must be called with mutex locked |
81 | String getBlocksHashKey() |
82 | { |
83 | if (*blocks_metadata_ptr) |
84 | return (*blocks_metadata_ptr)->hash; |
85 | return {}; |
86 | } |
87 | /// Get blocks version |
88 | /// must be called with mutex locked |
89 | UInt64 getBlocksVersion() |
90 | { |
91 | if (*blocks_metadata_ptr) |
92 | return (*blocks_metadata_ptr)->version; |
93 | return 0; |
94 | } |
95 | |
96 | /// Reset blocks |
97 | /// must be called with mutex locked |
98 | void reset() |
99 | { |
100 | (*blocks_ptr).reset(); |
101 | if (*blocks_metadata_ptr) |
102 | (*blocks_metadata_ptr)->hash.clear(); |
103 | mergeable_blocks.reset(); |
104 | } |
105 | |
106 | void checkTableCanBeDropped() const override; |
107 | void drop(TableStructureWriteLockHolder &) override; |
108 | void startup() override; |
109 | void shutdown() override; |
110 | |
111 | void refresh(const Context & context); |
112 | |
113 | BlockInputStreams read( |
114 | const Names & column_names, |
115 | const SelectQueryInfo & query_info, |
116 | const Context & context, |
117 | QueryProcessingStage::Enum processed_stage, |
118 | size_t max_block_size, |
119 | unsigned num_streams) override; |
120 | |
121 | BlockInputStreams watch( |
122 | const Names & column_names, |
123 | const SelectQueryInfo & query_info, |
124 | const Context & context, |
125 | QueryProcessingStage::Enum & processed_stage, |
126 | size_t max_block_size, |
127 | unsigned num_streams) override; |
128 | |
129 | std::shared_ptr<BlocksPtr> getBlocksPtr() { return blocks_ptr; } |
130 | BlocksPtrs getMergeableBlocks() { return mergeable_blocks; } |
131 | void setMergeableBlocks(BlocksPtrs blocks) { mergeable_blocks = blocks; } |
132 | std::shared_ptr<bool> getActivePtr() { return active_ptr; } |
133 | |
134 | /// Read new data blocks that store query result |
135 | bool getNewBlocks(); |
136 | |
137 | Block () const; |
138 | |
139 | static void writeIntoLiveView( |
140 | StorageLiveView & live_view, |
141 | const Block & block, |
142 | const Context & context); |
143 | |
144 | private: |
145 | String select_database_name; |
146 | String select_table_name; |
147 | String table_name; |
148 | String database_name; |
149 | ASTPtr inner_query; |
150 | Context & global_context; |
151 | bool is_temporary = false; |
152 | /// Mutex to protect access to sample block |
153 | mutable std::mutex sample_block_lock; |
154 | mutable Block sample_block; |
155 | |
156 | /// Mutex for the blocks and ready condition |
157 | std::mutex mutex; |
158 | /// New blocks ready condition to broadcast to readers |
159 | /// that new blocks are available |
160 | std::condition_variable condition; |
161 | |
162 | /// Active users |
163 | std::shared_ptr<bool> active_ptr; |
164 | /// Current data blocks that store query result |
165 | std::shared_ptr<BlocksPtr> blocks_ptr; |
166 | /// Current data blocks metadata |
167 | std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr; |
168 | BlocksPtrs mergeable_blocks; |
169 | |
170 | /// Background thread for temporary tables |
171 | /// which drops this table if there are no users |
172 | static void noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout); |
173 | std::mutex no_users_thread_mutex; |
174 | std::thread no_users_thread; |
175 | std::atomic<bool> shutdown_called = false; |
176 | std::atomic<bool> start_no_users_thread_called = false; |
177 | UInt64 temporary_live_view_timeout; |
178 | |
179 | StorageLiveView( |
180 | const String & table_name_, |
181 | const String & database_name_, |
182 | Context & local_context, |
183 | const ASTCreateQuery & query, |
184 | const ColumnsDescription & columns |
185 | ); |
186 | }; |
187 | |
188 | } |
189 | |