1/* Copyright (c) 2018 BlackBerry Limited
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6http://www.apache.org/licenses/LICENSE-2.0
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License. */
12#pragma once
13
14#include <ext/shared_ptr_helper.h>
15#include <Storages/IStorage.h>
16
17#include <mutex>
18#include <condition_variable>
19
20
21namespace DB
22{
23
24struct BlocksMetadata
25{
26 String hash;
27 UInt64 version;
28};
29
30class IAST;
31using ASTPtr = std::shared_ptr<IAST>;
32using BlocksMetadataPtr = std::shared_ptr<BlocksMetadata>;
33
34class StorageLiveView : public ext::shared_ptr_helper<StorageLiveView>, public IStorage
35{
36friend struct ext::shared_ptr_helper<StorageLiveView>;
37friend class LiveViewBlockInputStream;
38friend class LiveViewEventsBlockInputStream;
39friend class LiveViewBlockOutputStream;
40
41public:
42 ~StorageLiveView() override;
43 String getName() const override { return "LiveView"; }
44 String getTableName() const override { return table_name; }
45 String getDatabaseName() const override { return database_name; }
46 String getSelectDatabaseName() const { return select_database_name; }
47 String getSelectTableName() const { return select_table_name; }
48
49 NameAndTypePair getColumn(const String & column_name) const override;
50 bool hasColumn(const String & column_name) const override;
51
52 // const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
53 ASTPtr getInnerQuery() const { return inner_query->clone(); }
54
55 /// It is passed inside the query and solved at its level.
56 bool supportsSampling() const override { return true; }
57 bool supportsFinal() const override { return true; }
58
59 bool isTemporary() { return is_temporary; }
60
61 /// Check if we have any readers
62 /// must be called with mutex locked
63 bool hasUsers()
64 {
65 return blocks_ptr.use_count() > 1;
66 }
67
68 /// Check we have any active readers
69 /// must be called with mutex locked
70 bool hasActiveUsers()
71 {
72 return active_ptr.use_count() > 1;
73 }
74 /// No users thread mutex, predicate and wake up condition
75 void startNoUsersThread(const UInt64 & timeout);
76 std::mutex no_users_thread_wakeup_mutex;
77 bool no_users_thread_wakeup = false;
78 std::condition_variable no_users_thread_condition;
79 /// Get blocks hash
80 /// must be called with mutex locked
81 String getBlocksHashKey()
82 {
83 if (*blocks_metadata_ptr)
84 return (*blocks_metadata_ptr)->hash;
85 return {};
86 }
87 /// Get blocks version
88 /// must be called with mutex locked
89 UInt64 getBlocksVersion()
90 {
91 if (*blocks_metadata_ptr)
92 return (*blocks_metadata_ptr)->version;
93 return 0;
94 }
95
96 /// Reset blocks
97 /// must be called with mutex locked
98 void reset()
99 {
100 (*blocks_ptr).reset();
101 if (*blocks_metadata_ptr)
102 (*blocks_metadata_ptr)->hash.clear();
103 mergeable_blocks.reset();
104 }
105
106 void checkTableCanBeDropped() const override;
107 void drop(TableStructureWriteLockHolder &) override;
108 void startup() override;
109 void shutdown() override;
110
111 void refresh(const Context & context);
112
113 BlockInputStreams read(
114 const Names & column_names,
115 const SelectQueryInfo & query_info,
116 const Context & context,
117 QueryProcessingStage::Enum processed_stage,
118 size_t max_block_size,
119 unsigned num_streams) override;
120
121 BlockInputStreams watch(
122 const Names & column_names,
123 const SelectQueryInfo & query_info,
124 const Context & context,
125 QueryProcessingStage::Enum & processed_stage,
126 size_t max_block_size,
127 unsigned num_streams) override;
128
129 std::shared_ptr<BlocksPtr> getBlocksPtr() { return blocks_ptr; }
130 BlocksPtrs getMergeableBlocks() { return mergeable_blocks; }
131 void setMergeableBlocks(BlocksPtrs blocks) { mergeable_blocks = blocks; }
132 std::shared_ptr<bool> getActivePtr() { return active_ptr; }
133
134 /// Read new data blocks that store query result
135 bool getNewBlocks();
136
137 Block getHeader() const;
138
139 static void writeIntoLiveView(
140 StorageLiveView & live_view,
141 const Block & block,
142 const Context & context);
143
144private:
145 String select_database_name;
146 String select_table_name;
147 String table_name;
148 String database_name;
149 ASTPtr inner_query;
150 Context & global_context;
151 bool is_temporary = false;
152 /// Mutex to protect access to sample block
153 mutable std::mutex sample_block_lock;
154 mutable Block sample_block;
155
156 /// Mutex for the blocks and ready condition
157 std::mutex mutex;
158 /// New blocks ready condition to broadcast to readers
159 /// that new blocks are available
160 std::condition_variable condition;
161
162 /// Active users
163 std::shared_ptr<bool> active_ptr;
164 /// Current data blocks that store query result
165 std::shared_ptr<BlocksPtr> blocks_ptr;
166 /// Current data blocks metadata
167 std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
168 BlocksPtrs mergeable_blocks;
169
170 /// Background thread for temporary tables
171 /// which drops this table if there are no users
172 static void noUsersThread(std::shared_ptr<StorageLiveView> storage, const UInt64 & timeout);
173 std::mutex no_users_thread_mutex;
174 std::thread no_users_thread;
175 std::atomic<bool> shutdown_called = false;
176 std::atomic<bool> start_no_users_thread_called = false;
177 UInt64 temporary_live_view_timeout;
178
179 StorageLiveView(
180 const String & table_name_,
181 const String & database_name_,
182 Context & local_context,
183 const ASTCreateQuery & query,
184 const ColumnsDescription & columns
185 );
186};
187
188}
189