1 | #include <map> |
2 | #include <set> |
3 | #include <optional> |
4 | #include <memory> |
5 | #include <Poco/Mutex.h> |
6 | #include <Poco/UUID.h> |
7 | #include <Poco/Net/IPAddress.h> |
8 | #include <Poco/Util/Application.h> |
9 | #include <Common/Macros.h> |
10 | #include <Common/escapeForFileName.h> |
11 | #include <Common/setThreadName.h> |
12 | #include <Common/Stopwatch.h> |
13 | #include <Common/formatReadable.h> |
14 | #include <Common/thread_local_rng.h> |
15 | #include <Compression/ICompressionCodec.h> |
16 | #include <Core/BackgroundSchedulePool.h> |
17 | #include <Formats/FormatFactory.h> |
18 | #include <Databases/IDatabase.h> |
19 | #include <Storages/IStorage.h> |
20 | #include <Storages/MarkCache.h> |
21 | #include <Storages/MergeTree/BackgroundProcessingPool.h> |
22 | #include <Storages/MergeTree/MergeList.h> |
23 | #include <Storages/MergeTree/MergeTreeSettings.h> |
24 | #include <Storages/CompressionCodecSelector.h> |
25 | #include <TableFunctions/TableFunctionFactory.h> |
26 | #include <Interpreters/ActionLocksManager.h> |
27 | #include <Core/Settings.h> |
28 | #include <Access/AccessControlManager.h> |
29 | #include <Access/SettingsConstraints.h> |
30 | #include <Access/QuotaContext.h> |
31 | #include <Access/RowPolicyContext.h> |
32 | #include <Interpreters/ExpressionJIT.h> |
33 | #include <Interpreters/UsersManager.h> |
34 | #include <Dictionaries/Embedded/GeoDictionariesLoader.h> |
35 | #include <Interpreters/EmbeddedDictionaries.h> |
36 | #include <Interpreters/ExternalDictionariesLoader.h> |
37 | #include <Interpreters/ExternalModelsLoader.h> |
38 | #include <Interpreters/ExpressionActions.h> |
39 | #include <Interpreters/ProcessList.h> |
40 | #include <Interpreters/Cluster.h> |
41 | #include <Interpreters/InterserverIOHandler.h> |
42 | #include <Interpreters/SystemLog.h> |
43 | #include <Interpreters/Context.h> |
44 | #include <Interpreters/DDLWorker.h> |
45 | #include <Common/DNSResolver.h> |
46 | #include <IO/ReadBufferFromFile.h> |
47 | #include <IO/UncompressedCache.h> |
48 | #include <Parsers/ASTCreateQuery.h> |
49 | #include <Parsers/ParserCreateQuery.h> |
50 | #include <Parsers/parseQuery.h> |
51 | #include <Common/StackTrace.h> |
52 | #include <Common/Config/ConfigProcessor.h> |
53 | #include <Common/ZooKeeper/ZooKeeper.h> |
54 | #include <Common/ShellCommand.h> |
55 | #include <Common/TraceCollector.h> |
56 | #include <common/logger_useful.h> |
57 | #include <Common/RemoteHostFilter.h> |
58 | |
/// Profile event counters referenced by this file (extern declarations only; the definitions live elsewhere).
namespace ProfileEvents
{
    extern const Event ContextLock; /// Incremented on every call to Context::getLock().
    extern const Event CompiledCacheSizeBytes;
}
64 | |
/// Current-value metrics referenced by this file (extern declarations only; the definitions live elsewhere).
namespace CurrentMetrics
{
    extern const Metric ContextLockWait; /// Raised for the duration of Context::getLock() via CurrentMetrics::Increment.
    extern const Metric MemoryTrackingForMerges;
    extern const Metric BackgroundMovePoolTask;
    extern const Metric MemoryTrackingInBackgroundMoveProcessingPool;
}
72 | |
73 | |
74 | namespace DB |
75 | { |
76 | |
/// Error codes referenced by this file (extern declarations only; the values are defined elsewhere).
namespace ErrorCodes
{
    extern const int DATABASE_ACCESS_DENIED;
    extern const int UNKNOWN_DATABASE;
    extern const int UNKNOWN_TABLE;
    extern const int TABLE_ALREADY_EXISTS;
    extern const int TABLE_WAS_NOT_DROPPED;
    extern const int DATABASE_ALREADY_EXISTS;
    extern const int THERE_IS_NO_SESSION;
    extern const int THERE_IS_NO_QUERY;
    extern const int NO_ELEMENTS_IN_CONFIG;
    extern const int DDL_GUARD_IS_ACTIVE;
    extern const int TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT;
    extern const int PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT;
    extern const int SESSION_NOT_FOUND;
    extern const int SESSION_IS_LOCKED;
    extern const int CANNOT_GET_CREATE_TABLE_QUERY;
    extern const int LOGICAL_ERROR;
    extern const int SCALAR_ALREADY_EXISTS;
    extern const int UNKNOWN_SCALAR;
    extern const int NOT_ENOUGH_PRIVILEGES;
}
99 | |
100 | |
/** Set of known objects (environment), that could be used in query.
  * Shared (global) part. Order of members (especially, order of destruction) is very important.
  *
  * At most one instance may be constructed per process: the constructor prints a stack trace
  * and terminates the program if it is invoked a second time.
  */
struct ContextShared
{
    Logger * log = &Logger::get("Context" );

    /// For access of most of shared objects. Recursive mutex.
    mutable std::recursive_mutex mutex;
    /// Separate mutex for access of dictionaries. Separate mutex to avoid locks when server doing request to itself.
    mutable std::mutex embedded_dictionaries_mutex;
    mutable std::mutex external_dictionaries_mutex;
    mutable std::mutex external_models_mutex;
    /// Separate mutex for re-initialization of the ZooKeeper session. This operation could take a long time and must not interfere with other operations.
    mutable std::mutex zookeeper_mutex;

    mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper.

    String interserver_io_host; /// The host name by which this server is available for other servers.
    UInt16 interserver_io_port = 0; /// and port.
    String interserver_io_user;
    String interserver_io_password;
    String interserver_scheme; /// http or https

    String path; /// Path to the data directory, with a slash at the end.
    String tmp_path; /// The path to the temporary files that occur when processing the request.
    String flags_path; /// Path to the directory with some control flags for server maintenance.
    String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function.
    String dictionaries_lib_path; /// Path to the directory with user provided binaries and libraries for external dictionaries.
    ConfigurationPtr config; /// Global configuration settings.

    Databases databases; /// List of databases and tables in them.
    mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
    mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
    mutable std::optional<ExternalModelsLoader> external_models_loader;
    String default_profile_name; /// Default profile name used for default values.
    String system_profile_name; /// Profile used by system processes
    AccessControlManager access_control_manager;
    std::unique_ptr<UsersManager> users_manager; /// Known users. Created in initialize().
    mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
    mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
    ProcessList process_list; /// Executing queries at the moment.
    MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
    ViewDependencies view_dependencies; /// Current dependencies
    ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
    InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
    std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
    std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
    std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
    MultiVersion<Macros> macros; /// Substitutions extracted from config.
    std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
    /// Rules for selecting the compression settings, depending on the size of the part.
    mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
    /// Storage disk chooser
    mutable std::unique_ptr<DiskSelector> merge_tree_disk_selector;
    /// Storage policy chooser
    mutable std::unique_ptr<StoragePolicySelector> merge_tree_storage_policy_selector;

    std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
    std::atomic_size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
    std::atomic_size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default)
    String format_schema_path; /// Path to a directory that contains schema files used by input formats.
    ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
    std::optional<SystemLogs> system_logs; /// Used to log queries and operations on parts

    RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml

    std::unique_ptr<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries

    /// Named sessions. The user could specify session identifier to reuse settings and temporary tables in subsequent requests.

    /// Hash functor for Context::SessionKey: feeds both components of the key into a SipHash
    /// and returns its 64-bit digest.
    class SessionKeyHash
    {
    public:
        size_t operator()(const Context::SessionKey & key) const
        {
            SipHash hash;
            hash.update(key.first);
            hash.update(key.second);
            return hash.get64();
        }
    };

    using Sessions = std::unordered_map<Context::SessionKey, std::shared_ptr<Context>, SessionKeyHash>;
    /// Queue of session keys grouped by close cycle: the front element belongs to the next cycle to be processed
    /// (see Context::scheduleCloseSession and Context::closeSessions).
    using CloseTimes = std::deque<std::vector<Context::SessionKey>>;
    mutable Sessions sessions;
    mutable CloseTimes close_times;
    std::chrono::steady_clock::duration close_interval = std::chrono::seconds(1); /// Length of one close cycle.
    std::chrono::steady_clock::time_point close_cycle_time = std::chrono::steady_clock::now(); /// Closing is skipped while the current time is earlier than this.
    UInt64 close_cycle = 0; /// Number of close cycles performed so far.

    /// Clusters for distributed tables
    /// Initialized on demand (on distributed storages initialization) since Settings should be initialized
    std::unique_ptr<Clusters> clusters;
    ConfigurationPtr clusters_config; /// Stores updated configs
    mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config

#if USE_EMBEDDED_COMPILER
    std::shared_ptr<CompiledExpressionCache> compiled_expression_cache;
#endif

    /// Set by shutdown() so that repeated calls become no-ops.
    /// NOTE(review): this is a plain bool, not an atomic - concurrent shutdown() calls are not guarded here.
    bool shutdown_called = false;

    /// Do not allow simultaneous execution of DDL requests on the same table.
    /// database -> object -> (mutex, counter), counter: how many threads are running a query on the table at the same time
    /// For the duration of the operation, an element is placed here, and an object is returned,
    /// which deletes the element in the destructor when counter becomes zero.
    /// In case the element already exists, waits, when query will be executed in other thread. See class DDLGuard below.
    using DDLGuards = std::unordered_map<String, DDLGuard::Map>;
    DDLGuards ddl_guards;
    /// If you capture mutex and ddl_guards_mutex, then you need to grab them strictly in this order.
    mutable std::mutex ddl_guards_mutex;

    Stopwatch uptime_watch;

    Context::ApplicationType application_type = Context::ApplicationType::SERVER;

    /// vector of xdbc-bridge commands, they will be killed when Context will be destroyed
    std::vector<std::unique_ptr<ShellCommand>> bridge_commands;

    Context::ConfigReloadCallback config_reload_callback;

    /// Initializes macros with an empty Macros object, enforces the single-instance rule
    /// and then runs initialize().
    ContextShared()
        : macros(std::make_unique<Macros>())
    {
        /// TODO: make it singleton (?)
        /// Counts constructor invocations across the whole process; a second one is treated as fatal.
        static std::atomic<size_t> num_calls{0};
        if (++num_calls > 1)
        {
            std::cerr << "Attempting to create multiple ContextShared instances. Stack trace:\n" << StackTrace().toString();
            std::cerr.flush();
            std::terminate();
        }

        initialize();
    }


    /// Runs shutdown(); any exception it throws is logged and swallowed so that the destructor never throws.
    ~ContextShared()
    {
        try
        {
            shutdown();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }


    /** Perform a complex job of destroying objects in advance.
      * Only the first call does the work; later calls return immediately.
      */
    void shutdown()
    {
        if (shutdown_called)
            return;
        shutdown_called = true;

        /** After system_logs have been shut down it is guaranteed that no system table gets created or written to.
          * Note that part changes at shutdown won't be logged to part log.
          */

        if (system_logs)
            system_logs->shutdown();

        /** At this point, some tables may have threads that block our mutex.
          * To shutdown them correctly, we will copy the current list of tables,
          * and ask them all to finish their work.
          * Then delete all objects with tables.
          */

        Databases current_databases;

        {
            std::lock_guard lock(mutex);
            current_databases = databases;
        }

        /// We still hold "databases" in Context (instead of std::move) for Buffer tables to flush data correctly.

        /// The mutex is deliberately not held here: database shutdown runs on the copy taken above.
        for (auto & database : current_databases)
            database.second->shutdown();

        {
            std::lock_guard lock(mutex);
            databases.clear();
        }

        /// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference).
        /// TODO: Get rid of this.

        system_logs.reset();
        embedded_dictionaries.reset();
        external_dictionaries_loader.reset();
        external_models_loader.reset();
        background_pool.reset();
        background_move_pool.reset();
        schedule_pool.reset();
        ddl_worker.reset();

        /// Stop trace collector if any
        trace_collector.reset();
    }

    /// Returns true once initializeTraceCollector() has created a collector (and it has not been reset).
    bool hasTraceCollector()
    {
        return trace_collector != nullptr;
    }

    /// Creates the trace collector writing to `trace_log`. Does nothing when `trace_log` is null.
    void initializeTraceCollector(std::shared_ptr<TraceLog> trace_log)
    {
        if (trace_log == nullptr)
            return;

        trace_collector = std::make_unique<TraceCollector>(trace_log);
    }

private:
    /// Constructor helper: creates the users manager.
    void initialize()
    {
        users_manager = std::make_unique<UsersManager>();
    }
};
324 | |
325 | |
/// Default construction, copy construction and copy assignment are all compiler-generated.
/// Defined here rather than in the header (presumably so that member types only need to be complete in this file).
Context::Context() = default;
Context::Context(const Context &) = default;
Context & Context::operator=(const Context &) = default;
329 | |
330 | |
331 | Context Context::createGlobal() |
332 | { |
333 | Context res; |
334 | res.quota = std::make_shared<QuotaContext>(); |
335 | res.row_policy = std::make_shared<RowPolicyContext>(); |
336 | res.shared = std::make_shared<ContextShared>(); |
337 | return res; |
338 | } |
339 | |
/// Compiler-generated destructor, defined out of line in this file.
Context::~Context() = default;
341 | |
342 | |
343 | InterserverIOHandler & Context::getInterserverIOHandler() { return shared->interserver_io_handler; } |
344 | |
345 | std::unique_lock<std::recursive_mutex> Context::getLock() const |
346 | { |
347 | ProfileEvents::increment(ProfileEvents::ContextLock); |
348 | CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait}; |
349 | return std::unique_lock(shared->mutex); |
350 | } |
351 | |
352 | ProcessList & Context::getProcessList() { return shared->process_list; } |
353 | const ProcessList & Context::getProcessList() const { return shared->process_list; } |
354 | MergeList & Context::getMergeList() { return shared->merge_list; } |
355 | const MergeList & Context::getMergeList() const { return shared->merge_list; } |
356 | |
357 | |
358 | const Databases Context::getDatabases() const |
359 | { |
360 | auto lock = getLock(); |
361 | return shared->databases; |
362 | } |
363 | |
364 | Databases Context::getDatabases() |
365 | { |
366 | auto lock = getLock(); |
367 | return shared->databases; |
368 | } |
369 | |
370 | |
371 | Context::SessionKey Context::getSessionKey(const String & session_id) const |
372 | { |
373 | auto & user_name = client_info.current_user; |
374 | |
375 | if (user_name.empty()) |
376 | throw Exception("Empty user name." , ErrorCodes::LOGICAL_ERROR); |
377 | |
378 | return SessionKey(user_name, session_id); |
379 | } |
380 | |
381 | |
382 | void Context::scheduleCloseSession(const Context::SessionKey & key, std::chrono::steady_clock::duration timeout) |
383 | { |
384 | const UInt64 close_index = timeout / shared->close_interval + 1; |
385 | const auto new_close_cycle = shared->close_cycle + close_index; |
386 | |
387 | if (session_close_cycle != new_close_cycle) |
388 | { |
389 | session_close_cycle = new_close_cycle; |
390 | if (shared->close_times.size() < close_index + 1) |
391 | shared->close_times.resize(close_index + 1); |
392 | shared->close_times[close_index].emplace_back(key); |
393 | } |
394 | } |
395 | |
396 | |
397 | std::shared_ptr<Context> Context::acquireSession(const String & session_id, std::chrono::steady_clock::duration timeout, bool session_check) const |
398 | { |
399 | auto lock = getLock(); |
400 | |
401 | const auto & key = getSessionKey(session_id); |
402 | auto it = shared->sessions.find(key); |
403 | |
404 | if (it == shared->sessions.end()) |
405 | { |
406 | if (session_check) |
407 | throw Exception("Session not found." , ErrorCodes::SESSION_NOT_FOUND); |
408 | |
409 | auto new_session = std::make_shared<Context>(*this); |
410 | |
411 | new_session->scheduleCloseSession(key, timeout); |
412 | |
413 | it = shared->sessions.insert(std::make_pair(key, std::move(new_session))).first; |
414 | } |
415 | else if (it->second->client_info.current_user != client_info.current_user) |
416 | { |
417 | throw Exception("Session belongs to a different user" , ErrorCodes::LOGICAL_ERROR); |
418 | } |
419 | |
420 | const auto & session = it->second; |
421 | |
422 | if (session->session_is_used) |
423 | throw Exception("Session is locked by a concurrent client." , ErrorCodes::SESSION_IS_LOCKED); |
424 | session->session_is_used = true; |
425 | |
426 | session->client_info = client_info; |
427 | |
428 | return session; |
429 | } |
430 | |
431 | |
432 | void Context::releaseSession(const String & session_id, std::chrono::steady_clock::duration timeout) |
433 | { |
434 | auto lock = getLock(); |
435 | |
436 | session_is_used = false; |
437 | scheduleCloseSession(getSessionKey(session_id), timeout); |
438 | } |
439 | |
440 | |
441 | std::chrono::steady_clock::duration Context::closeSessions() const |
442 | { |
443 | auto lock = getLock(); |
444 | |
445 | const auto now = std::chrono::steady_clock::now(); |
446 | |
447 | if (now < shared->close_cycle_time) |
448 | return shared->close_cycle_time - now; |
449 | |
450 | const auto current_cycle = shared->close_cycle; |
451 | |
452 | ++shared->close_cycle; |
453 | shared->close_cycle_time = now + shared->close_interval; |
454 | |
455 | if (shared->close_times.empty()) |
456 | return shared->close_interval; |
457 | |
458 | auto & sessions_to_close = shared->close_times.front(); |
459 | |
460 | for (const auto & key : sessions_to_close) |
461 | { |
462 | const auto session = shared->sessions.find(key); |
463 | |
464 | if (session != shared->sessions.end() && session->second->session_close_cycle <= current_cycle) |
465 | { |
466 | if (session->second->session_is_used) |
467 | session->second->scheduleCloseSession(key, std::chrono::seconds(0)); |
468 | else |
469 | shared->sessions.erase(session); |
470 | } |
471 | } |
472 | |
473 | shared->close_times.pop_front(); |
474 | |
475 | return shared->close_interval; |
476 | } |
477 | |
478 | |
479 | static String resolveDatabase(const String & database_name, const String & current_database) |
480 | { |
481 | String res = database_name.empty() ? current_database : database_name; |
482 | if (res.empty()) |
483 | throw Exception("Default database is not selected" , ErrorCodes::UNKNOWN_DATABASE); |
484 | return res; |
485 | } |
486 | |
487 | |
488 | const DatabasePtr Context::getDatabase(const String & database_name) const |
489 | { |
490 | auto lock = getLock(); |
491 | String db = resolveDatabase(database_name, current_database); |
492 | assertDatabaseExists(db); |
493 | return shared->databases[db]; |
494 | } |
495 | |
496 | DatabasePtr Context::getDatabase(const String & database_name) |
497 | { |
498 | auto lock = getLock(); |
499 | String db = resolveDatabase(database_name, current_database); |
500 | assertDatabaseExists(db); |
501 | return shared->databases[db]; |
502 | } |
503 | |
504 | const DatabasePtr Context::tryGetDatabase(const String & database_name) const |
505 | { |
506 | auto lock = getLock(); |
507 | String db = resolveDatabase(database_name, current_database); |
508 | auto it = shared->databases.find(db); |
509 | if (it == shared->databases.end()) |
510 | return {}; |
511 | return it->second; |
512 | } |
513 | |
514 | DatabasePtr Context::tryGetDatabase(const String & database_name) |
515 | { |
516 | auto lock = getLock(); |
517 | String db = resolveDatabase(database_name, current_database); |
518 | auto it = shared->databases.find(db); |
519 | if (it == shared->databases.end()) |
520 | return {}; |
521 | return it->second; |
522 | } |
523 | |
524 | String Context::getPath() const |
525 | { |
526 | auto lock = getLock(); |
527 | return shared->path; |
528 | } |
529 | |
530 | String Context::getTemporaryPath() const |
531 | { |
532 | auto lock = getLock(); |
533 | return shared->tmp_path; |
534 | } |
535 | |
536 | String Context::getFlagsPath() const |
537 | { |
538 | auto lock = getLock(); |
539 | return shared->flags_path; |
540 | } |
541 | |
542 | String Context::getUserFilesPath() const |
543 | { |
544 | auto lock = getLock(); |
545 | return shared->user_files_path; |
546 | } |
547 | |
548 | String Context::getDictionariesLibPath() const |
549 | { |
550 | auto lock = getLock(); |
551 | return shared->dictionaries_lib_path; |
552 | } |
553 | |
554 | void Context::setPath(const String & path) |
555 | { |
556 | auto lock = getLock(); |
557 | |
558 | shared->path = path; |
559 | |
560 | if (shared->tmp_path.empty()) |
561 | shared->tmp_path = shared->path + "tmp/" ; |
562 | |
563 | if (shared->flags_path.empty()) |
564 | shared->flags_path = shared->path + "flags/" ; |
565 | |
566 | if (shared->user_files_path.empty()) |
567 | shared->user_files_path = shared->path + "user_files/" ; |
568 | |
569 | if (shared->dictionaries_lib_path.empty()) |
570 | shared->dictionaries_lib_path = shared->path + "dictionaries_lib/" ; |
571 | } |
572 | |
573 | void Context::setTemporaryPath(const String & path) |
574 | { |
575 | auto lock = getLock(); |
576 | shared->tmp_path = path; |
577 | } |
578 | |
579 | void Context::setFlagsPath(const String & path) |
580 | { |
581 | auto lock = getLock(); |
582 | shared->flags_path = path; |
583 | } |
584 | |
585 | void Context::setUserFilesPath(const String & path) |
586 | { |
587 | auto lock = getLock(); |
588 | shared->user_files_path = path; |
589 | } |
590 | |
591 | void Context::setDictionariesLibPath(const String & path) |
592 | { |
593 | auto lock = getLock(); |
594 | shared->dictionaries_lib_path = path; |
595 | } |
596 | |
597 | void Context::setConfig(const ConfigurationPtr & config) |
598 | { |
599 | auto lock = getLock(); |
600 | shared->config = config; |
601 | } |
602 | |
603 | const Poco::Util::AbstractConfiguration & Context::getConfigRef() const |
604 | { |
605 | auto lock = getLock(); |
606 | return shared->config ? *shared->config : Poco::Util::Application::instance().config(); |
607 | } |
608 | |
609 | AccessControlManager & Context::getAccessControlManager() |
610 | { |
611 | auto lock = getLock(); |
612 | return shared->access_control_manager; |
613 | } |
614 | |
615 | const AccessControlManager & Context::getAccessControlManager() const |
616 | { |
617 | auto lock = getLock(); |
618 | return shared->access_control_manager; |
619 | } |
620 | |
621 | void Context::checkQuotaManagementIsAllowed() |
622 | { |
623 | if (!is_quota_management_allowed) |
624 | throw Exception( |
625 | "User " + client_info.current_user + " doesn't have enough privileges to manage quotas" , ErrorCodes::NOT_ENOUGH_PRIVILEGES); |
626 | } |
627 | |
628 | void Context::checkRowPolicyManagementIsAllowed() |
629 | { |
630 | if (!is_row_policy_management_allowed) |
631 | throw Exception( |
632 | "User " + client_info.current_user + " doesn't have enough privileges to manage row policies" , ErrorCodes::NOT_ENOUGH_PRIVILEGES); |
633 | } |
634 | |
635 | void Context::setUsersConfig(const ConfigurationPtr & config) |
636 | { |
637 | auto lock = getLock(); |
638 | shared->users_config = config; |
639 | shared->access_control_manager.loadFromConfig(*shared->users_config); |
640 | shared->users_manager->loadFromConfig(*shared->users_config); |
641 | } |
642 | |
643 | ConfigurationPtr Context::getUsersConfig() |
644 | { |
645 | auto lock = getLock(); |
646 | return shared->users_config; |
647 | } |
648 | |
649 | void Context::calculateUserSettings() |
650 | { |
651 | auto lock = getLock(); |
652 | |
653 | auto user = getUser(client_info.current_user); |
654 | String profile = user->profile; |
655 | |
656 | /// 1) Set default settings (hardcoded values) |
657 | /// NOTE: we ignore global_context settings (from which it is usually copied) |
658 | /// NOTE: global_context settings are immutable and not auto updated |
659 | settings = Settings(); |
660 | settings_constraints = nullptr; |
661 | |
662 | /// 2) Apply settings from default profile |
663 | auto default_profile_name = getDefaultProfileName(); |
664 | if (profile != default_profile_name) |
665 | setProfile(default_profile_name); |
666 | |
667 | /// 3) Apply settings from current user |
668 | setProfile(profile); |
669 | |
670 | quota = getAccessControlManager().createQuotaContext( |
671 | client_info.current_user, client_info.current_address.host(), client_info.quota_key); |
672 | is_quota_management_allowed = user->is_quota_management_allowed; |
673 | row_policy = getAccessControlManager().getRowPolicyContext(client_info.current_user); |
674 | is_row_policy_management_allowed = user->is_row_policy_management_allowed; |
675 | } |
676 | |
677 | |
678 | void Context::setProfile(const String & profile) |
679 | { |
680 | settings.setProfile(profile, *shared->users_config); |
681 | |
682 | auto new_constraints |
683 | = settings_constraints ? std::make_shared<SettingsConstraints>(*settings_constraints) : std::make_shared<SettingsConstraints>(); |
684 | new_constraints->setProfile(profile, *shared->users_config); |
685 | settings_constraints = std::move(new_constraints); |
686 | } |
687 | |
688 | std::shared_ptr<const User> Context::getUser(const String & user_name) |
689 | { |
690 | return shared->users_manager->getUser(user_name); |
691 | } |
692 | |
693 | void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address, const String & quota_key) |
694 | { |
695 | auto lock = getLock(); |
696 | |
697 | auto user_props = shared->users_manager->authorizeAndGetUser(name, password, address.host()); |
698 | |
699 | client_info.current_user = name; |
700 | client_info.current_address = address; |
701 | client_info.current_password = password; |
702 | |
703 | if (!quota_key.empty()) |
704 | client_info.quota_key = quota_key; |
705 | |
706 | calculateUserSettings(); |
707 | } |
708 | |
709 | |
710 | void Context::checkDatabaseAccessRights(const std::string & database_name) const |
711 | { |
712 | auto lock = getLock(); |
713 | checkDatabaseAccessRightsImpl(database_name); |
714 | } |
715 | |
716 | bool Context::hasDatabaseAccessRights(const String & database_name) const |
717 | { |
718 | auto lock = getLock(); |
719 | return client_info.current_user.empty() || (database_name == "system" ) || |
720 | shared->users_manager->hasAccessToDatabase(client_info.current_user, database_name); |
721 | } |
722 | |
723 | bool Context::hasDictionaryAccessRights(const String & dictionary_name) const |
724 | { |
725 | auto lock = getLock(); |
726 | return client_info.current_user.empty() || |
727 | shared->users_manager->hasAccessToDictionary(client_info.current_user, dictionary_name); |
728 | } |
729 | |
730 | void Context::checkDatabaseAccessRightsImpl(const std::string & database_name) const |
731 | { |
732 | if (client_info.current_user.empty() || (database_name == "system" )) |
733 | { |
734 | /// An unnamed user, i.e. server, has access to all databases. |
735 | /// All users have access to the database system. |
736 | return; |
737 | } |
738 | if (!shared->users_manager->hasAccessToDatabase(client_info.current_user, database_name)) |
739 | throw Exception("Access denied to database " + database_name + " for user " + client_info.current_user , ErrorCodes::DATABASE_ACCESS_DENIED); |
740 | } |
741 | |
742 | void Context::addDependencyUnsafe(const DatabaseAndTableName & from, const DatabaseAndTableName & where) |
743 | { |
744 | checkDatabaseAccessRightsImpl(from.first); |
745 | checkDatabaseAccessRightsImpl(where.first); |
746 | shared->view_dependencies[from].insert(where); |
747 | |
748 | // Notify table of dependencies change |
749 | auto table = tryGetTable(from.first, from.second); |
750 | if (table != nullptr) |
751 | table->updateDependencies(); |
752 | } |
753 | |
754 | void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where) |
755 | { |
756 | auto lock = getLock(); |
757 | addDependencyUnsafe(from, where); |
758 | } |
759 | |
760 | void Context::removeDependencyUnsafe(const DatabaseAndTableName & from, const DatabaseAndTableName & where) |
761 | { |
762 | checkDatabaseAccessRightsImpl(from.first); |
763 | checkDatabaseAccessRightsImpl(where.first); |
764 | shared->view_dependencies[from].erase(where); |
765 | |
766 | // Notify table of dependencies change |
767 | auto table = tryGetTable(from.first, from.second); |
768 | if (table != nullptr) |
769 | table->updateDependencies(); |
770 | } |
771 | |
772 | void Context::removeDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where) |
773 | { |
774 | auto lock = getLock(); |
775 | removeDependencyUnsafe(from, where); |
776 | } |
777 | |
778 | Dependencies Context::getDependencies(const String & database_name, const String & table_name) const |
779 | { |
780 | auto lock = getLock(); |
781 | |
782 | String db = resolveDatabase(database_name, current_database); |
783 | |
784 | if (database_name.empty() && tryGetExternalTable(table_name)) |
785 | { |
786 | /// Table is temporary. Access granted. |
787 | } |
788 | else |
789 | { |
790 | checkDatabaseAccessRightsImpl(db); |
791 | } |
792 | |
793 | ViewDependencies::const_iterator iter = shared->view_dependencies.find(DatabaseAndTableName(db, table_name)); |
794 | if (iter == shared->view_dependencies.end()) |
795 | return {}; |
796 | |
797 | return Dependencies(iter->second.begin(), iter->second.end()); |
798 | } |
799 | |
800 | bool Context::isTableExist(const String & database_name, const String & table_name) const |
801 | { |
802 | auto lock = getLock(); |
803 | |
804 | String db = resolveDatabase(database_name, current_database); |
805 | checkDatabaseAccessRightsImpl(db); |
806 | |
807 | Databases::const_iterator it = shared->databases.find(db); |
808 | return shared->databases.end() != it |
809 | && it->second->isTableExist(*this, table_name); |
810 | } |
811 | |
812 | bool Context::isDictionaryExists(const String & database_name, const String & dictionary_name) const |
813 | { |
814 | auto lock = getLock(); |
815 | |
816 | String db = resolveDatabase(database_name, current_database); |
817 | checkDatabaseAccessRightsImpl(db); |
818 | |
819 | Databases::const_iterator it = shared->databases.find(db); |
820 | return shared->databases.end() != it && it->second->isDictionaryExist(*this, dictionary_name); |
821 | } |
822 | |
823 | bool Context::isDatabaseExist(const String & database_name) const |
824 | { |
825 | auto lock = getLock(); |
826 | String db = resolveDatabase(database_name, current_database); |
827 | checkDatabaseAccessRightsImpl(db); |
828 | return shared->databases.end() != shared->databases.find(db); |
829 | } |
830 | |
831 | bool Context::isExternalTableExist(const String & table_name) const |
832 | { |
833 | return external_tables.end() != external_tables.find(table_name); |
834 | } |
835 | |
836 | |
837 | void Context::assertTableDoesntExist(const String & database_name, const String & table_name, bool check_database_access_rights) const |
838 | { |
839 | auto lock = getLock(); |
840 | |
841 | String db = resolveDatabase(database_name, current_database); |
842 | if (check_database_access_rights) |
843 | checkDatabaseAccessRightsImpl(db); |
844 | |
845 | Databases::const_iterator it = shared->databases.find(db); |
846 | if (shared->databases.end() != it && it->second->isTableExist(*this, table_name)) |
847 | throw Exception("Table " + backQuoteIfNeed(db) + "." + backQuoteIfNeed(table_name) + " already exists." , ErrorCodes::TABLE_ALREADY_EXISTS); |
848 | } |
849 | |
850 | |
851 | void Context::assertDatabaseExists(const String & database_name, bool check_database_access_rights) const |
852 | { |
853 | auto lock = getLock(); |
854 | |
855 | String db = resolveDatabase(database_name, current_database); |
856 | if (check_database_access_rights) |
857 | checkDatabaseAccessRightsImpl(db); |
858 | |
859 | if (shared->databases.end() == shared->databases.find(db)) |
860 | throw Exception("Database " + backQuoteIfNeed(db) + " doesn't exist" , ErrorCodes::UNKNOWN_DATABASE); |
861 | } |
862 | |
863 | |
864 | void Context::assertDatabaseDoesntExist(const String & database_name) const |
865 | { |
866 | auto lock = getLock(); |
867 | |
868 | String db = resolveDatabase(database_name, current_database); |
869 | checkDatabaseAccessRightsImpl(db); |
870 | |
871 | if (shared->databases.end() != shared->databases.find(db)) |
872 | throw Exception("Database " + backQuoteIfNeed(db) + " already exists." , ErrorCodes::DATABASE_ALREADY_EXISTS); |
873 | } |
874 | |
875 | |
/// Returns a reference to the scalars stored in this context. No lock is taken.
const Scalars & Context::getScalars() const
{
    return scalars;
}
880 | |
881 | |
882 | const Block & Context::getScalar(const String & name) const |
883 | { |
884 | auto it = scalars.find(name); |
885 | if (scalars.end() == it) |
886 | throw Exception("Scalar " + backQuoteIfNeed(name) + " doesn't exist (internal bug)" , ErrorCodes::UNKNOWN_SCALAR); |
887 | return it->second; |
888 | } |
889 | |
890 | |
891 | Tables Context::getExternalTables() const |
892 | { |
893 | auto lock = getLock(); |
894 | |
895 | Tables res; |
896 | for (auto & table : external_tables) |
897 | res[table.first] = table.second.first; |
898 | |
899 | if (session_context && session_context != this) |
900 | { |
901 | Tables buf = session_context->getExternalTables(); |
902 | res.insert(buf.begin(), buf.end()); |
903 | } |
904 | else if (global_context && global_context != this) |
905 | { |
906 | Tables buf = global_context->getExternalTables(); |
907 | res.insert(buf.begin(), buf.end()); |
908 | } |
909 | return res; |
910 | } |
911 | |
912 | |
913 | StoragePtr Context::tryGetExternalTable(const String & table_name) const |
914 | { |
915 | TableAndCreateASTs::const_iterator jt = external_tables.find(table_name); |
916 | if (external_tables.end() == jt) |
917 | return StoragePtr(); |
918 | |
919 | return jt->second.first; |
920 | } |
921 | |
922 | |
923 | StoragePtr Context::getTable(const String & database_name, const String & table_name) const |
924 | { |
925 | Exception exc; |
926 | auto res = getTableImpl(database_name, table_name, &exc); |
927 | if (!res) |
928 | throw exc; |
929 | return res; |
930 | } |
931 | |
932 | |
/// Like getTable, but passes no exception slot to getTableImpl, so a failed lookup
/// yields whatever empty result getTableImpl returns instead of throwing here.
StoragePtr Context::tryGetTable(const String & database_name, const String & table_name) const
{
    return getTableImpl(database_name, table_name, nullptr);
}
937 | |
938 | |
939 | StoragePtr Context::getTableImpl(const String & database_name, const String & table_name, Exception * exception) const |
940 | { |
941 | String db; |
942 | DatabasePtr database; |
943 | |
944 | { |
945 | auto lock = getLock(); |
946 | |
947 | if (database_name.empty()) |
948 | { |
949 | StoragePtr res = tryGetExternalTable(table_name); |
950 | if (res) |
951 | return res; |
952 | } |
953 | |
954 | db = resolveDatabase(database_name, current_database); |
955 | checkDatabaseAccessRightsImpl(db); |
956 | |
957 | Databases::const_iterator it = shared->databases.find(db); |
958 | if (shared->databases.end() == it) |
959 | { |
960 | if (exception) |
961 | *exception = Exception("Database " + backQuoteIfNeed(db) + " doesn't exist" , ErrorCodes::UNKNOWN_DATABASE); |
962 | return {}; |
963 | } |
964 | |
965 | database = it->second; |
966 | } |
967 | |
968 | auto table = database->tryGetTable(*this, table_name); |
969 | if (!table) |
970 | { |
971 | if (exception) |
972 | *exception = Exception("Table " + backQuoteIfNeed(db) + "." + backQuoteIfNeed(table_name) + " doesn't exist." , ErrorCodes::UNKNOWN_TABLE); |
973 | return {}; |
974 | } |
975 | |
976 | return table; |
977 | } |
978 | |
979 | |
980 | void Context::addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast) |
981 | { |
982 | if (external_tables.end() != external_tables.find(table_name)) |
983 | throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " already exists." , ErrorCodes::TABLE_ALREADY_EXISTS); |
984 | |
985 | external_tables[table_name] = std::pair(storage, ast); |
986 | } |
987 | |
988 | |
/// Stores the block under the given scalar name, replacing any previous value for that name.
void Context::addScalar(const String & name, const Block & block)
{
    scalars[name] = block;
}
993 | |
994 | |
995 | bool Context::hasScalar(const String & name) const |
996 | { |
997 | return scalars.count(name); |
998 | } |
999 | |
1000 | |
1001 | StoragePtr Context::tryRemoveExternalTable(const String & table_name) |
1002 | { |
1003 | TableAndCreateASTs::const_iterator it = external_tables.find(table_name); |
1004 | |
1005 | if (external_tables.end() == it) |
1006 | return StoragePtr(); |
1007 | |
1008 | auto storage = it->second.first; |
1009 | external_tables.erase(it); |
1010 | return storage; |
1011 | } |
1012 | |
1013 | |
1014 | StoragePtr Context::executeTableFunction(const ASTPtr & table_expression) |
1015 | { |
1016 | /// Slightly suboptimal. |
1017 | auto hash = table_expression->getTreeHash(); |
1018 | String key = toString(hash.first) + '_' + toString(hash.second); |
1019 | |
1020 | StoragePtr & res = table_function_results[key]; |
1021 | |
1022 | if (!res) |
1023 | { |
1024 | TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression->as<ASTFunction>()->name, *this); |
1025 | |
1026 | /// Run it and remember the result |
1027 | res = table_function_ptr->execute(table_expression, *this, table_function_ptr->getName()); |
1028 | } |
1029 | |
1030 | return res; |
1031 | } |
1032 | |
1033 | |
1034 | void Context::addViewSource(const StoragePtr & storage) |
1035 | { |
1036 | if (view_source) |
1037 | throw Exception( |
1038 | "Temporary view source storage " + backQuoteIfNeed(view_source->getName()) + " already exists." , ErrorCodes::TABLE_ALREADY_EXISTS); |
1039 | view_source = storage; |
1040 | } |
1041 | |
1042 | |
/// Returns the view source storage set by addViewSource (empty if none was set).
StoragePtr Context::getViewSource()
{
    return view_source;
}
1047 | |
1048 | |
/// Acquires the per-element mutex stored in `map_` under key `elem`.
/// `guards_lock_` must already be held by the caller: it protects the map while the entry is
/// created (if absent) and its usage counter is incremented.
DDLGuard::DDLGuard(Map & map_, std::unique_lock<std::mutex> guards_lock_, const String & elem)
    : map(map_), guards_lock(std::move(guards_lock_))
{
    /// emplace leaves an existing entry untouched, so concurrent guards for the same element share one mutex.
    it = map.emplace(elem, Entry{std::make_unique<std::mutex>(), 0}).first;
    ++it->second.counter;
    /// Release the map lock before blocking on the element mutex.
    guards_lock.unlock();
    table_lock = std::unique_lock(*it->second.mutex);
}
1057 | |
/// Drops this guard's reference to the map entry; the last guard for an element removes the entry.
DDLGuard::~DDLGuard()
{
    /// Re-acquire the map lock: the counter and the map itself are modified below.
    guards_lock.lock();
    --it->second.counter;
    if (!it->second.counter)
    {
        /// The entry owns the mutex, so it has to be unlocked before the entry is erased.
        table_lock.unlock();
        map.erase(it);
    }
}
1068 | |
1069 | std::unique_ptr<DDLGuard> Context::getDDLGuard(const String & database, const String & table) const |
1070 | { |
1071 | std::unique_lock lock(shared->ddl_guards_mutex); |
1072 | return std::make_unique<DDLGuard>(shared->ddl_guards[database], std::move(lock), table); |
1073 | } |
1074 | |
1075 | |
/// Registers the database under the given name in shared->databases.
/// assertDatabaseDoesntExist is called first, under the context lock.
void Context::addDatabase(const String & database_name, const DatabasePtr & database)
{
    auto lock = getLock();

    assertDatabaseDoesntExist(database_name);
    shared->databases[database_name] = database;
}
1083 | |
1084 | |
1085 | DatabasePtr Context::detachDatabase(const String & database_name) |
1086 | { |
1087 | auto lock = getLock(); |
1088 | auto res = getDatabase(database_name); |
1089 | shared->databases.erase(database_name); |
1090 | |
1091 | return res; |
1092 | } |
1093 | |
1094 | |
1095 | ASTPtr Context::getCreateExternalTableQuery(const String & table_name) const |
1096 | { |
1097 | TableAndCreateASTs::const_iterator jt = external_tables.find(table_name); |
1098 | if (external_tables.end() == jt) |
1099 | throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " doesn't exist" , ErrorCodes::UNKNOWN_TABLE); |
1100 | |
1101 | return jt->second.second; |
1102 | } |
1103 | |
/// Returns a copy of the settings of this context. No lock is taken.
Settings Context::getSettings() const
{
    return settings;
}
1108 | |
1109 | |
/// Replaces the settings of this context with a copy of settings_. No lock is taken.
void Context::setSettings(const Settings & settings_)
{
    settings = settings_;
}
1114 | |
1115 | |
1116 | void Context::setSetting(const String & name, const String & value) |
1117 | { |
1118 | auto lock = getLock(); |
1119 | if (name == "profile" ) |
1120 | { |
1121 | setProfile(value); |
1122 | return; |
1123 | } |
1124 | settings.set(name, value); |
1125 | } |
1126 | |
1127 | |
1128 | void Context::setSetting(const String & name, const Field & value) |
1129 | { |
1130 | auto lock = getLock(); |
1131 | if (name == "profile" ) |
1132 | { |
1133 | setProfile(value.safeGet<String>()); |
1134 | return; |
1135 | } |
1136 | settings.set(name, value); |
1137 | } |
1138 | |
1139 | |
/// Applies one setting change by forwarding its name and value to setSetting.
void Context::applySettingChange(const SettingChange & change)
{
    setSetting(change.name, change.value);
}
1144 | |
1145 | |
1146 | void Context::applySettingsChanges(const SettingsChanges & changes) |
1147 | { |
1148 | auto lock = getLock(); |
1149 | for (const SettingChange & change : changes) |
1150 | applySettingChange(change); |
1151 | } |
1152 | |
1153 | |
1154 | void Context::checkSettingsConstraints(const SettingChange & change) |
1155 | { |
1156 | if (settings_constraints) |
1157 | settings_constraints->check(settings, change); |
1158 | } |
1159 | |
1160 | |
1161 | void Context::checkSettingsConstraints(const SettingsChanges & changes) |
1162 | { |
1163 | if (settings_constraints) |
1164 | settings_constraints->check(settings, changes); |
1165 | } |
1166 | |
1167 | |
/// Returns a copy of the current database name of this context. No lock is taken.
String Context::getCurrentDatabase() const
{
    return current_database;
}
1172 | |
1173 | |
/// Returns client_info.current_query_id (empty if it has not been set).
String Context::getCurrentQueryId() const
{
    return client_info.current_query_id;
}
1178 | |
1179 | |
/// Returns client_info.initial_query_id.
String Context::getInitialQueryId() const
{
    return client_info.initial_query_id;
}
1184 | |
1185 | |
/// Makes `name` the current database of this context.
/// The database is validated with assertDatabaseExists first, so nothing is changed if that call throws.
void Context::setCurrentDatabase(const String & name)
{
    auto lock = getLock();
    assertDatabaseExists(name);
    current_database = name;
}
1192 | |
1193 | |
1194 | void Context::setCurrentQueryId(const String & query_id) |
1195 | { |
1196 | if (!client_info.current_query_id.empty()) |
1197 | throw Exception("Logical error: attempt to set query_id twice" , ErrorCodes::LOGICAL_ERROR); |
1198 | |
1199 | String query_id_to_set = query_id; |
1200 | |
1201 | if (query_id_to_set.empty()) /// If the user did not submit his query_id, then we generate it ourselves. |
1202 | { |
1203 | /// Generate random UUID, but using lower quality RNG, |
1204 | /// because Poco::UUIDGenerator::generateRandom method is using /dev/random, that is very expensive. |
1205 | /// NOTE: Actually we don't need to use UUIDs for query identifiers. |
1206 | /// We could use any suitable string instead. |
1207 | |
1208 | union |
1209 | { |
1210 | char bytes[16]; |
1211 | struct |
1212 | { |
1213 | UInt64 a; |
1214 | UInt64 b; |
1215 | } words; |
1216 | } random; |
1217 | |
1218 | random.words.a = thread_local_rng(); //-V656 |
1219 | random.words.b = thread_local_rng(); //-V656 |
1220 | |
1221 | /// Use protected constructor. |
1222 | struct qUUID : Poco::UUID |
1223 | { |
1224 | qUUID(const char * bytes, Poco::UUID::Version version) |
1225 | : Poco::UUID(bytes, version) {} |
1226 | }; |
1227 | |
1228 | query_id_to_set = qUUID(random.bytes, Poco::UUID::UUID_RANDOM).toString(); |
1229 | } |
1230 | |
1231 | client_info.current_query_id = query_id_to_set; |
1232 | } |
1233 | |
1234 | void Context::killCurrentQuery() |
1235 | { |
1236 | if (process_list_elem) |
1237 | { |
1238 | process_list_elem->cancelQuery(true); |
1239 | } |
1240 | }; |
1241 | |
1242 | String Context::getDefaultFormat() const |
1243 | { |
1244 | return default_format.empty() ? "TabSeparated" : default_format; |
1245 | } |
1246 | |
1247 | |
/// Sets the default format name of this context. The name is stored as is, without validation.
void Context::setDefaultFormat(const String & name)
{
    default_format = name;
}
1252 | |
/// Returns the version of the macros currently held in shared->macros.
MultiVersion<Macros>::Version Context::getMacros() const
{
    return shared->macros.get();
}
1257 | |
/// Installs new macros into shared->macros, taking ownership of the passed object.
void Context::setMacros(std::unique_ptr<Macros> && macros)
{
    shared->macros.set(std::move(macros));
}
1262 | |
1263 | const Context & Context::getQueryContext() const |
1264 | { |
1265 | if (!query_context) |
1266 | throw Exception("There is no query" , ErrorCodes::THERE_IS_NO_QUERY); |
1267 | return *query_context; |
1268 | } |
1269 | |
1270 | Context & Context::getQueryContext() |
1271 | { |
1272 | if (!query_context) |
1273 | throw Exception("There is no query" , ErrorCodes::THERE_IS_NO_QUERY); |
1274 | return *query_context; |
1275 | } |
1276 | |
1277 | const Context & Context::getSessionContext() const |
1278 | { |
1279 | if (!session_context) |
1280 | throw Exception("There is no session" , ErrorCodes::THERE_IS_NO_SESSION); |
1281 | return *session_context; |
1282 | } |
1283 | |
1284 | Context & Context::getSessionContext() |
1285 | { |
1286 | if (!session_context) |
1287 | throw Exception("There is no session" , ErrorCodes::THERE_IS_NO_SESSION); |
1288 | return *session_context; |
1289 | } |
1290 | |
1291 | const Context & Context::getGlobalContext() const |
1292 | { |
1293 | if (!global_context) |
1294 | throw Exception("Logical error: there is no global context" , ErrorCodes::LOGICAL_ERROR); |
1295 | return *global_context; |
1296 | } |
1297 | |
1298 | Context & Context::getGlobalContext() |
1299 | { |
1300 | if (!global_context) |
1301 | throw Exception("Logical error: there is no global context" , ErrorCodes::LOGICAL_ERROR); |
1302 | return *global_context; |
1303 | } |
1304 | |
1305 | |
/// Returns the embedded dictionaries, creating them on first use with throw_on_error = false.
const EmbeddedDictionaries & Context::getEmbeddedDictionaries() const
{
    return getEmbeddedDictionariesImpl(false);
}

