1#include <Interpreters/ClusterProxy/executeQuery.h>
2#include <Interpreters/ClusterProxy/IStreamFactory.h>
3#include <Core/Settings.h>
4#include <Interpreters/Context.h>
5#include <Interpreters/Cluster.h>
6#include <Interpreters/IInterpreter.h>
7#include <Parsers/queryToString.h>
8#include <Interpreters/ProcessList.h>
9
10
11namespace DB
12{
13
14namespace ClusterProxy
15{
16
17Context removeUserRestrictionsFromSettings(const Context & context, const Settings & settings)
18{
19 Settings new_settings = settings;
20 new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);
21
22 /// Does not matter on remote servers, because queries are sent under different user.
23 new_settings.max_concurrent_queries_for_user = 0;
24 new_settings.max_memory_usage_for_user = 0;
25 /// This setting is really not for user and should not be sent to remote server.
26 new_settings.max_memory_usage_for_all_queries = 0;
27
28 /// Set as unchanged to avoid sending to remote server.
29 new_settings.max_concurrent_queries_for_user.changed = false;
30 new_settings.max_memory_usage_for_user.changed = false;
31 new_settings.max_memory_usage_for_all_queries.changed = false;
32
33 Context new_context(context);
34 new_context.setSettings(new_settings);
35
36 return new_context;
37}
38
39BlockInputStreams executeQuery(
40 IStreamFactory & stream_factory, const ClusterPtr & cluster,
41 const ASTPtr & query_ast, const Context & context, const Settings & settings)
42{
43 BlockInputStreams res;
44
45 const std::string query = queryToString(query_ast);
46
47 Context new_context = removeUserRestrictionsFromSettings(context, settings);
48
49 ThrottlerPtr user_level_throttler;
50 if (auto process_list_element = context.getProcessListElement())
51 user_level_throttler = process_list_element->getUserNetworkThrottler();
52
53 /// Network bandwidth limit, if needed.
54 ThrottlerPtr throttler;
55 if (settings.max_network_bandwidth || settings.max_network_bytes)
56 {
57 throttler = std::make_shared<Throttler>(
58 settings.max_network_bandwidth,
59 settings.max_network_bytes,
60 "Limit for bytes to send or receive over network exceeded.",
61 user_level_throttler);
62 }
63 else
64 throttler = user_level_throttler;
65
66 for (const auto & shard_info : cluster->getShardsInfo())
67 stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, res);
68
69 return res;
70}
71
72}
73
74}
75