1#include <map>
2#include <set>
3#include <optional>
4#include <memory>
5#include <Poco/Mutex.h>
6#include <Poco/UUID.h>
7#include <Poco/Net/IPAddress.h>
8#include <Poco/Util/Application.h>
9#include <Common/Macros.h>
10#include <Common/escapeForFileName.h>
11#include <Common/setThreadName.h>
12#include <Common/Stopwatch.h>
13#include <Common/formatReadable.h>
14#include <Common/thread_local_rng.h>
15#include <Compression/ICompressionCodec.h>
16#include <Core/BackgroundSchedulePool.h>
17#include <Formats/FormatFactory.h>
18#include <Databases/IDatabase.h>
19#include <Storages/IStorage.h>
20#include <Storages/MarkCache.h>
21#include <Storages/MergeTree/BackgroundProcessingPool.h>
22#include <Storages/MergeTree/MergeList.h>
23#include <Storages/MergeTree/MergeTreeSettings.h>
24#include <Storages/CompressionCodecSelector.h>
25#include <TableFunctions/TableFunctionFactory.h>
26#include <Interpreters/ActionLocksManager.h>
27#include <Core/Settings.h>
28#include <Access/AccessControlManager.h>
29#include <Access/SettingsConstraints.h>
30#include <Access/QuotaContext.h>
31#include <Access/RowPolicyContext.h>
32#include <Interpreters/ExpressionJIT.h>
33#include <Interpreters/UsersManager.h>
34#include <Dictionaries/Embedded/GeoDictionariesLoader.h>
35#include <Interpreters/EmbeddedDictionaries.h>
36#include <Interpreters/ExternalDictionariesLoader.h>
37#include <Interpreters/ExternalModelsLoader.h>
38#include <Interpreters/ExpressionActions.h>
39#include <Interpreters/ProcessList.h>
40#include <Interpreters/Cluster.h>
41#include <Interpreters/InterserverIOHandler.h>
42#include <Interpreters/SystemLog.h>
43#include <Interpreters/Context.h>
44#include <Interpreters/DDLWorker.h>
45#include <Common/DNSResolver.h>
46#include <IO/ReadBufferFromFile.h>
47#include <IO/UncompressedCache.h>
48#include <Parsers/ASTCreateQuery.h>
49#include <Parsers/ParserCreateQuery.h>
50#include <Parsers/parseQuery.h>
51#include <Common/StackTrace.h>
52#include <Common/Config/ConfigProcessor.h>
53#include <Common/ZooKeeper/ZooKeeper.h>
54#include <Common/ShellCommand.h>
55#include <Common/TraceCollector.h>
56#include <common/logger_useful.h>
57#include <Common/RemoteHostFilter.h>
58
/// Profile event identifiers referenced by this file. They are only declared here (extern);
/// the definitions are expected to live in another translation unit.
59namespace ProfileEvents
60{
/// Incremented in Context::getLock() each time the context lock is requested.
61 extern const Event ContextLock;
62 extern const Event CompiledCacheSizeBytes;
63}
64
/// Current-metric identifiers referenced by this file. They are only declared here (extern);
/// the definitions are expected to live in another translation unit.
65namespace CurrentMetrics
66{
/// Held (via CurrentMetrics::Increment) while Context::getLock() acquires the context mutex.
67 extern const Metric ContextLockWait;
68 extern const Metric MemoryTrackingForMerges;
69 extern const Metric BackgroundMovePoolTask;
70 extern const Metric MemoryTrackingInBackgroundMoveProcessingPool;
71}
72
73
74namespace DB
75{
76
/// Error codes passed to Exception by the functions of this file.
/// They are only declared here (extern); the numeric values are defined elsewhere.
77namespace ErrorCodes
78{
79 extern const int DATABASE_ACCESS_DENIED;
80 extern const int UNKNOWN_DATABASE;
81 extern const int UNKNOWN_TABLE;
82 extern const int TABLE_ALREADY_EXISTS;
83 extern const int TABLE_WAS_NOT_DROPPED;
84 extern const int DATABASE_ALREADY_EXISTS;
85 extern const int THERE_IS_NO_SESSION;
86 extern const int THERE_IS_NO_QUERY;
87 extern const int NO_ELEMENTS_IN_CONFIG;
88 extern const int DDL_GUARD_IS_ACTIVE;
89 extern const int TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT;
90 extern const int PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT;
91 extern const int SESSION_NOT_FOUND;
92 extern const int SESSION_IS_LOCKED;
93 extern const int CANNOT_GET_CREATE_TABLE_QUERY;
94 extern const int LOGICAL_ERROR;
95 extern const int SCALAR_ALREADY_EXISTS;
96 extern const int UNKNOWN_SCALAR;
97 extern const int NOT_ENOUGH_PRIVILEGES;
98}
99
100
101/** Set of known objects (environment), that could be used in query.
102 * Shared (global) part. Order of members (especially, order of destruction) is very important.
103 */
104struct ContextShared
105{
106 Logger * log = &Logger::get("Context");
107
108 /// For access of most of shared objects. Recursive mutex.
109 mutable std::recursive_mutex mutex;
110 /// Separate mutex for access of dictionaries. Separate mutex to avoid locks when server doing request to itself.
111 mutable std::mutex embedded_dictionaries_mutex;
112 mutable std::mutex external_dictionaries_mutex;
113 mutable std::mutex external_models_mutex;
114 /// Separate mutex for re-initialization of zookeer session. This operation could take a long time and must not interfere with another operations.
115 mutable std::mutex zookeeper_mutex;
116
117 mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper.
118
119 String interserver_io_host; /// The host name by which this server is available for other servers.
120 UInt16 interserver_io_port = 0; /// and port.
121 String interserver_io_user;
122 String interserver_io_password;
123 String interserver_scheme; /// http or https
124
125 String path; /// Path to the data directory, with a slash at the end.
126 String tmp_path; /// The path to the temporary files that occur when processing the request.
127 String flags_path; /// Path to the directory with some control flags for server maintenance.
128 String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function.
129 String dictionaries_lib_path; /// Path to the directory with user provided binaries and libraries for external dictionaries.
130 ConfigurationPtr config; /// Global configuration settings.
131
132 Databases databases; /// List of databases and tables in them.
133 mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
134 mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
135 mutable std::optional<ExternalModelsLoader> external_models_loader;
136 String default_profile_name; /// Default profile name used for default values.
137 String system_profile_name; /// Profile used by system processes
138 AccessControlManager access_control_manager;
139 std::unique_ptr<UsersManager> users_manager; /// Known users.
140 mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
141 mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
142 ProcessList process_list; /// Executing queries at the moment.
143 MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
144 ViewDependencies view_dependencies; /// Current dependencies
145 ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
146 InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
147 std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
148 std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
149 std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
150 MultiVersion<Macros> macros; /// Substitutions extracted from config.
151 std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
152 /// Rules for selecting the compression settings, depending on the size of the part.
153 mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
154 /// Storage disk chooser
155 mutable std::unique_ptr<DiskSelector> merge_tree_disk_selector;
156 /// Storage policy chooser
157 mutable std::unique_ptr<StoragePolicySelector> merge_tree_storage_policy_selector;
158
159 std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
160 std::atomic_size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
161 std::atomic_size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default)
162 String format_schema_path; /// Path to a directory that contains schema files used by input formats.
163 ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
164 std::optional<SystemLogs> system_logs; /// Used to log queries and operations on parts
165
166 RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
167
168 std::unique_ptr<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
169 /// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.
170
171 class SessionKeyHash
172 {
173 public:
174 size_t operator()(const Context::SessionKey & key) const
175 {
176 SipHash hash;
177 hash.update(key.first);
178 hash.update(key.second);
179 return hash.get64();
180 }
181 };
182
183 using Sessions = std::unordered_map<Context::SessionKey, std::shared_ptr<Context>, SessionKeyHash>;
184 using CloseTimes = std::deque<std::vector<Context::SessionKey>>;
185 mutable Sessions sessions;
186 mutable CloseTimes close_times;
187 std::chrono::steady_clock::duration close_interval = std::chrono::seconds(1);
188 std::chrono::steady_clock::time_point close_cycle_time = std::chrono::steady_clock::now();
189 UInt64 close_cycle = 0;
190
191 /// Clusters for distributed tables
192 /// Initialized on demand (on distributed storages initialization) since Settings should be initialized
193 std::unique_ptr<Clusters> clusters;
194 ConfigurationPtr clusters_config; /// Soteres updated configs
195 mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config
196
197#if USE_EMBEDDED_COMPILER
198 std::shared_ptr<CompiledExpressionCache> compiled_expression_cache;
199#endif
200
201 bool shutdown_called = false;
202
203 /// Do not allow simultaneous execution of DDL requests on the same table.
204 /// database -> object -> (mutex, counter), counter: how many threads are running a query on the table at the same time
205 /// For the duration of the operation, an element is placed here, and an object is returned,
206 /// which deletes the element in the destructor when counter becomes zero.
207 /// In case the element already exists, waits, when query will be executed in other thread. See class DDLGuard below.
208 using DDLGuards = std::unordered_map<String, DDLGuard::Map>;
209 DDLGuards ddl_guards;
210 /// If you capture mutex and ddl_guards_mutex, then you need to grab them strictly in this order.
211 mutable std::mutex ddl_guards_mutex;
212
213 Stopwatch uptime_watch;
214
215 Context::ApplicationType application_type = Context::ApplicationType::SERVER;
216
217 /// vector of xdbc-bridge commands, they will be killed when Context will be destroyed
218 std::vector<std::unique_ptr<ShellCommand>> bridge_commands;
219
220 Context::ConfigReloadCallback config_reload_callback;
221
222 ContextShared()
223 : macros(std::make_unique<Macros>())
224 {
225 /// TODO: make it singleton (?)
226 static std::atomic<size_t> num_calls{0};
227 if (++num_calls > 1)
228 {
229 std::cerr << "Attempting to create multiple ContextShared instances. Stack trace:\n" << StackTrace().toString();
230 std::cerr.flush();
231 std::terminate();
232 }
233
234 initialize();
235 }
236
237
238 ~ContextShared()
239 {
240 try
241 {
242 shutdown();
243 }
244 catch (...)
245 {
246 tryLogCurrentException(__PRETTY_FUNCTION__);
247 }
248 }
249
250
251 /** Perform a complex job of destroying objects in advance.
252 */
253 void shutdown()
254 {
255 if (shutdown_called)
256 return;
257 shutdown_called = true;
258
259 /** After system_logs have been shut down it is guaranteed that no system table gets created or written to.
260 * Note that part changes at shutdown won't be logged to part log.
261 */
262
263 if (system_logs)
264 system_logs->shutdown();
265
266 /** At this point, some tables may have threads that block our mutex.
267 * To shutdown them correctly, we will copy the current list of tables,
268 * and ask them all to finish their work.
269 * Then delete all objects with tables.
270 */
271
272 Databases current_databases;
273
274 {
275 std::lock_guard lock(mutex);
276 current_databases = databases;
277 }
278
279 /// We still hold "databases" in Context (instead of std::move) for Buffer tables to flush data correctly.
280
281 for (auto & database : current_databases)
282 database.second->shutdown();
283
284 {
285 std::lock_guard lock(mutex);
286 databases.clear();
287 }
288
289 /// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference).
290 /// TODO: Get rid of this.
291
292 system_logs.reset();
293 embedded_dictionaries.reset();
294 external_dictionaries_loader.reset();
295 external_models_loader.reset();
296 background_pool.reset();
297 background_move_pool.reset();
298 schedule_pool.reset();
299 ddl_worker.reset();
300
301 /// Stop trace collector if any
302 trace_collector.reset();
303 }
304
305 bool hasTraceCollector()
306 {
307 return trace_collector != nullptr;
308 }
309
310 void initializeTraceCollector(std::shared_ptr<TraceLog> trace_log)
311 {
312 if (trace_log == nullptr)
313 return;
314
315 trace_collector = std::make_unique<TraceCollector>(trace_log);
316 }
317
318private:
319 void initialize()
320 {
321 users_manager = std::make_unique<UsersManager>();
322 }
323};
324
325
/// Default construction, copy construction and copy assignment use the compiler-generated
/// member-wise implementations. Copies therefore share the same `shared` pointer.
/// NOTE(review): presumably defined out of line because ContextShared must be a complete type here — confirm.
326Context::Context() = default;
327Context::Context(const Context &) = default;
328Context & Context::operator=(const Context &) = default;
329
330
331Context Context::createGlobal()
332{
333 Context res;
334 res.quota = std::make_shared<QuotaContext>();
335 res.row_policy = std::make_shared<RowPolicyContext>();
336 res.shared = std::make_shared<ContextShared>();
337 return res;
338}
339
/// Compiler-generated destructor, defined in this translation unit.
340Context::~Context() = default;
341
342
343InterserverIOHandler & Context::getInterserverIOHandler() { return shared->interserver_io_handler; }
344
345std::unique_lock<std::recursive_mutex> Context::getLock() const
346{
347 ProfileEvents::increment(ProfileEvents::ContextLock);
348 CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
349 return std::unique_lock(shared->mutex);
350}
351
352ProcessList & Context::getProcessList() { return shared->process_list; }
353const ProcessList & Context::getProcessList() const { return shared->process_list; }
354MergeList & Context::getMergeList() { return shared->merge_list; }
355const MergeList & Context::getMergeList() const { return shared->merge_list; }
356
357
358const Databases Context::getDatabases() const
359{
360 auto lock = getLock();
361 return shared->databases;
362}
363
364Databases Context::getDatabases()
365{
366 auto lock = getLock();
367 return shared->databases;
368}
369
370
371Context::SessionKey Context::getSessionKey(const String & session_id) const
372{
373 auto & user_name = client_info.current_user;
374
375 if (user_name.empty())
376 throw Exception("Empty user name.", ErrorCodes::LOGICAL_ERROR);
377
378 return SessionKey(user_name, session_id);
379}
380
381
382void Context::scheduleCloseSession(const Context::SessionKey & key, std::chrono::steady_clock::duration timeout)
383{
384 const UInt64 close_index = timeout / shared->close_interval + 1;
385 const auto new_close_cycle = shared->close_cycle + close_index;
386
387 if (session_close_cycle != new_close_cycle)
388 {
389 session_close_cycle = new_close_cycle;
390 if (shared->close_times.size() < close_index + 1)
391 shared->close_times.resize(close_index + 1);
392 shared->close_times[close_index].emplace_back(key);
393 }
394}
395
396
397std::shared_ptr<Context> Context::acquireSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check) const
398{
399 auto lock = getLock();
400
401 const auto & key = getSessionKey(session_id);
402 auto it = shared->sessions.find(key);
403
404 if (it == shared->sessions.end())
405 {
406 if (session_check)
407 throw Exception("Session not found.", ErrorCodes::SESSION_NOT_FOUND);
408
409 auto new_session = std::make_shared<Context>(*this);
410
411 new_session->scheduleCloseSession(key, timeout);
412
413 it = shared->sessions.insert(std::make_pair(key, std::move(new_session))).first;
414 }
415 else if (it->second->client_info.current_user != client_info.current_user)
416 {
417 throw Exception("Session belongs to a different user", ErrorCodes::LOGICAL_ERROR);
418 }
419
420 const auto & session = it->second;
421
422 if (session->session_is_used)
423 throw Exception("Session is locked by a concurrent client.", ErrorCodes::SESSION_IS_LOCKED);
424 session->session_is_used = true;
425
426 session->client_info = client_info;
427
428 return session;
429}
430
431
432void Context::releaseSession(const String & session_id, std::chrono::steady_clock::duration timeout)
433{
434 auto lock = getLock();
435
436 session_is_used = false;
437 scheduleCloseSession(getSessionKey(session_id), timeout);
438}
439
440
441std::chrono::steady_clock::duration Context::closeSessions() const
442{
443 auto lock = getLock();
444
445 const auto now = std::chrono::steady_clock::now();
446
447 if (now < shared->close_cycle_time)
448 return shared->close_cycle_time - now;
449
450 const auto current_cycle = shared->close_cycle;
451
452 ++shared->close_cycle;
453 shared->close_cycle_time = now + shared->close_interval;
454
455 if (shared->close_times.empty())
456 return shared->close_interval;
457
458 auto & sessions_to_close = shared->close_times.front();
459
460 for (const auto & key : sessions_to_close)
461 {
462 const auto session = shared->sessions.find(key);
463
464 if (session != shared->sessions.end() && session->second->session_close_cycle <= current_cycle)
465 {
466 if (session->second->session_is_used)
467 session->second->scheduleCloseSession(key, std::chrono::seconds(0));
468 else
469 shared->sessions.erase(session);
470 }
471 }
472
473 shared->close_times.pop_front();
474
475 return shared->close_interval;
476}
477
478
479static String resolveDatabase(const String & database_name, const String & current_database)
480{
481 String res = database_name.empty() ? current_database : database_name;
482 if (res.empty())
483 throw Exception("Default database is not selected", ErrorCodes::UNKNOWN_DATABASE);
484 return res;
485}
486
487
488const DatabasePtr Context::getDatabase(const String & database_name) const
489{
490 auto lock = getLock();
491 String db = resolveDatabase(database_name, current_database);
492 assertDatabaseExists(db);
493 return shared->databases[db];
494}
495
496DatabasePtr Context::getDatabase(const String & database_name)
497{
498 auto lock = getLock();
499 String db = resolveDatabase(database_name, current_database);
500 assertDatabaseExists(db);
501 return shared->databases[db];
502}
503
504const DatabasePtr Context::tryGetDatabase(const String & database_name) const
505{
506 auto lock = getLock();
507 String db = resolveDatabase(database_name, current_database);
508 auto it = shared->databases.find(db);
509 if (it == shared->databases.end())
510 return {};
511 return it->second;
512}
513
514DatabasePtr Context::tryGetDatabase(const String & database_name)
515{
516 auto lock = getLock();
517 String db = resolveDatabase(database_name, current_database);
518 auto it = shared->databases.find(db);
519 if (it == shared->databases.end())
520 return {};
521 return it->second;
522}
523
524String Context::getPath() const
525{
526 auto lock = getLock();
527 return shared->path;
528}
529
530String Context::getTemporaryPath() const
531{
532 auto lock = getLock();
533 return shared->tmp_path;
534}
535
536String Context::getFlagsPath() const
537{
538 auto lock = getLock();
539 return shared->flags_path;
540}
541
542String Context::getUserFilesPath() const
543{
544 auto lock = getLock();
545 return shared->user_files_path;
546}
547
548String Context::getDictionariesLibPath() const
549{
550 auto lock = getLock();
551 return shared->dictionaries_lib_path;
552}
553
554void Context::setPath(const String & path)
555{
556 auto lock = getLock();
557
558 shared->path = path;
559
560 if (shared->tmp_path.empty())
561 shared->tmp_path = shared->path + "tmp/";
562
563 if (shared->flags_path.empty())
564 shared->flags_path = shared->path + "flags/";
565
566 if (shared->user_files_path.empty())
567 shared->user_files_path = shared->path + "user_files/";
568
569 if (shared->dictionaries_lib_path.empty())
570 shared->dictionaries_lib_path = shared->path + "dictionaries_lib/";
571}
572
573void Context::setTemporaryPath(const String & path)
574{
575 auto lock = getLock();
576 shared->tmp_path = path;
577}
578
579void Context::setFlagsPath(const String & path)
580{
581 auto lock = getLock();
582 shared->flags_path = path;
583}
584
585void Context::setUserFilesPath(const String & path)
586{
587 auto lock = getLock();
588 shared->user_files_path = path;
589}
590
591void Context::setDictionariesLibPath(const String & path)
592{
593 auto lock = getLock();
594 shared->dictionaries_lib_path = path;
595}
596
597void Context::setConfig(const ConfigurationPtr & config)
598{
599 auto lock = getLock();
600 shared->config = config;
601}
602
603const Poco::Util::AbstractConfiguration & Context::getConfigRef() const
604{
605 auto lock = getLock();
606 return shared->config ? *shared->config : Poco::Util::Application::instance().config();
607}
608
609AccessControlManager & Context::getAccessControlManager()
610{
611 auto lock = getLock();
612 return shared->access_control_manager;
613}
614
615const AccessControlManager & Context::getAccessControlManager() const
616{
617 auto lock = getLock();
618 return shared->access_control_manager;
619}
620
621void Context::checkQuotaManagementIsAllowed()
622{
623 if (!is_quota_management_allowed)
624 throw Exception(
625 "User " + client_info.current_user + " doesn't have enough privileges to manage quotas", ErrorCodes::NOT_ENOUGH_PRIVILEGES);
626}
627
628void Context::checkRowPolicyManagementIsAllowed()
629{
630 if (!is_row_policy_management_allowed)
631 throw Exception(
632 "User " + client_info.current_user + " doesn't have enough privileges to manage row policies", ErrorCodes::NOT_ENOUGH_PRIVILEGES);
633}
634
635void Context::setUsersConfig(const ConfigurationPtr & config)
636{
637 auto lock = getLock();
638 shared->users_config = config;
639 shared->access_control_manager.loadFromConfig(*shared->users_config);
640 shared->users_manager->loadFromConfig(*shared->users_config);
641}
642
643ConfigurationPtr Context::getUsersConfig()
644{
645 auto lock = getLock();
646 return shared->users_config;
647}
648
649void Context::calculateUserSettings()
650{
651 auto lock = getLock();
652
653 auto user = getUser(client_info.current_user);
654 String profile = user->profile;
655
656 /// 1) Set default settings (hardcoded values)
657 /// NOTE: we ignore global_context settings (from which it is usually copied)
658 /// NOTE: global_context settings are immutable and not auto updated
659 settings = Settings();
660 settings_constraints = nullptr;
661
662 /// 2) Apply settings from default profile
663 auto default_profile_name = getDefaultProfileName();
664 if (profile != default_profile_name)
665 setProfile(default_profile_name);
666
667 /// 3) Apply settings from current user
668 setProfile(profile);
669
670 quota = getAccessControlManager().createQuotaContext(
671 client_info.current_user, client_info.current_address.host(), client_info.quota_key);
672 is_quota_management_allowed = user->is_quota_management_allowed;
673 row_policy = getAccessControlManager().getRowPolicyContext(client_info.current_user);
674 is_row_policy_management_allowed = user->is_row_policy_management_allowed;
675}
676
677
678void Context::setProfile(const String & profile)
679{
680 settings.setProfile(profile, *shared->users_config);
681
682 auto new_constraints
683 = settings_constraints ? std::make_shared<SettingsConstraints>(*settings_constraints) : std::make_shared<SettingsConstraints>();
684 new_constraints->setProfile(profile, *shared->users_config);
685 settings_constraints = std::move(new_constraints);
686}
687
688std::shared_ptr<const User> Context::getUser(const String & user_name)
689{
690 return shared->users_manager->getUser(user_name);
691}
692
693void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address, const String & quota_key)
694{
695 auto lock = getLock();
696
697 auto user_props = shared->users_manager->authorizeAndGetUser(name, password, address.host());
698
699 client_info.current_user = name;
700 client_info.current_address = address;
701 client_info.current_password = password;
702
703 if (!quota_key.empty())
704 client_info.quota_key = quota_key;
705
706 calculateUserSettings();
707}
708
709
710void Context::checkDatabaseAccessRights(const std::string & database_name) const
711{
712 auto lock = getLock();
713 checkDatabaseAccessRightsImpl(database_name);
714}
715
716bool Context::hasDatabaseAccessRights(const String & database_name) const
717{
718 auto lock = getLock();
719 return client_info.current_user.empty() || (database_name == "system") ||
720 shared->users_manager->hasAccessToDatabase(client_info.current_user, database_name);
721}
722
723bool Context::hasDictionaryAccessRights(const String & dictionary_name) const
724{
725 auto lock = getLock();
726 return client_info.current_user.empty() ||
727 shared->users_manager->hasAccessToDictionary(client_info.current_user, dictionary_name);
728}
729
730void Context::checkDatabaseAccessRightsImpl(const std::string & database_name) const
731{
732 if (client_info.current_user.empty() || (database_name == "system"))
733 {
734 /// An unnamed user, i.e. server, has access to all databases.
735 /// All users have access to the database system.
736 return;
737 }
738 if (!shared->users_manager->hasAccessToDatabase(client_info.current_user, database_name))
739 throw Exception("Access denied to database " + database_name + " for user " + client_info.current_user , ErrorCodes::DATABASE_ACCESS_DENIED);
740}
741
742void Context::addDependencyUnsafe(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
743{
744 checkDatabaseAccessRightsImpl(from.first);
745 checkDatabaseAccessRightsImpl(where.first);
746 shared->view_dependencies[from].insert(where);
747
748 // Notify table of dependencies change
749 auto table = tryGetTable(from.first, from.second);
750 if (table != nullptr)
751 table->updateDependencies();
752}
753
754void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
755{
756 auto lock = getLock();
757 addDependencyUnsafe(from, where);
758}
759
760void Context::removeDependencyUnsafe(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
761{
762 checkDatabaseAccessRightsImpl(from.first);
763 checkDatabaseAccessRightsImpl(where.first);
764 shared->view_dependencies[from].erase(where);
765
766 // Notify table of dependencies change
767 auto table = tryGetTable(from.first, from.second);
768 if (table != nullptr)
769 table->updateDependencies();
770}
771
772void Context::removeDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
773{
774 auto lock = getLock();
775 removeDependencyUnsafe(from, where);
776}
777
778Dependencies Context::getDependencies(const String & database_name, const String & table_name) const
779{
780 auto lock = getLock();
781
782 String db = resolveDatabase(database_name, current_database);
783
784 if (database_name.empty() && tryGetExternalTable(table_name))
785 {
786 /// Table is temporary. Access granted.
787 }
788 else
789 {
790 checkDatabaseAccessRightsImpl(db);
791 }
792
793 ViewDependencies::const_iterator iter = shared->view_dependencies.find(DatabaseAndTableName(db, table_name));
794 if (iter == shared->view_dependencies.end())
795 return {};
796
797 return Dependencies(iter->second.begin(), iter->second.end());
798}
799
800bool Context::isTableExist(const String & database_name, const String & table_name) const
801{
802 auto lock = getLock();
803
804 String db = resolveDatabase(database_name, current_database);
805 checkDatabaseAccessRightsImpl(db);
806
807 Databases::const_iterator it = shared->databases.find(db);
808 return shared->databases.end() != it
809 && it->second->isTableExist(*this, table_name);
810}
811
812bool Context::isDictionaryExists(const String & database_name, const String & dictionary_name) const
813{
814 auto lock = getLock();
815
816 String db = resolveDatabase(database_name, current_database);
817 checkDatabaseAccessRightsImpl(db);
818
819 Databases::const_iterator it = shared->databases.find(db);
820 return shared->databases.end() != it && it->second->isDictionaryExist(*this, dictionary_name);
821}
822
823bool Context::isDatabaseExist(const String & database_name) const
824{
825 auto lock = getLock();
826 String db = resolveDatabase(database_name, current_database);
827 checkDatabaseAccessRightsImpl(db);
828 return shared->databases.end() != shared->databases.find(db);
829}
830
831bool Context::isExternalTableExist(const String & table_name) const
832{
833 return external_tables.end() != external_tables.find(table_name);
834}
835
836
837void Context::assertTableDoesntExist(const String & database_name, const String & table_name, bool check_database_access_rights) const
838{
839 auto lock = getLock();
840
841 String db = resolveDatabase(database_name, current_database);
842 if (check_database_access_rights)
843 checkDatabaseAccessRightsImpl(db);
844
845 Databases::const_iterator it = shared->databases.find(db);
846 if (shared->databases.end() != it && it->second->isTableExist(*this, table_name))
847 throw Exception("Table " + backQuoteIfNeed(db) + "." + backQuoteIfNeed(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
848}
849
850
851void Context::assertDatabaseExists(const String & database_name, bool check_database_access_rights) const
852{
853 auto lock = getLock();
854
855 String db = resolveDatabase(database_name, current_database);
856 if (check_database_access_rights)
857 checkDatabaseAccessRightsImpl(db);
858
859 if (shared->databases.end() == shared->databases.find(db))
860 throw Exception("Database " + backQuoteIfNeed(db) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
861}
862
863
864void Context::assertDatabaseDoesntExist(const String & database_name) const
865{
866 auto lock = getLock();
867
868 String db = resolveDatabase(database_name, current_database);
869 checkDatabaseAccessRightsImpl(db);
870
871 if (shared->databases.end() != shared->databases.find(db))
872 throw Exception("Database " + backQuoteIfNeed(db) + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
873}
874
875
876const Scalars & Context::getScalars() const
877{
878 return scalars;
879}
880
881
882const Block & Context::getScalar(const String & name) const
883{
884 auto it = scalars.find(name);
885 if (scalars.end() == it)
886 throw Exception("Scalar " + backQuoteIfNeed(name) + " doesn't exist (internal bug)", ErrorCodes::UNKNOWN_SCALAR);
887 return it->second;
888}
889
890
891Tables Context::getExternalTables() const
892{
893 auto lock = getLock();
894
895 Tables res;
896 for (auto & table : external_tables)
897 res[table.first] = table.second.first;
898
899 if (session_context && session_context != this)
900 {
901 Tables buf = session_context->getExternalTables();
902 res.insert(buf.begin(), buf.end());
903 }
904 else if (global_context && global_context != this)
905 {
906 Tables buf = global_context->getExternalTables();
907 res.insert(buf.begin(), buf.end());
908 }
909 return res;
910}
911
912
913StoragePtr Context::tryGetExternalTable(const String & table_name) const
914{
915 TableAndCreateASTs::const_iterator jt = external_tables.find(table_name);
916 if (external_tables.end() == jt)
917 return StoragePtr();
918
919 return jt->second.first;
920}
921
922
923StoragePtr Context::getTable(const String & database_name, const String & table_name) const
924{
925 Exception exc;
926 auto res = getTableImpl(database_name, table_name, &exc);
927 if (!res)
928 throw exc;
929 return res;
930}
931
932
933StoragePtr Context::tryGetTable(const String & database_name, const String & table_name) const
934{
935 return getTableImpl(database_name, table_name, nullptr);
936}
937
938
939StoragePtr Context::getTableImpl(const String & database_name, const String & table_name, Exception * exception) const
940{
941 String db;
942 DatabasePtr database;
943
944 {
945 auto lock = getLock();
946
947 if (database_name.empty())
948 {
949 StoragePtr res = tryGetExternalTable(table_name);
950 if (res)
951 return res;
952 }
953
954 db = resolveDatabase(database_name, current_database);
955 checkDatabaseAccessRightsImpl(db);
956
957 Databases::const_iterator it = shared->databases.find(db);
958 if (shared->databases.end() == it)
959 {
960 if (exception)
961 *exception = Exception("Database " + backQuoteIfNeed(db) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
962 return {};
963 }
964
965 database = it->second;
966 }
967
968 auto table = database->tryGetTable(*this, table_name);
969 if (!table)
970 {
971 if (exception)
972 *exception = Exception("Table " + backQuoteIfNeed(db) + "." + backQuoteIfNeed(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
973 return {};
974 }
975
976 return table;
977}
978
979
980void Context::addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast)
981{
982 if (external_tables.end() != external_tables.find(table_name))
983 throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
984
985 external_tables[table_name] = std::pair(storage, ast);
986}
987
988
989void Context::addScalar(const String & name, const Block & block)
990{
991 scalars[name] = block;
992}
993
994
995bool Context::hasScalar(const String & name) const
996{
997 return scalars.count(name);
998}
999
1000
1001StoragePtr Context::tryRemoveExternalTable(const String & table_name)
1002{
1003 TableAndCreateASTs::const_iterator it = external_tables.find(table_name);
1004
1005 if (external_tables.end() == it)
1006 return StoragePtr();
1007
1008 auto storage = it->second.first;
1009 external_tables.erase(it);
1010 return storage;
1011}
1012
1013
1014StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
1015{
1016 /// Slightly suboptimal.
1017 auto hash = table_expression->getTreeHash();
1018 String key = toString(hash.first) + '_' + toString(hash.second);
1019
1020 StoragePtr & res = table_function_results[key];
1021
1022 if (!res)
1023 {
1024 TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression->as<ASTFunction>()->name, *this);
1025
1026 /// Run it and remember the result
1027 res = table_function_ptr->execute(table_expression, *this, table_function_ptr->getName());
1028 }
1029
1030 return res;
1031}
1032
1033
1034void Context::addViewSource(const StoragePtr & storage)
1035{
1036 if (view_source)
1037 throw Exception(
1038 "Temporary view source storage " + backQuoteIfNeed(view_source->getName()) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
1039 view_source = storage;
1040}
1041
1042
1043StoragePtr Context::getViewSource()
1044{
1045 return view_source;
1046}
1047
1048
1049DDLGuard::DDLGuard(Map & map_, std::unique_lock<std::mutex> guards_lock_, const String & elem)
1050 : map(map_), guards_lock(std::move(guards_lock_))
1051{
1052 it = map.emplace(elem, Entry{std::make_unique<std::mutex>(), 0}).first;
1053 ++it->second.counter;
1054 guards_lock.unlock();
1055 table_lock = std::unique_lock(*it->second.mutex);
1056}
1057
1058DDLGuard::~DDLGuard()
1059{
1060 guards_lock.lock();
1061 --it->second.counter;
1062 if (!it->second.counter)
1063 {
1064 table_lock.unlock();
1065 map.erase(it);
1066 }
1067}
1068
1069std::unique_ptr<DDLGuard> Context::getDDLGuard(const String & database, const String & table) const
1070{
1071 std::unique_lock lock(shared->ddl_guards_mutex);
1072 return std::make_unique<DDLGuard>(shared->ddl_guards[database], std::move(lock), table);
1073}
1074
1075
1076void Context::addDatabase(const String & database_name, const DatabasePtr & database)
1077{
1078 auto lock = getLock();
1079
1080 assertDatabaseDoesntExist(database_name);
1081 shared->databases[database_name] = database;
1082}
1083
1084
1085DatabasePtr Context::detachDatabase(const String & database_name)
1086{
1087 auto lock = getLock();
1088 auto res = getDatabase(database_name);
1089 shared->databases.erase(database_name);
1090
1091 return res;
1092}
1093
1094
1095ASTPtr Context::getCreateExternalTableQuery(const String & table_name) const
1096{
1097 TableAndCreateASTs::const_iterator jt = external_tables.find(table_name);
1098 if (external_tables.end() == jt)
1099 throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " doesn't exist", ErrorCodes::UNKNOWN_TABLE);
1100
1101 return jt->second.second;
1102}
1103
1104Settings Context::getSettings() const
1105{
1106 return settings;
1107}
1108
1109
1110void Context::setSettings(const Settings & settings_)
1111{
1112 settings = settings_;
1113}
1114
1115
1116void Context::setSetting(const String & name, const String & value)
1117{
1118 auto lock = getLock();
1119 if (name == "profile")
1120 {
1121 setProfile(value);
1122 return;
1123 }
1124 settings.set(name, value);
1125}
1126
1127
1128void Context::setSetting(const String & name, const Field & value)
1129{
1130 auto lock = getLock();
1131 if (name == "profile")
1132 {
1133 setProfile(value.safeGet<String>());
1134 return;
1135 }
1136 settings.set(name, value);
1137}
1138
1139
1140void Context::applySettingChange(const SettingChange & change)
1141{
1142 setSetting(change.name, change.value);
1143}
1144
1145
1146void Context::applySettingsChanges(const SettingsChanges & changes)
1147{
1148 auto lock = getLock();
1149 for (const SettingChange & change : changes)
1150 applySettingChange(change);
1151}
1152
1153
1154void Context::checkSettingsConstraints(const SettingChange & change)
1155{
1156 if (settings_constraints)
1157 settings_constraints->check(settings, change);
1158}
1159
1160
1161void Context::checkSettingsConstraints(const SettingsChanges & changes)
1162{
1163 if (settings_constraints)
1164 settings_constraints->check(settings, changes);
1165}
1166
1167
1168String Context::getCurrentDatabase() const
1169{
1170 return current_database;
1171}
1172
1173
1174String Context::getCurrentQueryId() const
1175{
1176 return client_info.current_query_id;
1177}
1178
1179
1180String Context::getInitialQueryId() const
1181{
1182 return client_info.initial_query_id;
1183}
1184
1185
1186void Context::setCurrentDatabase(const String & name)
1187{
1188 auto lock = getLock();
1189 assertDatabaseExists(name);
1190 current_database = name;
1191}
1192
1193
1194void Context::setCurrentQueryId(const String & query_id)
1195{
1196 if (!client_info.current_query_id.empty())
1197 throw Exception("Logical error: attempt to set query_id twice", ErrorCodes::LOGICAL_ERROR);
1198
1199 String query_id_to_set = query_id;
1200
1201 if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves.
1202 {
1203 /// Generate random UUID, but using lower quality RNG,
1204 /// because Poco::UUIDGenerator::generateRandom method is using /dev/random, that is very expensive.
1205 /// NOTE: Actually we don't need to use UUIDs for query identifiers.
1206 /// We could use any suitable string instead.
1207
1208 union
1209 {
1210 char bytes[16];
1211 struct
1212 {
1213 UInt64 a;
1214 UInt64 b;
1215 } words;
1216 } random;
1217
1218 random.words.a = thread_local_rng(); //-V656
1219 random.words.b = thread_local_rng(); //-V656
1220
1221 /// Use protected constructor.
1222 struct qUUID : Poco::UUID
1223 {
1224 qUUID(const char * bytes, Poco::UUID::Version version)
1225 : Poco::UUID(bytes, version) {}
1226 };
1227
1228 query_id_to_set = qUUID(random.bytes, Poco::UUID::UUID_RANDOM).toString();
1229 }
1230
1231 client_info.current_query_id = query_id_to_set;
1232}
1233
1234void Context::killCurrentQuery()
1235{
1236 if (process_list_elem)
1237 {
1238 process_list_elem->cancelQuery(true);
1239 }
1240};
1241
1242String Context::getDefaultFormat() const
1243{
1244 return default_format.empty() ? "TabSeparated" : default_format;
1245}
1246
1247
1248void Context::setDefaultFormat(const String & name)
1249{
1250 default_format = name;
1251}
1252
1253MultiVersion<Macros>::Version Context::getMacros() const
1254{
1255 return shared->macros.get();
1256}
1257
1258void Context::setMacros(std::unique_ptr<Macros> && macros)
1259{
1260 shared->macros.set(std::move(macros));
1261}
1262
1263const Context & Context::getQueryContext() const
1264{
1265 if (!query_context)
1266 throw Exception("There is no query", ErrorCodes::THERE_IS_NO_QUERY);
1267 return *query_context;
1268}
1269
1270Context & Context::getQueryContext()
1271{
1272 if (!query_context)
1273 throw Exception("There is no query", ErrorCodes::THERE_IS_NO_QUERY);
1274 return *query_context;
1275}
1276
1277const Context & Context::getSessionContext() const
1278{
1279 if (!session_context)
1280 throw Exception("There is no session", ErrorCodes::THERE_IS_NO_SESSION);
1281 return *session_context;
1282}
1283
1284Context & Context::getSessionContext()
1285{
1286 if (!session_context)
1287 throw Exception("There is no session", ErrorCodes::THERE_IS_NO_SESSION);
1288 return *session_context;
1289}
1290
1291const Context & Context::getGlobalContext() const
1292{
1293 if (!global_context)
1294 throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
1295 return *global_context;
1296}
1297
1298Context & Context::getGlobalContext()
1299{
1300 if (!global_context)
1301 throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
1302 return *global_context;
1303}
1304
1305
1306const EmbeddedDictionaries & Context::getEmbeddedDictionaries() const
1307{
1308 return getEmbeddedDictionariesImpl(false);
1309}
1310
1311EmbeddedDictionaries & Context::getEmbeddedDictionaries()
1312{
1313 return getEmbeddedDictionariesImpl(false);
1314}
1315
1316
1317const ExternalDictionariesLoader & Context::getExternalDictionariesLoader() const
1318{
1319 std::lock_guard lock(shared->external_dictionaries_mutex);
1320 if (!shared->external_dictionaries_loader)
1321 {
1322 if (!this->global_context)
1323 throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
1324
1325 shared->external_dictionaries_loader.emplace(*this->global_context);
1326 }
1327 return *shared->external_dictionaries_loader;
1328}
1329
1330ExternalDictionariesLoader & Context::getExternalDictionariesLoader()
1331{
1332 return const_cast<ExternalDictionariesLoader &>(const_cast<const Context *>(this)->getExternalDictionariesLoader());
1333}
1334
1335
1336const ExternalModelsLoader & Context::getExternalModelsLoader() const
1337{
1338 std::lock_guard lock(shared->external_models_mutex);
1339 if (!shared->external_models_loader)
1340 {
1341 if (!this->global_context)
1342 throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
1343
1344 shared->external_models_loader.emplace(*this->global_context);
1345 }
1346 return *shared->external_models_loader;
1347}
1348
1349ExternalModelsLoader & Context::getExternalModelsLoader()
1350{
1351 return const_cast<ExternalModelsLoader &>(const_cast<const Context *>(this)->getExternalModelsLoader());
1352}
1353
1354
1355EmbeddedDictionaries & Context::getEmbeddedDictionariesImpl(const bool throw_on_error) const
1356{
1357 std::lock_guard lock(shared->embedded_dictionaries_mutex);
1358
1359 if (!shared->embedded_dictionaries)
1360 {
1361 auto geo_dictionaries_loader = std::make_unique<GeoDictionariesLoader>();
1362
1363 shared->embedded_dictionaries.emplace(
1364 std::move(geo_dictionaries_loader),
1365 *this->global_context,
1366 throw_on_error);
1367 }
1368
1369 return *shared->embedded_dictionaries;
1370}
1371
1372
1373void Context::tryCreateEmbeddedDictionaries() const
1374{
1375 static_cast<void>(getEmbeddedDictionariesImpl(true));
1376}
1377
1378
1379void Context::setProgressCallback(ProgressCallback callback)
1380{
1381 /// Callback is set to a session or to a query. In the session, only one query is processed at a time. Therefore, the lock is not needed.
1382 progress_callback = callback;
1383}
1384
1385ProgressCallback Context::getProgressCallback() const
1386{
1387 return progress_callback;
1388}
1389
1390
1391void Context::setProcessListElement(ProcessList::Element * elem)
1392{
1393 /// Set to a session or query. In the session, only one query is processed at a time. Therefore, the lock is not needed.
1394 process_list_elem = elem;
1395}
1396
1397ProcessList::Element * Context::getProcessListElement() const
1398{
1399 return process_list_elem;
1400}
1401
1402
1403void Context::setUncompressedCache(size_t max_size_in_bytes)
1404{
1405 auto lock = getLock();
1406
1407 if (shared->uncompressed_cache)
1408 throw Exception("Uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
1409
1410 shared->uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes);
1411}
1412
1413
1414UncompressedCachePtr Context::getUncompressedCache() const
1415{
1416 auto lock = getLock();
1417 return shared->uncompressed_cache;
1418}
1419
1420
1421void Context::dropUncompressedCache() const
1422{
1423 auto lock = getLock();
1424 if (shared->uncompressed_cache)
1425 shared->uncompressed_cache->reset();
1426}
1427
1428
1429void Context::setMarkCache(size_t cache_size_in_bytes)
1430{
1431 auto lock = getLock();
1432
1433 if (shared->mark_cache)
1434 throw Exception("Mark cache has been already created.", ErrorCodes::LOGICAL_ERROR);
1435
1436 shared->mark_cache = std::make_shared<MarkCache>(cache_size_in_bytes);
1437}
1438
1439
1440MarkCachePtr Context::getMarkCache() const
1441{
1442 auto lock = getLock();
1443 return shared->mark_cache;
1444}
1445
1446
1447void Context::dropMarkCache() const
1448{
1449 auto lock = getLock();
1450 if (shared->mark_cache)
1451 shared->mark_cache->reset();
1452}
1453
1454
1455void Context::dropCaches() const
1456{
1457 auto lock = getLock();
1458
1459 if (shared->uncompressed_cache)
1460 shared->uncompressed_cache->reset();
1461
1462 if (shared->mark_cache)
1463 shared->mark_cache->reset();
1464}
1465
1466BackgroundProcessingPool & Context::getBackgroundPool()
1467{
1468 auto lock = getLock();
1469 if (!shared->background_pool)
1470 shared->background_pool.emplace(settings.background_pool_size);
1471 return *shared->background_pool;
1472}
1473
1474BackgroundProcessingPool & Context::getBackgroundMovePool()
1475{
1476 auto lock = getLock();
1477 if (!shared->background_move_pool)
1478 {
1479 BackgroundProcessingPool::PoolSettings pool_settings;
1480 auto & config = getConfigRef();
1481 pool_settings.thread_sleep_seconds = config.getDouble("background_move_processing_pool_thread_sleep_seconds", 10);
1482 pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_move_processing_pool_thread_sleep_seconds_random_part", 1.0);
1483 pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_move_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
1484 pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_min", 10);
1485 pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_max", 600);
1486 pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
1487 pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
1488 pool_settings.tasks_metric = CurrentMetrics::BackgroundMovePoolTask;
1489 pool_settings.memory_metric = CurrentMetrics::MemoryTrackingInBackgroundMoveProcessingPool;
1490 shared->background_move_pool.emplace(settings.background_move_pool_size, pool_settings, "BackgroundMovePool", "BgMoveProcPool");
1491 }
1492 return *shared->background_move_pool;
1493}
1494
1495BackgroundSchedulePool & Context::getSchedulePool()
1496{
1497 auto lock = getLock();
1498 if (!shared->schedule_pool)
1499 shared->schedule_pool.emplace(settings.background_schedule_pool_size);
1500 return *shared->schedule_pool;
1501}
1502
1503void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker)
1504{
1505 auto lock = getLock();
1506 if (shared->ddl_worker)
1507 throw Exception("DDL background thread has already been initialized.", ErrorCodes::LOGICAL_ERROR);
1508 shared->ddl_worker = std::move(ddl_worker);
1509}
1510
1511DDLWorker & Context::getDDLWorker() const
1512{
1513 auto lock = getLock();
1514 if (!shared->ddl_worker)
1515 throw Exception("DDL background thread is not initialized.", ErrorCodes::LOGICAL_ERROR);
1516 return *shared->ddl_worker;
1517}
1518
1519zkutil::ZooKeeperPtr Context::getZooKeeper() const
1520{
1521 std::lock_guard lock(shared->zookeeper_mutex);
1522
1523 if (!shared->zookeeper)
1524 shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(getConfigRef(), "zookeeper");
1525 else if (shared->zookeeper->expired())
1526 shared->zookeeper = shared->zookeeper->startNewSession();
1527
1528 return shared->zookeeper;
1529}
1530
1531void Context::resetZooKeeper() const
1532{
1533 std::lock_guard lock(shared->zookeeper_mutex);
1534 shared->zookeeper.reset();
1535}
1536
1537bool Context::hasZooKeeper() const
1538{
1539 return getConfigRef().has("zookeeper");
1540}
1541
1542
1543void Context::setInterserverIOAddress(const String & host, UInt16 port)
1544{
1545 shared->interserver_io_host = host;
1546 shared->interserver_io_port = port;
1547}
1548
1549std::pair<String, UInt16> Context::getInterserverIOAddress() const
1550{
1551 if (shared->interserver_io_host.empty() || shared->interserver_io_port == 0)
1552 throw Exception("Parameter 'interserver_http(s)_port' required for replication is not specified in configuration file.",
1553 ErrorCodes::NO_ELEMENTS_IN_CONFIG);
1554
1555 return { shared->interserver_io_host, shared->interserver_io_port };
1556}
1557
1558void Context::setInterserverCredentials(const String & user, const String & password)
1559{
1560 shared->interserver_io_user = user;
1561 shared->interserver_io_password = password;
1562}
1563
1564std::pair<String, String> Context::getInterserverCredentials() const
1565{
1566 return { shared->interserver_io_user, shared->interserver_io_password };
1567}
1568
1569void Context::setInterserverScheme(const String & scheme)
1570{
1571 shared->interserver_scheme = scheme;
1572}
1573
1574String Context::getInterserverScheme() const
1575{
1576 return shared->interserver_scheme;
1577}
1578
1579void Context::setRemoteHostFilter(const Poco::Util::AbstractConfiguration & config)
1580{
1581 shared->remote_host_filter.setValuesFromConfig(config);
1582}
1583
1584const RemoteHostFilter & Context::getRemoteHostFilter() const
1585{
1586 return shared->remote_host_filter;
1587}
1588
1589UInt16 Context::getTCPPort() const
1590{
1591 auto lock = getLock();
1592
1593 auto & config = getConfigRef();
1594 return config.getInt("tcp_port", DBMS_DEFAULT_PORT);
1595}
1596
1597std::optional<UInt16> Context::getTCPPortSecure() const
1598{
1599 auto lock = getLock();
1600
1601 auto & config = getConfigRef();
1602 if (config.has("tcp_port_secure"))
1603 return config.getInt("tcp_port_secure");
1604 return {};
1605}
1606
1607std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) const
1608{
1609 auto res = getClusters().getCluster(cluster_name);
1610
1611 if (!res)
1612 throw Exception("Requested cluster '" + cluster_name + "' not found", ErrorCodes::BAD_GET);
1613
1614 return res;
1615}
1616
1617
1618std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name) const
1619{
1620 return getClusters().getCluster(cluster_name);
1621}
1622
1623
1624void Context::reloadClusterConfig()
1625{
1626 while (true)
1627 {
1628 ConfigurationPtr cluster_config;
1629 {
1630 std::lock_guard lock(shared->clusters_mutex);
1631 cluster_config = shared->clusters_config;
1632 }
1633
1634 auto & config = cluster_config ? *cluster_config : getConfigRef();
1635 auto new_clusters = std::make_unique<Clusters>(config, settings);
1636
1637 {
1638 std::lock_guard lock(shared->clusters_mutex);
1639 if (shared->clusters_config.get() == cluster_config.get())
1640 {
1641 shared->clusters = std::move(new_clusters);
1642 return;
1643 }
1644
1645 /// Clusters config has been suddenly changed, recompute clusters
1646 }
1647 }
1648}
1649
1650
1651Clusters & Context::getClusters() const
1652{
1653 std::lock_guard lock(shared->clusters_mutex);
1654 if (!shared->clusters)
1655 {
1656 auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef();
1657 shared->clusters = std::make_unique<Clusters>(config, settings);
1658 }
1659
1660 return *shared->clusters;
1661}
1662
1663
1664/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
1665void Context::setClustersConfig(const ConfigurationPtr & config, const String & config_name)
1666{
1667 std::lock_guard lock(shared->clusters_mutex);
1668
1669 shared->clusters_config = config;
1670
1671 if (!shared->clusters)
1672 shared->clusters = std::make_unique<Clusters>(*shared->clusters_config, settings, config_name);
1673 else
1674 shared->clusters->updateClusters(*shared->clusters_config, settings, config_name);
1675}
1676
1677
1678void Context::setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster)
1679{
1680 std::lock_guard lock(shared->clusters_mutex);
1681
1682 if (!shared->clusters)
1683 throw Exception("Clusters are not set", ErrorCodes::LOGICAL_ERROR);
1684
1685 shared->clusters->setCluster(cluster_name, cluster);
1686}
1687
1688
1689void Context::initializeSystemLogs()
1690{
1691 auto lock = getLock();
1692 shared->system_logs.emplace(*global_context, getConfigRef());
1693}
1694
1695bool Context::hasTraceCollector()
1696{
1697 return shared->hasTraceCollector();
1698}
1699
1700void Context::initializeTraceCollector()
1701{
1702 shared->initializeTraceCollector(getTraceLog());
1703}
1704
1705
1706std::shared_ptr<QueryLog> Context::getQueryLog()
1707{
1708 auto lock = getLock();
1709
1710 if (!shared->system_logs || !shared->system_logs->query_log)
1711 return {};
1712
1713 return shared->system_logs->query_log;
1714}
1715
1716
1717std::shared_ptr<QueryThreadLog> Context::getQueryThreadLog()
1718{
1719 auto lock = getLock();
1720
1721 if (!shared->system_logs || !shared->system_logs->query_thread_log)
1722 return {};
1723
1724 return shared->system_logs->query_thread_log;
1725}
1726
1727
1728std::shared_ptr<PartLog> Context::getPartLog(const String & part_database)
1729{
1730 auto lock = getLock();
1731
1732 /// No part log or system logs are shutting down.
1733 if (!shared->system_logs || !shared->system_logs->part_log)
1734 return {};
1735
1736 /// Will not log operations on system tables (including part_log itself).
1737 /// It doesn't make sense and not allow to destruct PartLog correctly due to infinite logging and flushing,
1738 /// and also make troubles on startup.
1739 if (part_database == shared->system_logs->part_log_database)
1740 return {};
1741
1742 return shared->system_logs->part_log;
1743}
1744
1745
1746std::shared_ptr<TraceLog> Context::getTraceLog()
1747{
1748 auto lock = getLock();
1749
1750 if (!shared->system_logs || !shared->system_logs->trace_log)
1751 return {};
1752
1753 return shared->system_logs->trace_log;
1754}
1755
1756
1757std::shared_ptr<TextLog> Context::getTextLog()
1758{
1759 auto lock = getLock();
1760
1761 if (!shared->system_logs || !shared->system_logs->text_log)
1762 return {};
1763
1764 return shared->system_logs->text_log;
1765}
1766
1767
1768std::shared_ptr<MetricLog> Context::getMetricLog()
1769{
1770 auto lock = getLock();
1771
1772 if (!shared->system_logs || !shared->system_logs->metric_log)
1773 return {};
1774
1775 return shared->system_logs->metric_log;
1776}
1777
1778
1779CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
1780{
1781 auto lock = getLock();
1782
1783 if (!shared->compression_codec_selector)
1784 {
1785 constexpr auto config_name = "compression";
1786 auto & config = getConfigRef();
1787
1788 if (config.has(config_name))
1789 shared->compression_codec_selector = std::make_unique<CompressionCodecSelector>(config, "compression");
1790 else
1791 shared->compression_codec_selector = std::make_unique<CompressionCodecSelector>();
1792 }
1793
1794 return shared->compression_codec_selector->choose(part_size, part_size_ratio);
1795}
1796
1797
1798const DiskPtr & Context::getDisk(const String & name) const
1799{
1800 auto lock = getLock();
1801
1802 const auto & disk_selector = getDiskSelector();
1803
1804 return disk_selector[name];
1805}
1806
1807
1808DiskSelector & Context::getDiskSelector() const
1809{
1810 auto lock = getLock();
1811
1812 if (!shared->merge_tree_disk_selector)
1813 {
1814 constexpr auto config_name = "storage_configuration.disks";
1815 auto & config = getConfigRef();
1816
1817 shared->merge_tree_disk_selector = std::make_unique<DiskSelector>(config, config_name, *this);
1818 }
1819 return *shared->merge_tree_disk_selector;
1820}
1821
1822
1823const StoragePolicyPtr & Context::getStoragePolicy(const String & name) const
1824{
1825 auto lock = getLock();
1826
1827 auto & policy_selector = getStoragePolicySelector();
1828
1829 return policy_selector[name];
1830}
1831
1832
1833StoragePolicySelector & Context::getStoragePolicySelector() const
1834{
1835 auto lock = getLock();
1836
1837 if (!shared->merge_tree_storage_policy_selector)
1838 {
1839 constexpr auto config_name = "storage_configuration.policies";
1840 auto & config = getConfigRef();
1841
1842 shared->merge_tree_storage_policy_selector = std::make_unique<StoragePolicySelector>(config, config_name, getDiskSelector());
1843 }
1844 return *shared->merge_tree_storage_policy_selector;
1845}
1846
1847
1848const MergeTreeSettings & Context::getMergeTreeSettings() const
1849{
1850 auto lock = getLock();
1851
1852 if (!shared->merge_tree_settings)
1853 {
1854 auto & config = getConfigRef();
1855 MergeTreeSettings mt_settings;
1856 mt_settings.loadFromConfig("merge_tree", config);
1857 shared->merge_tree_settings.emplace(mt_settings);
1858 }
1859
1860 return *shared->merge_tree_settings;
1861}
1862
1863
1864void Context::checkCanBeDropped(const String & database, const String & table, const size_t & size, const size_t & max_size_to_drop) const
1865{
1866 if (!max_size_to_drop || size <= max_size_to_drop)
1867 return;
1868
1869 Poco::File force_file(getFlagsPath() + "force_drop_table");
1870 bool force_file_exists = force_file.exists();
1871
1872 if (force_file_exists)
1873 {
1874 try
1875 {
1876 force_file.remove();
1877 return;
1878 }
1879 catch (...)
1880 {
1881 /// User should recreate force file on each drop, it shouldn't be protected
1882 tryLogCurrentException("Drop table check", "Can't remove force file to enable table or partition drop");
1883 }
1884 }
1885
1886 String size_str = formatReadableSizeWithDecimalSuffix(size);
1887 String max_size_to_drop_str = formatReadableSizeWithDecimalSuffix(max_size_to_drop);
1888 std::stringstream ostr;
1889
1890 ostr << "Table or Partition in " << backQuoteIfNeed(database) << "." << backQuoteIfNeed(table) << " was not dropped.\n"
1891 << "Reason:\n"
1892 << "1. Size (" << size_str << ") is greater than max_[table/partition]_size_to_drop (" << max_size_to_drop_str << ")\n"
1893 << "2. File '" << force_file.path() << "' intended to force DROP "
1894 << (force_file_exists ? "exists but not writeable (could not be removed)" : "doesn't exist") << "\n";
1895
1896 ostr << "How to fix this:\n"
1897 << "1. Either increase (or set to zero) max_[table/partition]_size_to_drop in server config and restart ClickHouse\n"
1898 << "2. Either create forcing file " << force_file.path() << " and make sure that ClickHouse has write permission for it.\n"
1899 << "Example:\nsudo touch '" << force_file.path() << "' && sudo chmod 666 '" << force_file.path() << "'";
1900
1901 throw Exception(ostr.str(), ErrorCodes::TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT);
1902}
1903
1904
1905void Context::setMaxTableSizeToDrop(size_t max_size)
1906{
1907 // Is initialized at server startup and updated at config reload
1908 shared->max_table_size_to_drop.store(max_size, std::memory_order_relaxed);
1909}
1910
1911
1912void Context::checkTableCanBeDropped(const String & database, const String & table, const size_t & table_size) const
1913{
1914 size_t max_table_size_to_drop = shared->max_table_size_to_drop.load(std::memory_order_relaxed);
1915
1916 checkCanBeDropped(database, table, table_size, max_table_size_to_drop);
1917}
1918
1919
1920void Context::setMaxPartitionSizeToDrop(size_t max_size)
1921{
1922 // Is initialized at server startup and updated at config reload
1923 shared->max_partition_size_to_drop.store(max_size, std::memory_order_relaxed);
1924}
1925
1926
1927void Context::checkPartitionCanBeDropped(const String & database, const String & table, const size_t & partition_size) const
1928{
1929 size_t max_partition_size_to_drop = shared->max_partition_size_to_drop.load(std::memory_order_relaxed);
1930
1931 checkCanBeDropped(database, table, partition_size, max_partition_size_to_drop);
1932}
1933
1934
1935BlockInputStreamPtr Context::getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size) const
1936{
1937 return FormatFactory::instance().getInput(name, buf, sample, *this, max_block_size);
1938}
1939
1940BlockOutputStreamPtr Context::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const
1941{
1942 return FormatFactory::instance().getOutput(name, buf, sample, *this);
1943}
1944
1945OutputFormatPtr Context::getOutputFormatProcessor(const String & name, WriteBuffer & buf, const Block & sample) const
1946{
1947 return FormatFactory::instance().getOutputFormat(name, buf, sample, *this);
1948}
1949
1950
1951time_t Context::getUptimeSeconds() const
1952{
1953 auto lock = getLock();
1954 return shared->uptime_watch.elapsedSeconds();
1955}
1956
1957
1958void Context::setConfigReloadCallback(ConfigReloadCallback && callback)
1959{
1960 /// Is initialized at server startup, so lock isn't required. Otherwise use mutex.
1961 shared->config_reload_callback = std::move(callback);
1962}
1963
1964void Context::reloadConfig() const
1965{
1966 /// Use mutex if callback may be changed after startup.
1967 if (!shared->config_reload_callback)
1968 throw Exception("Can't reload config beacuse config_reload_callback is not set.", ErrorCodes::LOGICAL_ERROR);
1969
1970 shared->config_reload_callback();
1971}
1972
1973
1974void Context::shutdown()
1975{
1976 shared->shutdown();
1977}
1978
1979
1980Context::ApplicationType Context::getApplicationType() const
1981{
1982 return shared->application_type;
1983}
1984
1985void Context::setApplicationType(ApplicationType type)
1986{
1987 /// Lock isn't required, you should set it at start
1988 shared->application_type = type;
1989}
1990
1991void Context::setDefaultProfiles(const Poco::Util::AbstractConfiguration & config)
1992{
1993 shared->default_profile_name = config.getString("default_profile", "default");
1994 shared->system_profile_name = config.getString("system_profile", shared->default_profile_name);
1995 setSetting("profile", shared->system_profile_name);
1996}
1997
1998String Context::getDefaultProfileName() const
1999{
2000 return shared->default_profile_name;
2001}
2002
2003String Context::getSystemProfileName() const
2004{
2005 return shared->system_profile_name;
2006}
2007
2008String Context::getFormatSchemaPath() const
2009{
2010 return shared->format_schema_path;
2011}
2012
2013void Context::setFormatSchemaPath(const String & path)
2014{
2015 shared->format_schema_path = path;
2016}
2017
2018Context::SampleBlockCache & Context::getSampleBlockCache() const
2019{
2020 return getQueryContext().sample_block_cache;
2021}
2022
2023
2024bool Context::hasQueryParameters() const
2025{
2026 return !query_parameters.empty();
2027}
2028
2029
2030const NameToNameMap & Context::getQueryParameters() const
2031{
2032 return query_parameters;
2033}
2034
2035
2036void Context::setQueryParameter(const String & name, const String & value)
2037{
2038 if (!query_parameters.emplace(name, value).second)
2039 throw Exception("Duplicate name " + backQuote(name) + " of query parameter", ErrorCodes::BAD_ARGUMENTS);
2040}
2041
2042
2043#if USE_EMBEDDED_COMPILER
2044
2045std::shared_ptr<CompiledExpressionCache> Context::getCompiledExpressionCache() const
2046{
2047 auto lock = getLock();
2048 return shared->compiled_expression_cache;
2049}
2050
2051void Context::setCompiledExpressionCache(size_t cache_size)
2052{
2053
2054 auto lock = getLock();
2055
2056 if (shared->compiled_expression_cache)
2057 throw Exception("Compiled expressions cache has been already created.", ErrorCodes::LOGICAL_ERROR);
2058
2059 shared->compiled_expression_cache = std::make_shared<CompiledExpressionCache>(cache_size);
2060}
2061
2062void Context::dropCompiledExpressionCache() const
2063{
2064 auto lock = getLock();
2065 if (shared->compiled_expression_cache)
2066 shared->compiled_expression_cache->reset();
2067}
2068
2069#endif
2070
2071
2072void Context::addXDBCBridgeCommand(std::unique_ptr<ShellCommand> cmd) const
2073{
2074 auto lock = getLock();
2075 shared->bridge_commands.emplace_back(std::move(cmd));
2076}
2077
2078
2079IHostContextPtr & Context::getHostContext()
2080{
2081 return host_context;
2082}
2083
2084
2085const IHostContextPtr & Context::getHostContext() const
2086{
2087 return host_context;
2088}
2089
2090
2091std::shared_ptr<ActionLocksManager> Context::getActionLocksManager()
2092{
2093 auto lock = getLock();
2094
2095 if (!shared->action_locks_manager)
2096 shared->action_locks_manager = std::make_shared<ActionLocksManager>(getGlobalContext());
2097
2098 return shared->action_locks_manager;
2099}
2100
2101
2102void Context::setExternalTablesInitializer(ExternalTablesInitializer && initializer)
2103{
2104 if (external_tables_initializer_callback)
2105 throw Exception("External tables initializer is already set", ErrorCodes::LOGICAL_ERROR);
2106
2107 external_tables_initializer_callback = std::move(initializer);
2108}
2109
2110void Context::initializeExternalTablesIfSet()
2111{
2112 if (external_tables_initializer_callback)
2113 {
2114 external_tables_initializer_callback(*this);
2115 /// Reset callback
2116 external_tables_initializer_callback = {};
2117 }
2118}
2119
2120
2121void Context::setInputInitializer(InputInitializer && initializer)
2122{
2123 if (input_initializer_callback)
2124 throw Exception("Input initializer is already set", ErrorCodes::LOGICAL_ERROR);
2125
2126 input_initializer_callback = std::move(initializer);
2127}
2128
2129
2130void Context::initializeInput(const StoragePtr & input_storage)
2131{
2132 if (!input_initializer_callback)
2133 throw Exception("Input initializer is not set", ErrorCodes::LOGICAL_ERROR);
2134
2135 input_initializer_callback(*this, input_storage);
2136 /// Reset callback
2137 input_initializer_callback = {};
2138}
2139
2140
2141void Context::setInputBlocksReaderCallback(InputBlocksReader && reader)
2142{
2143 if (input_blocks_reader)
2144 throw Exception("Input blocks reader is already set", ErrorCodes::LOGICAL_ERROR);
2145
2146 input_blocks_reader = std::move(reader);
2147}
2148
2149
2150InputBlocksReader Context::getInputBlocksReaderCallback() const
2151{
2152 return input_blocks_reader;
2153}
2154
2155
2156void Context::resetInputCallbacks()
2157{
2158 if (input_initializer_callback)
2159 input_initializer_callback = {};
2160
2161 if (input_blocks_reader)
2162 input_blocks_reader = {};
2163}
2164
2165
2166SessionCleaner::~SessionCleaner()
2167{
2168 try
2169 {
2170 {
2171 std::lock_guard lock{mutex};
2172 quit = true;
2173 }
2174
2175 cond.notify_one();
2176
2177 thread.join();
2178 }
2179 catch (...)
2180 {
2181 DB::tryLogCurrentException(__PRETTY_FUNCTION__);
2182 }
2183}
2184
2185void SessionCleaner::run()
2186{
2187 setThreadName("SessionCleaner");
2188
2189 std::unique_lock lock{mutex};
2190
2191 while (true)
2192 {
2193 auto interval = context.closeSessions();
2194
2195 if (cond.wait_for(lock, interval, [this]() -> bool { return quit; }))
2196 break;
2197 }
2198}
2199
2200
2201}
2202