/// Non-const counterpart; same lazy creation with throw_on_error = false.
EmbeddedDictionaries & Context::getEmbeddedDictionaries()
{
    return getEmbeddedDictionariesImpl(false);
}
1315 | |
1316 | |
1317 | const ExternalDictionariesLoader & Context::getExternalDictionariesLoader() const |
1318 | { |
1319 | std::lock_guard lock(shared->external_dictionaries_mutex); |
1320 | if (!shared->external_dictionaries_loader) |
1321 | { |
1322 | if (!this->global_context) |
1323 | throw Exception("Logical error: there is no global context" , ErrorCodes::LOGICAL_ERROR); |
1324 | |
1325 | shared->external_dictionaries_loader.emplace(*this->global_context); |
1326 | } |
1327 | return *shared->external_dictionaries_loader; |
1328 | } |
1329 | |
1330 | ExternalDictionariesLoader & Context::getExternalDictionariesLoader() |
1331 | { |
1332 | return const_cast<ExternalDictionariesLoader &>(const_cast<const Context *>(this)->getExternalDictionariesLoader()); |
1333 | } |
1334 | |
1335 | |
1336 | const ExternalModelsLoader & Context::getExternalModelsLoader() const |
1337 | { |
1338 | std::lock_guard lock(shared->external_models_mutex); |
1339 | if (!shared->external_models_loader) |
1340 | { |
1341 | if (!this->global_context) |
1342 | throw Exception("Logical error: there is no global context" , ErrorCodes::LOGICAL_ERROR); |
1343 | |
1344 | shared->external_models_loader.emplace(*this->global_context); |
1345 | } |
1346 | return *shared->external_models_loader; |
1347 | } |
1348 | |
1349 | ExternalModelsLoader & Context::getExternalModelsLoader() |
1350 | { |
1351 | return const_cast<ExternalModelsLoader &>(const_cast<const Context *>(this)->getExternalModelsLoader()); |
1352 | } |
1353 | |
1354 | |
1355 | EmbeddedDictionaries & Context::getEmbeddedDictionariesImpl(const bool throw_on_error) const |
1356 | { |
1357 | std::lock_guard lock(shared->embedded_dictionaries_mutex); |
1358 | |
1359 | if (!shared->embedded_dictionaries) |
1360 | { |
1361 | auto geo_dictionaries_loader = std::make_unique<GeoDictionariesLoader>(); |
1362 | |
1363 | shared->embedded_dictionaries.emplace( |
1364 | std::move(geo_dictionaries_loader), |
1365 | *this->global_context, |
1366 | throw_on_error); |
1367 | } |
1368 | |
1369 | return *shared->embedded_dictionaries; |
1370 | } |
1371 | |
1372 | |
/// Triggers creation of the embedded dictionaries with throw_on_error = true.
/// The returned reference is intentionally discarded.
void Context::tryCreateEmbeddedDictionaries() const
{
    static_cast<void>(getEmbeddedDictionariesImpl(true));
}
1377 | |
1378 | |
/// Stores the progress callback in this context.
void Context::setProgressCallback(ProgressCallback callback)
{
    /// Callback is set to a session or to a query. In the session, only one query is processed at a time. Therefore, the lock is not needed.
    progress_callback = callback;
}
1384 | |
/// Returns a copy of the progress callback stored by setProgressCallback.
ProgressCallback Context::getProgressCallback() const
{
    return progress_callback;
}
1389 | |
1390 | |
/// Stores a non-owning pointer to the process list element of the query.
void Context::setProcessListElement(ProcessList::Element * elem)
{
    /// Set to a session or query. In the session, only one query is processed at a time. Therefore, the lock is not needed.
    process_list_elem = elem;
}
1396 | |
/// Returns the pointer stored by setProcessListElement (may be null).
ProcessList::Element * Context::getProcessListElement() const
{
    return process_list_elem;
}
1401 | |
1402 | |
1403 | void Context::setUncompressedCache(size_t max_size_in_bytes) |
1404 | { |
1405 | auto lock = getLock(); |
1406 | |
1407 | if (shared->uncompressed_cache) |
1408 | throw Exception("Uncompressed cache has been already created." , ErrorCodes::LOGICAL_ERROR); |
1409 | |
1410 | shared->uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes); |
1411 | } |
1412 | |
1413 | |
/// Returns the shared uncompressed cache (null if it has not been created). Takes the context lock.
UncompressedCachePtr Context::getUncompressedCache() const
{
    auto lock = getLock();
    return shared->uncompressed_cache;
}
1419 | |
1420 | |
1421 | void Context::dropUncompressedCache() const |
1422 | { |
1423 | auto lock = getLock(); |
1424 | if (shared->uncompressed_cache) |
1425 | shared->uncompressed_cache->reset(); |
1426 | } |
1427 | |
1428 | |
1429 | void Context::setMarkCache(size_t cache_size_in_bytes) |
1430 | { |
1431 | auto lock = getLock(); |
1432 | |
1433 | if (shared->mark_cache) |
1434 | throw Exception("Mark cache has been already created." , ErrorCodes::LOGICAL_ERROR); |
1435 | |
1436 | shared->mark_cache = std::make_shared<MarkCache>(cache_size_in_bytes); |
1437 | } |
1438 | |
1439 | |
/// Returns the shared mark cache (null if it has not been created). Takes the context lock.
MarkCachePtr Context::getMarkCache() const
{
    auto lock = getLock();
    return shared->mark_cache;
}
1445 | |
1446 | |
1447 | void Context::dropMarkCache() const |
1448 | { |
1449 | auto lock = getLock(); |
1450 | if (shared->mark_cache) |
1451 | shared->mark_cache->reset(); |
1452 | } |
1453 | |
1454 | |
1455 | void Context::dropCaches() const |
1456 | { |
1457 | auto lock = getLock(); |
1458 | |
1459 | if (shared->uncompressed_cache) |
1460 | shared->uncompressed_cache->reset(); |
1461 | |
1462 | if (shared->mark_cache) |
1463 | shared->mark_cache->reset(); |
1464 | } |
1465 | |
1466 | BackgroundProcessingPool & Context::getBackgroundPool() |
1467 | { |
1468 | auto lock = getLock(); |
1469 | if (!shared->background_pool) |
1470 | shared->background_pool.emplace(settings.background_pool_size); |
1471 | return *shared->background_pool; |
1472 | } |
1473 | |
1474 | BackgroundProcessingPool & Context::getBackgroundMovePool() |
1475 | { |
1476 | auto lock = getLock(); |
1477 | if (!shared->background_move_pool) |
1478 | { |
1479 | BackgroundProcessingPool::PoolSettings pool_settings; |
1480 | auto & config = getConfigRef(); |
1481 | pool_settings.thread_sleep_seconds = config.getDouble("background_move_processing_pool_thread_sleep_seconds" , 10); |
1482 | pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_move_processing_pool_thread_sleep_seconds_random_part" , 1.0); |
1483 | pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_move_processing_pool_thread_sleep_seconds_if_nothing_to_do" , 0.1); |
1484 | pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_min" , 10); |
1485 | pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_max" , 600); |
1486 | pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_multiplier" , 1.1); |
1487 | pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_random_part" , 1.0); |
1488 | pool_settings.tasks_metric = CurrentMetrics::BackgroundMovePoolTask; |
1489 | pool_settings.memory_metric = CurrentMetrics::MemoryTrackingInBackgroundMoveProcessingPool; |
1490 | shared->background_move_pool.emplace(settings.background_move_pool_size, pool_settings, "BackgroundMovePool" , "BgMoveProcPool" ); |
1491 | } |
1492 | return *shared->background_move_pool; |
1493 | } |
1494 | |
1495 | BackgroundSchedulePool & Context::getSchedulePool() |
1496 | { |
1497 | auto lock = getLock(); |
1498 | if (!shared->schedule_pool) |
1499 | shared->schedule_pool.emplace(settings.background_schedule_pool_size); |
1500 | return *shared->schedule_pool; |
1501 | } |
1502 | |
1503 | void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker) |
1504 | { |
1505 | auto lock = getLock(); |
1506 | if (shared->ddl_worker) |
1507 | throw Exception("DDL background thread has already been initialized." , ErrorCodes::LOGICAL_ERROR); |
1508 | shared->ddl_worker = std::move(ddl_worker); |
1509 | } |
1510 | |
1511 | DDLWorker & Context::getDDLWorker() const |
1512 | { |
1513 | auto lock = getLock(); |
1514 | if (!shared->ddl_worker) |
1515 | throw Exception("DDL background thread is not initialized." , ErrorCodes::LOGICAL_ERROR); |
1516 | return *shared->ddl_worker; |
1517 | } |
1518 | |
1519 | zkutil::ZooKeeperPtr Context::getZooKeeper() const |
1520 | { |
1521 | std::lock_guard lock(shared->zookeeper_mutex); |
1522 | |
1523 | if (!shared->zookeeper) |
1524 | shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(getConfigRef(), "zookeeper" ); |
1525 | else if (shared->zookeeper->expired()) |
1526 | shared->zookeeper = shared->zookeeper->startNewSession(); |
1527 | |
1528 | return shared->zookeeper; |
1529 | } |
1530 | |
/// Drops the shared ZooKeeper client pointer under shared->zookeeper_mutex,
/// so the next getZooKeeper call creates a new client.
void Context::resetZooKeeper() const
{
    std::lock_guard lock(shared->zookeeper_mutex);
    shared->zookeeper.reset();
}
1536 | |
1537 | bool Context::hasZooKeeper() const |
1538 | { |
1539 | return getConfigRef().has("zookeeper" ); |
1540 | } |
1541 | |
1542 | |
/// Stores the interserver host and port in the shared state. No lock is taken.
void Context::setInterserverIOAddress(const String & host, UInt16 port)
{
    shared->interserver_io_host = host;
    shared->interserver_io_port = port;
}
1548 | |
1549 | std::pair<String, UInt16> Context::getInterserverIOAddress() const |
1550 | { |
1551 | if (shared->interserver_io_host.empty() || shared->interserver_io_port == 0) |
1552 | throw Exception("Parameter 'interserver_http(s)_port' required for replication is not specified in configuration file." , |
1553 | ErrorCodes::NO_ELEMENTS_IN_CONFIG); |
1554 | |
1555 | return { shared->interserver_io_host, shared->interserver_io_port }; |
1556 | } |
1557 | |
/// Stores the interserver user and password in the shared state. No lock is taken.
void Context::setInterserverCredentials(const String & user, const String & password)
{
    shared->interserver_io_user = user;
    shared->interserver_io_password = password;
}
1563 | |
/// Returns the (user, password) pair stored by setInterserverCredentials.
std::pair<String, String> Context::getInterserverCredentials() const
{
    return { shared->interserver_io_user, shared->interserver_io_password };
}
1568 | |
/// Stores the interserver scheme string in the shared state. No lock is taken.
void Context::setInterserverScheme(const String & scheme)
{
    shared->interserver_scheme = scheme;
}
1573 | |
/// Returns a copy of the interserver scheme string stored in the shared state.
String Context::getInterserverScheme() const
{
    return shared->interserver_scheme;
}
1578 | |
/// Loads the shared remote host filter from the given configuration.
void Context::setRemoteHostFilter(const Poco::Util::AbstractConfiguration & config)
{
    shared->remote_host_filter.setValuesFromConfig(config);
}
1583 | |
/// Returns a reference to the shared remote host filter.
const RemoteHostFilter & Context::getRemoteHostFilter() const
{
    return shared->remote_host_filter;
}
1588 | |
1589 | UInt16 Context::getTCPPort() const |
1590 | { |
1591 | auto lock = getLock(); |
1592 | |
1593 | auto & config = getConfigRef(); |
1594 | return config.getInt("tcp_port" , DBMS_DEFAULT_PORT); |
1595 | } |
1596 | |
1597 | std::optional<UInt16> Context::getTCPPortSecure() const |
1598 | { |
1599 | auto lock = getLock(); |
1600 | |
1601 | auto & config = getConfigRef(); |
1602 | if (config.has("tcp_port_secure" )) |
1603 | return config.getInt("tcp_port_secure" ); |
1604 | return {}; |
1605 | } |
1606 | |
1607 | std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) const |
1608 | { |
1609 | auto res = getClusters().getCluster(cluster_name); |
1610 | |
1611 | if (!res) |
1612 | throw Exception("Requested cluster '" + cluster_name + "' not found" , ErrorCodes::BAD_GET); |
1613 | |
1614 | return res; |
1615 | } |
1616 | |
1617 | |
/// Returns whatever Clusters::getCluster yields for the name, without throwing on a missing cluster here.
std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name) const
{
    return getClusters().getCluster(cluster_name);
}
1622 | |
1623 | |
/// Rebuilds shared->clusters from the current clusters config
/// (or from the server config if no clusters config has been set).
/// The Clusters object is constructed outside of clusters_mutex; the result is installed only if the
/// clusters config pointer is still the one the object was built from, otherwise the whole step is retried.
void Context::reloadClusterConfig()
{
    while (true)
    {
        /// Snapshot the config pointer under the mutex.
        ConfigurationPtr cluster_config;
        {
            std::lock_guard lock(shared->clusters_mutex);
            cluster_config = shared->clusters_config;
        }

        /// Build the new clusters without holding the mutex.
        auto & config = cluster_config ? *cluster_config : getConfigRef();
        auto new_clusters = std::make_unique<Clusters>(config, settings);

        {
            std::lock_guard lock(shared->clusters_mutex);
            /// Install only if the config was not replaced in the meantime.
            if (shared->clusters_config.get() == cluster_config.get())
            {
                shared->clusters = std::move(new_clusters);
                return;
            }

            /// Clusters config has been suddenly changed, recompute clusters
        }
    }
}
1649 | |
1650 | |
1651 | Clusters & Context::getClusters() const |
1652 | { |
1653 | std::lock_guard lock(shared->clusters_mutex); |
1654 | if (!shared->clusters) |
1655 | { |
1656 | auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef(); |
1657 | shared->clusters = std::make_unique<Clusters>(config, settings); |
1658 | } |
1659 | |
1660 | return *shared->clusters; |
1661 | } |
1662 | |
1663 | |
1664 | /// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters |
1665 | void Context::setClustersConfig(const ConfigurationPtr & config, const String & config_name) |
1666 | { |
1667 | std::lock_guard lock(shared->clusters_mutex); |
1668 | |
1669 | shared->clusters_config = config; |
1670 | |
1671 | if (!shared->clusters) |
1672 | shared->clusters = std::make_unique<Clusters>(*shared->clusters_config, settings, config_name); |
1673 | else |
1674 | shared->clusters->updateClusters(*shared->clusters_config, settings, config_name); |
1675 | } |
1676 | |
1677 | |
1678 | void Context::setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster) |
1679 | { |
1680 | std::lock_guard lock(shared->clusters_mutex); |
1681 | |
1682 | if (!shared->clusters) |
1683 | throw Exception("Clusters are not set" , ErrorCodes::LOGICAL_ERROR); |
1684 | |
1685 | shared->clusters->setCluster(cluster_name, cluster); |
1686 | } |
1687 | |
1688 | |
/// Constructs shared->system_logs from the global context and the server config, under the context lock.
/// NOTE(review): global_context is dereferenced without a null check - presumably it is always set by the time this is called; confirm.
void Context::initializeSystemLogs()
{
    auto lock = getLock();
    shared->system_logs.emplace(*global_context, getConfigRef());
}
1694 | |
/// Forwards to shared->hasTraceCollector().
bool Context::hasTraceCollector()
{
    return shared->hasTraceCollector();
}
1699 | |
/// Initializes the shared trace collector, passing it the result of getTraceLog()
/// (which is empty when the trace log is not available).
void Context::initializeTraceCollector()
{
    shared->initializeTraceCollector(getTraceLog());
}
1704 | |
1705 | |
1706 | std::shared_ptr<QueryLog> Context::getQueryLog() |
1707 | { |
1708 | auto lock = getLock(); |
1709 | |
1710 | if (!shared->system_logs || !shared->system_logs->query_log) |
1711 | return {}; |
1712 | |
1713 | return shared->system_logs->query_log; |
1714 | } |
1715 | |
1716 | |
1717 | std::shared_ptr<QueryThreadLog> Context::getQueryThreadLog() |
1718 | { |
1719 | auto lock = getLock(); |
1720 | |
1721 | if (!shared->system_logs || !shared->system_logs->query_thread_log) |
1722 | return {}; |
1723 | |
1724 | return shared->system_logs->query_thread_log; |
1725 | } |
1726 | |
1727 | |
1728 | std::shared_ptr<PartLog> Context::getPartLog(const String & part_database) |
1729 | { |
1730 | auto lock = getLock(); |
1731 | |
1732 | /// No part log or system logs are shutting down. |
1733 | if (!shared->system_logs || !shared->system_logs->part_log) |
1734 | return {}; |
1735 | |
1736 | /// Will not log operations on system tables (including part_log itself). |
1737 | /// It doesn't make sense and not allow to destruct PartLog correctly due to infinite logging and flushing, |
1738 | /// and also make troubles on startup. |
1739 | if (part_database == shared->system_logs->part_log_database) |
1740 | return {}; |
1741 | |
1742 | return shared->system_logs->part_log; |
1743 | } |
1744 | |
1745 | |
1746 | std::shared_ptr<TraceLog> Context::getTraceLog() |
1747 | { |
1748 | auto lock = getLock(); |
1749 | |
1750 | if (!shared->system_logs || !shared->system_logs->trace_log) |
1751 | return {}; |
1752 | |
1753 | return shared->system_logs->trace_log; |
1754 | } |
1755 | |
1756 | |
1757 | std::shared_ptr<TextLog> Context::getTextLog() |
1758 | { |
1759 | auto lock = getLock(); |
1760 | |
1761 | if (!shared->system_logs || !shared->system_logs->text_log) |
1762 | return {}; |
1763 | |
1764 | return shared->system_logs->text_log; |
1765 | } |
1766 | |
1767 | |
1768 | std::shared_ptr<MetricLog> Context::getMetricLog() |
1769 | { |
1770 | auto lock = getLock(); |
1771 | |
1772 | if (!shared->system_logs || !shared->system_logs->metric_log) |
1773 | return {}; |
1774 | |
1775 | return shared->system_logs->metric_log; |
1776 | } |
1777 | |
1778 | |
1779 | CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const |
1780 | { |
1781 | auto lock = getLock(); |
1782 | |
1783 | if (!shared->compression_codec_selector) |
1784 | { |
1785 | constexpr auto config_name = "compression" ; |
1786 | auto & config = getConfigRef(); |
1787 | |
1788 | if (config.has(config_name)) |
1789 | shared->compression_codec_selector = std::make_unique<CompressionCodecSelector>(config, "compression" ); |
1790 | else |
1791 | shared->compression_codec_selector = std::make_unique<CompressionCodecSelector>(); |
1792 | } |
1793 | |
1794 | return shared->compression_codec_selector->choose(part_size, part_size_ratio); |
1795 | } |
1796 | |
1797 | |
1798 | const DiskPtr & Context::getDisk(const String & name) const |
1799 | { |
1800 | auto lock = getLock(); |
1801 | |
1802 | const auto & disk_selector = getDiskSelector(); |
1803 | |
1804 | return disk_selector[name]; |
1805 | } |
1806 | |
1807 | |
1808 | DiskSelector & Context::getDiskSelector() const |
1809 | { |
1810 | auto lock = getLock(); |
1811 | |
1812 | if (!shared->merge_tree_disk_selector) |
1813 | { |
1814 | constexpr auto config_name = "storage_configuration.disks" ; |
1815 | auto & config = getConfigRef(); |
1816 | |
1817 | shared->merge_tree_disk_selector = std::make_unique<DiskSelector>(config, config_name, *this); |
1818 | } |
1819 | return *shared->merge_tree_disk_selector; |
1820 | } |
1821 | |
1822 | |
1823 | const StoragePolicyPtr & Context::getStoragePolicy(const String & name) const |
1824 | { |
1825 | auto lock = getLock(); |
1826 | |
1827 | auto & policy_selector = getStoragePolicySelector(); |
1828 | |
1829 | return policy_selector[name]; |
1830 | } |
1831 | |
1832 | |
1833 | StoragePolicySelector & Context::getStoragePolicySelector() const |
1834 | { |
1835 | auto lock = getLock(); |
1836 | |
1837 | if (!shared->merge_tree_storage_policy_selector) |
1838 | { |
1839 | constexpr auto config_name = "storage_configuration.policies" ; |
1840 | auto & config = getConfigRef(); |
1841 | |
1842 | shared->merge_tree_storage_policy_selector = std::make_unique<StoragePolicySelector>(config, config_name, getDiskSelector()); |
1843 | } |
1844 | return *shared->merge_tree_storage_policy_selector; |
1845 | } |
1846 | |
1847 | |
1848 | const MergeTreeSettings & Context::getMergeTreeSettings() const |
1849 | { |
1850 | auto lock = getLock(); |
1851 | |
1852 | if (!shared->merge_tree_settings) |
1853 | { |
1854 | auto & config = getConfigRef(); |
1855 | MergeTreeSettings mt_settings; |
1856 | mt_settings.loadFromConfig("merge_tree" , config); |
1857 | shared->merge_tree_settings.emplace(mt_settings); |
1858 | } |
1859 | |
1860 | return *shared->merge_tree_settings; |
1861 | } |
1862 | |
1863 | |
1864 | void Context::checkCanBeDropped(const String & database, const String & table, const size_t & size, const size_t & max_size_to_drop) const |
1865 | { |
1866 | if (!max_size_to_drop || size <= max_size_to_drop) |
1867 | return; |
1868 | |
1869 | Poco::File force_file(getFlagsPath() + "force_drop_table" ); |
1870 | bool force_file_exists = force_file.exists(); |
1871 | |
1872 | if (force_file_exists) |
1873 | { |
1874 | try |
1875 | { |
1876 | force_file.remove(); |
1877 | return; |
1878 | } |
1879 | catch (...) |
1880 | { |
1881 | /// User should recreate force file on each drop, it shouldn't be protected |
1882 | tryLogCurrentException("Drop table check" , "Can't remove force file to enable table or partition drop" ); |
1883 | } |
1884 | } |
1885 | |
1886 | String size_str = formatReadableSizeWithDecimalSuffix(size); |
1887 | String max_size_to_drop_str = formatReadableSizeWithDecimalSuffix(max_size_to_drop); |
1888 | std::stringstream ostr; |
1889 | |
1890 | ostr << "Table or Partition in " << backQuoteIfNeed(database) << "." << backQuoteIfNeed(table) << " was not dropped.\n" |
1891 | << "Reason:\n" |
1892 | << "1. Size (" << size_str << ") is greater than max_[table/partition]_size_to_drop (" << max_size_to_drop_str << ")\n" |
1893 | << "2. File '" << force_file.path() << "' intended to force DROP " |
1894 | << (force_file_exists ? "exists but not writeable (could not be removed)" : "doesn't exist" ) << "\n" ; |
1895 | |
1896 | ostr << "How to fix this:\n" |
1897 | << "1. Either increase (or set to zero) max_[table/partition]_size_to_drop in server config and restart ClickHouse\n" |
1898 | << "2. Either create forcing file " << force_file.path() << " and make sure that ClickHouse has write permission for it.\n" |
1899 | << "Example:\nsudo touch '" << force_file.path() << "' && sudo chmod 666 '" << force_file.path() << "'" ; |
1900 | |
1901 | throw Exception(ostr.str(), ErrorCodes::TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT); |
1902 | } |
1903 | |
1904 | |
1905 | void Context::setMaxTableSizeToDrop(size_t max_size) |
1906 | { |
1907 | // Is initialized at server startup and updated at config reload |
1908 | shared->max_table_size_to_drop.store(max_size, std::memory_order_relaxed); |
1909 | } |
1910 | |
1911 | |
1912 | void Context::checkTableCanBeDropped(const String & database, const String & table, const size_t & table_size) const |
1913 | { |
1914 | size_t max_table_size_to_drop = shared->max_table_size_to_drop.load(std::memory_order_relaxed); |
1915 | |
1916 | checkCanBeDropped(database, table, table_size, max_table_size_to_drop); |
1917 | } |
1918 | |
1919 | |
1920 | void Context::setMaxPartitionSizeToDrop(size_t max_size) |
1921 | { |
1922 | // Is initialized at server startup and updated at config reload |
1923 | shared->max_partition_size_to_drop.store(max_size, std::memory_order_relaxed); |
1924 | } |
1925 | |
1926 | |
1927 | void Context::checkPartitionCanBeDropped(const String & database, const String & table, const size_t & partition_size) const |
1928 | { |
1929 | size_t max_partition_size_to_drop = shared->max_partition_size_to_drop.load(std::memory_order_relaxed); |
1930 | |
1931 | checkCanBeDropped(database, table, partition_size, max_partition_size_to_drop); |
1932 | } |
1933 | |
1934 | |
1935 | BlockInputStreamPtr Context::getInputFormat(const String & name, ReadBuffer & buf, const Block & sample, UInt64 max_block_size) const |
1936 | { |
1937 | return FormatFactory::instance().getInput(name, buf, sample, *this, max_block_size); |
1938 | } |
1939 | |
1940 | BlockOutputStreamPtr Context::getOutputFormat(const String & name, WriteBuffer & buf, const Block & sample) const |
1941 | { |
1942 | return FormatFactory::instance().getOutput(name, buf, sample, *this); |
1943 | } |
1944 | |
1945 | OutputFormatPtr Context::getOutputFormatProcessor(const String & name, WriteBuffer & buf, const Block & sample) const |
1946 | { |
1947 | return FormatFactory::instance().getOutputFormat(name, buf, sample, *this); |
1948 | } |
1949 | |
1950 | |
1951 | time_t Context::getUptimeSeconds() const |
1952 | { |
1953 | auto lock = getLock(); |
1954 | return shared->uptime_watch.elapsedSeconds(); |
1955 | } |
1956 | |
1957 | |
1958 | void Context::setConfigReloadCallback(ConfigReloadCallback && callback) |
1959 | { |
1960 | /// Is initialized at server startup, so lock isn't required. Otherwise use mutex. |
1961 | shared->config_reload_callback = std::move(callback); |
1962 | } |
1963 | |
1964 | void Context::reloadConfig() const |
1965 | { |
1966 | /// Use mutex if callback may be changed after startup. |
1967 | if (!shared->config_reload_callback) |
1968 | throw Exception("Can't reload config beacuse config_reload_callback is not set." , ErrorCodes::LOGICAL_ERROR); |
1969 | |
1970 | shared->config_reload_callback(); |
1971 | } |
1972 | |
1973 | |
1974 | void Context::shutdown() |
1975 | { |
1976 | shared->shutdown(); |
1977 | } |
1978 | |
1979 | |
1980 | Context::ApplicationType Context::getApplicationType() const |
1981 | { |
1982 | return shared->application_type; |
1983 | } |
1984 | |
1985 | void Context::setApplicationType(ApplicationType type) |
1986 | { |
1987 | /// Lock isn't required, you should set it at start |
1988 | shared->application_type = type; |
1989 | } |
1990 | |
1991 | void Context::setDefaultProfiles(const Poco::Util::AbstractConfiguration & config) |
1992 | { |
1993 | shared->default_profile_name = config.getString("default_profile" , "default" ); |
1994 | shared->system_profile_name = config.getString("system_profile" , shared->default_profile_name); |
1995 | setSetting("profile" , shared->system_profile_name); |
1996 | } |
1997 | |
1998 | String Context::getDefaultProfileName() const |
1999 | { |
2000 | return shared->default_profile_name; |
2001 | } |
2002 | |
2003 | String Context::getSystemProfileName() const |
2004 | { |
2005 | return shared->system_profile_name; |
2006 | } |
2007 | |
2008 | String Context::getFormatSchemaPath() const |
2009 | { |
2010 | return shared->format_schema_path; |
2011 | } |
2012 | |
2013 | void Context::setFormatSchemaPath(const String & path) |
2014 | { |
2015 | shared->format_schema_path = path; |
2016 | } |
2017 | |
2018 | Context::SampleBlockCache & Context::getSampleBlockCache() const |
2019 | { |
2020 | return getQueryContext().sample_block_cache; |
2021 | } |
2022 | |
2023 | |
2024 | bool Context::hasQueryParameters() const |
2025 | { |
2026 | return !query_parameters.empty(); |
2027 | } |
2028 | |
2029 | |
2030 | const NameToNameMap & Context::getQueryParameters() const |
2031 | { |
2032 | return query_parameters; |
2033 | } |
2034 | |
2035 | |
2036 | void Context::setQueryParameter(const String & name, const String & value) |
2037 | { |
2038 | if (!query_parameters.emplace(name, value).second) |
2039 | throw Exception("Duplicate name " + backQuote(name) + " of query parameter" , ErrorCodes::BAD_ARGUMENTS); |
2040 | } |
2041 | |
2042 | |
2043 | #if USE_EMBEDDED_COMPILER |
2044 | |
2045 | std::shared_ptr<CompiledExpressionCache> Context::getCompiledExpressionCache() const |
2046 | { |
2047 | auto lock = getLock(); |
2048 | return shared->compiled_expression_cache; |
2049 | } |
2050 | |
2051 | void Context::setCompiledExpressionCache(size_t cache_size) |
2052 | { |
2053 | |
2054 | auto lock = getLock(); |
2055 | |
2056 | if (shared->compiled_expression_cache) |
2057 | throw Exception("Compiled expressions cache has been already created." , ErrorCodes::LOGICAL_ERROR); |
2058 | |
2059 | shared->compiled_expression_cache = std::make_shared<CompiledExpressionCache>(cache_size); |
2060 | } |
2061 | |
2062 | void Context::dropCompiledExpressionCache() const |
2063 | { |
2064 | auto lock = getLock(); |
2065 | if (shared->compiled_expression_cache) |
2066 | shared->compiled_expression_cache->reset(); |
2067 | } |
2068 | |
2069 | #endif |
2070 | |
2071 | |
2072 | void Context::addXDBCBridgeCommand(std::unique_ptr<ShellCommand> cmd) const |
2073 | { |
2074 | auto lock = getLock(); |
2075 | shared->bridge_commands.emplace_back(std::move(cmd)); |
2076 | } |
2077 | |
2078 | |
2079 | IHostContextPtr & Context::getHostContext() |
2080 | { |
2081 | return host_context; |
2082 | } |
2083 | |
2084 | |
2085 | const IHostContextPtr & Context::getHostContext() const |
2086 | { |
2087 | return host_context; |
2088 | } |
2089 | |
2090 | |
2091 | std::shared_ptr<ActionLocksManager> Context::getActionLocksManager() |
2092 | { |
2093 | auto lock = getLock(); |
2094 | |
2095 | if (!shared->action_locks_manager) |
2096 | shared->action_locks_manager = std::make_shared<ActionLocksManager>(getGlobalContext()); |
2097 | |
2098 | return shared->action_locks_manager; |
2099 | } |
2100 | |
2101 | |
2102 | void Context::setExternalTablesInitializer(ExternalTablesInitializer && initializer) |
2103 | { |
2104 | if (external_tables_initializer_callback) |
2105 | throw Exception("External tables initializer is already set" , ErrorCodes::LOGICAL_ERROR); |
2106 | |
2107 | external_tables_initializer_callback = std::move(initializer); |
2108 | } |
2109 | |
2110 | void Context::initializeExternalTablesIfSet() |
2111 | { |
2112 | if (external_tables_initializer_callback) |
2113 | { |
2114 | external_tables_initializer_callback(*this); |
2115 | /// Reset callback |
2116 | external_tables_initializer_callback = {}; |
2117 | } |
2118 | } |
2119 | |
2120 | |
2121 | void Context::setInputInitializer(InputInitializer && initializer) |
2122 | { |
2123 | if (input_initializer_callback) |
2124 | throw Exception("Input initializer is already set" , ErrorCodes::LOGICAL_ERROR); |
2125 | |
2126 | input_initializer_callback = std::move(initializer); |
2127 | } |
2128 | |
2129 | |
2130 | void Context::initializeInput(const StoragePtr & input_storage) |
2131 | { |
2132 | if (!input_initializer_callback) |
2133 | throw Exception("Input initializer is not set" , ErrorCodes::LOGICAL_ERROR); |
2134 | |
2135 | input_initializer_callback(*this, input_storage); |
2136 | /// Reset callback |
2137 | input_initializer_callback = {}; |
2138 | } |
2139 | |
2140 | |
2141 | void Context::setInputBlocksReaderCallback(InputBlocksReader && reader) |
2142 | { |
2143 | if (input_blocks_reader) |
2144 | throw Exception("Input blocks reader is already set" , ErrorCodes::LOGICAL_ERROR); |
2145 | |
2146 | input_blocks_reader = std::move(reader); |
2147 | } |
2148 | |
2149 | |
2150 | InputBlocksReader Context::getInputBlocksReaderCallback() const |
2151 | { |
2152 | return input_blocks_reader; |
2153 | } |
2154 | |
2155 | |
2156 | void Context::resetInputCallbacks() |
2157 | { |
2158 | if (input_initializer_callback) |
2159 | input_initializer_callback = {}; |
2160 | |
2161 | if (input_blocks_reader) |
2162 | input_blocks_reader = {}; |
2163 | } |
2164 | |
2165 | |
2166 | SessionCleaner::~SessionCleaner() |
2167 | { |
2168 | try |
2169 | { |
2170 | { |
2171 | std::lock_guard lock{mutex}; |
2172 | quit = true; |
2173 | } |
2174 | |
2175 | cond.notify_one(); |
2176 | |
2177 | thread.join(); |
2178 | } |
2179 | catch (...) |
2180 | { |
2181 | DB::tryLogCurrentException(__PRETTY_FUNCTION__); |
2182 | } |
2183 | } |
2184 | |
2185 | void SessionCleaner::run() |
2186 | { |
2187 | setThreadName("SessionCleaner" ); |
2188 | |
2189 | std::unique_lock lock{mutex}; |
2190 | |
2191 | while (true) |
2192 | { |
2193 | auto interval = context.closeSessions(); |
2194 | |
2195 | if (cond.wait_for(lock, interval, [this]() -> bool { return quit; })) |
2196 | break; |
2197 | } |
2198 | } |
2199 | |
2200 | |
2201 | } |
2202 | |