1#pragma once
2
3#include <Core/Block.h>
4#include <Core/NamesAndTypes.h>
5#include <Core/Settings.h>
6#include <Core/Types.h>
7#include <DataStreams/IBlockStream_fwd.h>
8#include <Interpreters/ClientInfo.h>
9#include <Interpreters/Users.h>
10#include <Parsers/IAST_fwd.h>
11#include <Common/LRUCache.h>
12#include <Common/MultiVersion.h>
13#include <Common/ThreadPool.h>
14#include "config_core.h"
15#include <Storages/IStorage_fwd.h>
16#include <Disks/DiskSpaceMonitor.h>
17#include <atomic>
18#include <chrono>
19#include <condition_variable>
20#include <functional>
21#include <memory>
22#include <mutex>
23#include <optional>
24#include <thread>
25#include <Common/RemoteHostFilter.h>
26
27
28namespace Poco
29{
30 namespace Net
31 {
32 class IPAddress;
33 }
34}
35
36namespace zkutil
37{
38 class ZooKeeper;
39}
40
41
42namespace DB
43{
44
45struct ContextShared;
46class Context;
47class QuotaContext;
48class RowPolicyContext;
49class EmbeddedDictionaries;
50class ExternalDictionariesLoader;
51class ExternalModelsLoader;
52class InterserverIOHandler;
53class BackgroundProcessingPool;
54class BackgroundSchedulePool;
55class MergeList;
56class Cluster;
57class Compiler;
58class MarkCache;
59class UncompressedCache;
60class ProcessList;
61class QueryStatus;
62class Macros;
63struct Progress;
64class Clusters;
65class QueryLog;
66class QueryThreadLog;
67class PartLog;
68class TextLog;
69class TraceLog;
70class MetricLog;
71struct MergeTreeSettings;
72class IDatabase;
73class DDLGuard;
74class DDLWorker;
75class ITableFunction;
76class Block;
77class ActionLocksManager;
78using ActionLocksManagerPtr = std::shared_ptr<ActionLocksManager>;
79class ShellCommand;
80class ICompressionCodec;
81class AccessControlManager;
82class SettingsConstraints;
83class RemoteHostFilter;
84
85class IOutputFormat;
86using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
87
88#if USE_EMBEDDED_COMPILER
89
90class CompiledExpressionCache;
91
92#endif
93
94/// (database name, table name)
95using DatabaseAndTableName = std::pair<String, String>;
96
97/// Table -> set of table-views that make SELECT from it.
98using ViewDependencies = std::map<DatabaseAndTableName, std::set<DatabaseAndTableName>>;
99using Dependencies = std::vector<DatabaseAndTableName>;
100
101using TableAndCreateAST = std::pair<StoragePtr, ASTPtr>;
102using TableAndCreateASTs = std::map<String, TableAndCreateAST>;
103
104/// Callback for external tables initializer
105using ExternalTablesInitializer = std::function<void(Context &)>;
106
107/// Callback for initialize input()
108using InputInitializer = std::function<void(Context &, const StoragePtr &)>;
109/// Callback for reading blocks of data from client for function input()
110using InputBlocksReader = std::function<Block(Context &)>;
111
112/// Scalar results of sub queries
113using Scalars = std::map<String, Block>;
114
115/// An empty interface for an arbitrary object that may be attached by a shared pointer
116/// to query context, when using ClickHouse as a library.
117struct IHostContext
118{
119 virtual ~IHostContext() = default;
120};
121
122using IHostContextPtr = std::shared_ptr<IHostContext>;
123
124/** A set of known objects that can be used in the query.
125 * Consists of a shared part (always common to all sessions and queries)
126 * and copied part (which can be its own for each session or query).
127 *
128 * Everything is encapsulated for all sorts of checks and locks.
129 */
130class Context
131{
132private:
133 using Shared = std::shared_ptr<ContextShared>;
134 Shared shared;
135
136 ClientInfo client_info;
137 ExternalTablesInitializer external_tables_initializer_callback;
138
139 InputInitializer input_initializer_callback;
140 InputBlocksReader input_blocks_reader;
141
142 std::shared_ptr<QuotaContext> quota; /// Current quota. By default - empty quota, that have no limits.
143 bool is_quota_management_allowed = false; /// Whether the current user is allowed to manage quotas via SQL commands.
144 std::shared_ptr<RowPolicyContext> row_policy;
145 bool is_row_policy_management_allowed = false; /// Whether the current user is allowed to manage row policies via SQL commands.
146 String current_database;
147 Settings settings; /// Setting for query execution.
148 std::shared_ptr<const SettingsConstraints> settings_constraints;
149 using ProgressCallback = std::function<void(const Progress & progress)>;
150 ProgressCallback progress_callback; /// Callback for tracking progress of query execution.
151 QueryStatus * process_list_elem = nullptr; /// For tracking total resource usage for query.
152 std::pair<String, String> insertion_table; /// Saved insertion table in query context
153
154 String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
155 /// Thus, used in HTTP interface. If not specified - then some globally default format is used.
156 // TODO maybe replace with DatabaseMemory?
157 TableAndCreateASTs external_tables; /// Temporary tables.
158 Scalars scalars;
159 StoragePtr view_source; /// Temporary StorageValues used to generate alias columns for materialized views
160 Tables table_function_results; /// Temporary tables obtained by execution of table functions. Keyed by AST tree id.
161 Context * query_context = nullptr;
162 Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
163 Context * global_context = nullptr; /// Global context. Could be equal to this.
164
165 UInt64 session_close_cycle = 0;
166 bool session_is_used = false;
167
168 using SampleBlockCache = std::unordered_map<std::string, Block>;
169 mutable SampleBlockCache sample_block_cache;
170
171 using DatabasePtr = std::shared_ptr<IDatabase>;
172 using Databases = std::map<String, std::shared_ptr<IDatabase>>;
173
174 NameToNameMap query_parameters; /// Dictionary with query parameters for prepared statements.
175 /// (key=name, value)
176
177 IHostContextPtr host_context; /// Arbitrary object that may used to attach some host specific information to query context,
178 /// when using ClickHouse as a library in some project. For example, it may contain host
179 /// logger, some query identification information, profiling guards, etc. This field is
180 /// to be customized in HTTP and TCP servers by overloading the customizeContext(DB::Context&)
181 /// methods.
182
183 /// Use copy constructor or createGlobal() instead
184 Context();
185
186public:
187 /// Create initial Context with ContextShared and etc.
188 static Context createGlobal();
189
190 Context(const Context &);
191 Context & operator=(const Context &);
192 ~Context();
193
194 String getPath() const;
195 String getTemporaryPath() const;
196 String getFlagsPath() const;
197 String getUserFilesPath() const;
198 String getDictionariesLibPath() const;
199
200 void setPath(const String & path);
201 void setTemporaryPath(const String & path);
202 void setFlagsPath(const String & path);
203 void setUserFilesPath(const String & path);
204 void setDictionariesLibPath(const String & path);
205
206 using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
207
208 /// Global application configuration settings.
209 void setConfig(const ConfigurationPtr & config);
210 const Poco::Util::AbstractConfiguration & getConfigRef() const;
211
212 AccessControlManager & getAccessControlManager();
213 const AccessControlManager & getAccessControlManager() const;
214 std::shared_ptr<QuotaContext> getQuota() const { return quota; }
215 void checkQuotaManagementIsAllowed();
216 std::shared_ptr<RowPolicyContext> getRowPolicy() const { return row_policy; }
217 void checkRowPolicyManagementIsAllowed();
218
219 /** Take the list of users, quotas and configuration profiles from this config.
220 * The list of users is completely replaced.
221 * The accumulated quota values are not reset if the quota is not deleted.
222 */
223 void setUsersConfig(const ConfigurationPtr & config);
224 ConfigurationPtr getUsersConfig();
225
226 /// Must be called before getClientInfo.
227 void setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address, const String & quota_key);
228
229 /// Used by MySQL Secure Password Authentication plugin.
230 std::shared_ptr<const User> getUser(const String & user_name);
231
232 /// Compute and set actual user settings, client_info.current_user should be set
233 void calculateUserSettings();
234
235 /// We have to copy external tables inside executeQuery() to track limits. Therefore, set callback for it. Must set once.
236 void setExternalTablesInitializer(ExternalTablesInitializer && initializer);
237 /// This method is called in executeQuery() and will call the external tables initializer.
238 void initializeExternalTablesIfSet();
239
240 /// When input() is present we have to send columns structure to client
241 void setInputInitializer(InputInitializer && initializer);
242 /// This method is called in StorageInput::read while executing query
243 void initializeInput(const StoragePtr & input_storage);
244
245 /// Callback for read data blocks from client one by one for function input()
246 void setInputBlocksReaderCallback(InputBlocksReader && reader);
247 /// Get callback for reading data for input()
248 InputBlocksReader getInputBlocksReaderCallback() const;
249 void resetInputCallbacks();
250
251 ClientInfo & getClientInfo() { return client_info; }
252 const ClientInfo & getClientInfo() const { return client_info; }
253
254 void addDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where);
255 void removeDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where);
256 Dependencies getDependencies(const String & database_name, const String & table_name) const;
257
258 /// Functions where we can lock the context manually
259 void addDependencyUnsafe(const DatabaseAndTableName & from, const DatabaseAndTableName & where);
260 void removeDependencyUnsafe(const DatabaseAndTableName & from, const DatabaseAndTableName & where);
261
262 /// Checking the existence of the table/database. Database can be empty - in this case the current database is used.
263 bool isTableExist(const String & database_name, const String & table_name) const;
264 bool isDatabaseExist(const String & database_name) const;
265 bool isDictionaryExists(const String & database_name, const String & dictionary_name) const;
266 bool isExternalTableExist(const String & table_name) const;
267 bool hasDatabaseAccessRights(const String & database_name) const;
268
269 bool hasDictionaryAccessRights(const String & dictionary_name) const;
270
271 /** The parameter check_database_access_rights exists to not check the permissions of the database again,
272 * when assertTableDoesntExist or assertDatabaseExists is called inside another function that already
273 * made this check.
274 */
275 void assertTableDoesntExist(const String & database_name, const String & table_name, bool check_database_acccess_rights = true) const;
276 void assertDatabaseExists(const String & database_name, bool check_database_acccess_rights = true) const;
277
278 void assertDatabaseDoesntExist(const String & database_name) const;
279 void checkDatabaseAccessRights(const std::string & database_name) const;
280
281 const Scalars & getScalars() const;
282 const Block & getScalar(const String & name) const;
283 Tables getExternalTables() const;
284 StoragePtr tryGetExternalTable(const String & table_name) const;
285 StoragePtr getTable(const String & database_name, const String & table_name) const;
286 StoragePtr tryGetTable(const String & database_name, const String & table_name) const;
287 void addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast = {});
288 void addScalar(const String & name, const Block & block);
289 bool hasScalar(const String & name) const;
290 StoragePtr tryRemoveExternalTable(const String & table_name);
291
292 StoragePtr executeTableFunction(const ASTPtr & table_expression);
293
294 void addViewSource(const StoragePtr & storage);
295 StoragePtr getViewSource();
296
297 void addDatabase(const String & database_name, const DatabasePtr & database);
298 DatabasePtr detachDatabase(const String & database_name);
299
300 /// Get an object that protects the table from concurrently executing multiple DDL operations.
301 std::unique_ptr<DDLGuard> getDDLGuard(const String & database, const String & table) const;
302
303 String getCurrentDatabase() const;
304 String getCurrentQueryId() const;
305
306 /// Id of initiating query for distributed queries; or current query id if it's not a distributed query.
307 String getInitialQueryId() const;
308
309 void setCurrentDatabase(const String & name);
310 void setCurrentQueryId(const String & query_id);
311
312 void killCurrentQuery();
313
314 void setInsertionTable(std::pair<String, String> && db_and_table) { insertion_table = db_and_table; }
315 const std::pair<String, String> & getInsertionTable() const { return insertion_table; }
316
317 String getDefaultFormat() const; /// If default_format is not specified, some global default format is returned.
318 void setDefaultFormat(const String & name);
319
320 MultiVersion<Macros>::Version getMacros() const;
321 void setMacros(std::unique_ptr<Macros> && macros);
322
323 Settings getSettings() const;
324 void setSettings(const Settings & settings_);
325
326 /// Set settings by name.
327 void setSetting(const String & name, const String & value);
328 void setSetting(const String & name, const Field & value);
329 void applySettingChange(const SettingChange & change);
330 void applySettingsChanges(const SettingsChanges & changes);
331
332 /// Checks the constraints.
333 void checkSettingsConstraints(const SettingChange & change);
334 void checkSettingsConstraints(const SettingsChanges & changes);
335
336 /// Returns the current constraints (can return null).
337 std::shared_ptr<const SettingsConstraints> getSettingsConstraints() const { return settings_constraints; }
338
339 const EmbeddedDictionaries & getEmbeddedDictionaries() const;
340 const ExternalDictionariesLoader & getExternalDictionariesLoader() const;
341 const ExternalModelsLoader & getExternalModelsLoader() const;
342 EmbeddedDictionaries & getEmbeddedDictionaries();
343 ExternalDictionariesLoader & getExternalDictionariesLoader();
344 ExternalModelsLoader & getExternalModelsLoader();
345 void tryCreateEmbeddedDictionaries() const;
346
347 /// I/O formats.
348 BlockInputStreamPtr getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size) const;
349 BlockOutputStreamPtr getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const;
350
351 OutputFormatPtr getOutputFormatProcessor(const String & name, WriteBuffer & buf, const Block & sample) const;
352
353 InterserverIOHandler & getInterserverIOHandler();
354
355 /// How other servers can access this for downloading replicated data.
356 void setInterserverIOAddress(const String & host, UInt16 port);
357 std::pair<String, UInt16> getInterserverIOAddress() const;
358
359 /// Credentials which server will use to communicate with others
360 void setInterserverCredentials(const String & user, const String & password);
361 std::pair<String, String> getInterserverCredentials() const;
362
363 /// Interserver requests scheme (http or https)
364 void setInterserverScheme(const String & scheme);
365 String getInterserverScheme() const;
366
367 /// Storage of allowed hosts from config.xml
368 void setRemoteHostFilter(const Poco::Util::AbstractConfiguration & config);
369 const RemoteHostFilter & getRemoteHostFilter() const;
370
371 /// The port that the server listens for executing SQL queries.
372 UInt16 getTCPPort() const;
373
374 std::optional<UInt16> getTCPPortSecure() const;
375
376 /// Get query for the CREATE table.
377 ASTPtr getCreateExternalTableQuery(const String & table_name) const;
378
379 const DatabasePtr getDatabase(const String & database_name) const;
380 DatabasePtr getDatabase(const String & database_name);
381 const DatabasePtr tryGetDatabase(const String & database_name) const;
382 DatabasePtr tryGetDatabase(const String & database_name);
383
384 const Databases getDatabases() const;
385 Databases getDatabases();
386
387 std::shared_ptr<Context> acquireSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check) const;
388 void releaseSession(const String & session_id, std::chrono::steady_clock::duration timeout);
389
390 /// Close sessions, that has been expired. Returns how long to wait for next session to be expired, if no new sessions will be added.
391 std::chrono::steady_clock::duration closeSessions() const;
392
393 /// For methods below you may need to acquire a lock by yourself.
394 std::unique_lock<std::recursive_mutex> getLock() const;
395
396 const Context & getQueryContext() const;
397 Context & getQueryContext();
398 bool hasQueryContext() const { return query_context != nullptr; }
399
400 const Context & getSessionContext() const;
401 Context & getSessionContext();
402 bool hasSessionContext() const { return session_context != nullptr; }
403
404 const Context & getGlobalContext() const;
405 Context & getGlobalContext();
406 bool hasGlobalContext() const { return global_context != nullptr; }
407
408 void setQueryContext(Context & context_) { query_context = &context_; }
409 void setSessionContext(Context & context_) { session_context = &context_; }
410
411 void makeQueryContext() { query_context = this; }
412 void makeSessionContext() { session_context = this; }
413 void makeGlobalContext() { global_context = this; }
414
415 const Settings & getSettingsRef() const { return settings; }
416 Settings & getSettingsRef() { return settings; }
417
418 void setProgressCallback(ProgressCallback callback);
419 /// Used in InterpreterSelectQuery to pass it to the IBlockInputStream.
420 ProgressCallback getProgressCallback() const;
421
422 /** Set in executeQuery and InterpreterSelectQuery. Then it is used in IBlockInputStream,
423 * to update and monitor information about the total number of resources spent for the query.
424 */
425 void setProcessListElement(QueryStatus * elem);
426 /// Can return nullptr if the query was not inserted into the ProcessList.
427 QueryStatus * getProcessListElement() const;
428
429 /// List all queries.
430 ProcessList & getProcessList();
431 const ProcessList & getProcessList() const;
432
433 MergeList & getMergeList();
434 const MergeList & getMergeList() const;
435
436 /// If the current session is expired at the time of the call, synchronously creates and returns a new session with the startNewSession() call.
437 /// If no ZooKeeper configured, throws an exception.
438 std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
439 /// Has ready or expired ZooKeeper
440 bool hasZooKeeper() const;
441 /// Reset current zookeeper session. Do not create a new one.
442 void resetZooKeeper() const;
443
444 /// Create a cache of uncompressed blocks of specified size. This can be done only once.
445 void setUncompressedCache(size_t max_size_in_bytes);
446 std::shared_ptr<UncompressedCache> getUncompressedCache() const;
447 void dropUncompressedCache() const;
448
449 /// Create a cache of marks of specified size. This can be done only once.
450 void setMarkCache(size_t cache_size_in_bytes);
451 std::shared_ptr<MarkCache> getMarkCache() const;
452 void dropMarkCache() const;
453
454 /** Clear the caches of the uncompressed blocks and marks.
455 * This is usually done when renaming tables, changing the type of columns, deleting a table.
456 * - since caches are linked to file names, and become incorrect.
457 * (when deleting a table - it is necessary, since in its place another can appear)
458 * const - because the change in the cache is not considered significant.
459 */
460 void dropCaches() const;
461
462 BackgroundProcessingPool & getBackgroundPool();
463 BackgroundProcessingPool & getBackgroundMovePool();
464 BackgroundSchedulePool & getSchedulePool();
465
466 void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
467 DDLWorker & getDDLWorker() const;
468
469 Clusters & getClusters() const;
470 std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
471 std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
472 void setClustersConfig(const ConfigurationPtr & config, const String & config_name = "remote_servers");
473 /// Sets custom cluster, but doesn't update configuration
474 void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
475 void reloadClusterConfig();
476
477 Compiler & getCompiler();
478
479 /// Call after initialization before using system logs. Call for global context.
480 void initializeSystemLogs();
481
482 void initializeTraceCollector();
483 bool hasTraceCollector();
484
485 /// Nullptr if the query log is not ready for this moment.
486 std::shared_ptr<QueryLog> getQueryLog();
487 std::shared_ptr<QueryThreadLog> getQueryThreadLog();
488 std::shared_ptr<TraceLog> getTraceLog();
489 std::shared_ptr<TextLog> getTextLog();
490 std::shared_ptr<MetricLog> getMetricLog();
491
492 /// Returns an object used to log opertaions with parts if it possible.
493 /// Provide table name to make required cheks.
494 std::shared_ptr<PartLog> getPartLog(const String & part_database);
495
496 const MergeTreeSettings & getMergeTreeSettings() const;
497
498 /// Prevents DROP TABLE if its size is greater than max_size (50GB by default, max_size=0 turn off this check)
499 void setMaxTableSizeToDrop(size_t max_size);
500 void checkTableCanBeDropped(const String & database, const String & table, const size_t & table_size) const;
501
502 /// Prevents DROP PARTITION if its size is greater than max_size (50GB by default, max_size=0 turn off this check)
503 void setMaxPartitionSizeToDrop(size_t max_size);
504 void checkPartitionCanBeDropped(const String & database, const String & table, const size_t & partition_size) const;
505
506 /// Lets you select the compression codec according to the conditions described in the configuration file.
507 std::shared_ptr<ICompressionCodec> chooseCompressionCodec(size_t part_size, double part_size_ratio) const;
508
509 DiskSelector & getDiskSelector() const;
510
511 /// Provides storage disks
512 const DiskPtr & getDisk(const String & name) const;
513 const DiskPtr & getDefaultDisk() const { return getDisk("default"); }
514
515 StoragePolicySelector & getStoragePolicySelector() const;
516
517 /// Provides storage politics schemes
518 const StoragePolicyPtr & getStoragePolicy(const String &name) const;
519
520 /// Get the server uptime in seconds.
521 time_t getUptimeSeconds() const;
522
523 using ConfigReloadCallback = std::function<void()>;
524 void setConfigReloadCallback(ConfigReloadCallback && callback);
525 void reloadConfig() const;
526
527 void shutdown();
528
529 ActionLocksManagerPtr getActionLocksManager();
530
531 enum class ApplicationType
532 {
533 SERVER, /// The program is run as clickhouse-server daemon (default behavior)
534 CLIENT, /// clickhouse-client
535 LOCAL /// clickhouse-local
536 };
537
538 ApplicationType getApplicationType() const;
539 void setApplicationType(ApplicationType type);
540
541 /// Sets default_profile and system_profile, must be called once during the initialization
542 void setDefaultProfiles(const Poco::Util::AbstractConfiguration & config);
543 String getDefaultProfileName() const;
544 String getSystemProfileName() const;
545
546 /// Base path for format schemas
547 String getFormatSchemaPath() const;
548 void setFormatSchemaPath(const String & path);
549
550 /// User name and session identifier. Named sessions are local to users.
551 using SessionKey = std::pair<String, String>;
552
553 SampleBlockCache & getSampleBlockCache() const;
554
555 /// Query parameters for prepared statements.
556 bool hasQueryParameters() const;
557 const NameToNameMap & getQueryParameters() const;
558 void setQueryParameter(const String & name, const String & value);
559 void setQueryParameters(const NameToNameMap & parameters) { query_parameters = parameters; }
560
561#if USE_EMBEDDED_COMPILER
562 std::shared_ptr<CompiledExpressionCache> getCompiledExpressionCache() const;
563 void setCompiledExpressionCache(size_t cache_size);
564 void dropCompiledExpressionCache() const;
565#endif
566
567 /// Add started bridge command. It will be killed after context destruction
568 void addXDBCBridgeCommand(std::unique_ptr<ShellCommand> cmd) const;
569
570 IHostContextPtr & getHostContext();
571 const IHostContextPtr & getHostContext() const;
572
573 struct MySQLWireContext
574 {
575 uint8_t sequence_id = 0;
576 uint32_t client_capabilities = 0;
577 size_t max_packet_size = 0;
578 };
579
580 MySQLWireContext mysql;
581private:
582 /** Check if the current client has access to the specified database.
583 * If access is denied, throw an exception.
584 * NOTE: This method should always be called when the `shared->mutex` mutex is acquired.
585 */
586 void checkDatabaseAccessRightsImpl(const std::string & database_name) const;
587
588 void setProfile(const String & profile);
589
590 EmbeddedDictionaries & getEmbeddedDictionariesImpl(bool throw_on_error) const;
591
592 StoragePtr getTableImpl(const String & database_name, const String & table_name, Exception * exception) const;
593
594 SessionKey getSessionKey(const String & session_id) const;
595
596 /// Session will be closed after specified timeout.
597 void scheduleCloseSession(const SessionKey & key, std::chrono::steady_clock::duration timeout);
598
599 void checkCanBeDropped(const String & database, const String & table, const size_t & size, const size_t & max_size_to_drop) const;
600};
601
602
603/// Allows executing DDL query only in one thread.
604/// Puts an element into the map, locks tables's mutex, counts how much threads run parallel query on the table,
605/// when counter is 0 erases element in the destructor.
606/// If the element already exists in the map, waits, when ddl query will be finished in other thread.
607class DDLGuard
608{
609public:
610 struct Entry
611 {
612 std::unique_ptr<std::mutex> mutex;
613 UInt32 counter;
614 };
615
616 /// Element name -> (mutex, counter).
617 /// NOTE: using std::map here (and not std::unordered_map) to avoid iterator invalidation on insertion.
618 using Map = std::map<String, Entry>;
619
620 DDLGuard(Map & map_, std::unique_lock<std::mutex> guards_lock_, const String & elem);
621 ~DDLGuard();
622
623private:
624 Map & map;
625 Map::iterator it;
626 std::unique_lock<std::mutex> guards_lock;
627 std::unique_lock<std::mutex> table_lock;
628};
629
630
631class SessionCleaner
632{
633public:
634 SessionCleaner(Context & context_)
635 : context{context_}
636 {
637 }
638 ~SessionCleaner();
639
640private:
641 void run();
642
643 Context & context;
644
645 std::mutex mutex;
646 std::condition_variable cond;
647 std::atomic<bool> quit{false};
648 ThreadFromGlobalPool thread{&SessionCleaner::run, this};
649};
650
651}
652