1#include <Interpreters/InterpreterKillQueryQuery.h>
2#include <Parsers/ASTKillQueryQuery.h>
3#include <Parsers/queryToString.h>
4#include <Interpreters/Context.h>
5#include <Interpreters/DDLWorker.h>
6#include <Interpreters/ProcessList.h>
7#include <Interpreters/executeQuery.h>
8#include <Interpreters/CancellationCode.h>
9#include <Columns/ColumnString.h>
10#include <Common/typeid_cast.h>
11#include <DataTypes/DataTypeString.h>
12#include <Columns/ColumnsNumber.h>
13#include <DataTypes/DataTypesNumber.h>
14#include <DataStreams/OneBlockInputStream.h>
15#include <Storages/IStorage.h>
16#include <thread>
17#include <iostream>
18#include <cstddef>
19
20
21namespace DB
22{
23
24namespace ErrorCodes
25{
26 extern const int READONLY;
27 extern const int LOGICAL_ERROR;
28 extern const int CANNOT_KILL;
29}
30
31
32static const char * cancellationCodeToStatus(CancellationCode code)
33{
34 switch (code)
35 {
36 case CancellationCode::NotFound:
37 return "finished";
38 case CancellationCode::QueryIsNotInitializedYet:
39 return "pending";
40 case CancellationCode::CancelCannotBeSent:
41 return "cant_cancel";
42 case CancellationCode::CancelSent:
43 return "waiting";
44 default:
45 return "unknown_status";
46 }
47}
48
49
50struct QueryDescriptor
51{
52 String query_id;
53 String user;
54 size_t source_num;
55 bool processed = false;
56
57 QueryDescriptor(String && query_id_, String && user_, size_t source_num_, bool processed_ = false)
58 : query_id(std::move(query_id_)), user(std::move(user_)), source_num(source_num_), processed(processed_) {}
59};
60
61using QueryDescriptors = std::vector<QueryDescriptor>;
62
63
64static void insertResultRow(size_t n, CancellationCode code, const Block & source, const Block & header, MutableColumns & columns)
65{
66 columns[0]->insert(cancellationCodeToStatus(code));
67
68 for (size_t col_num = 1, size = columns.size(); col_num < size; ++col_num)
69 columns[col_num]->insertFrom(*source.getByName(header.getByPosition(col_num).name).column, n);
70}
71
72static QueryDescriptors extractQueriesExceptMeAndCheckAccess(const Block & processes_block, Context & context)
73{
74 QueryDescriptors res;
75 size_t num_processes = processes_block.rows();
76 res.reserve(num_processes);
77
78 const ColumnString & query_id_col = typeid_cast<const ColumnString &>(*processes_block.getByName("query_id").column);
79 const ColumnString & user_col = typeid_cast<const ColumnString &>(*processes_block.getByName("user").column);
80 const ClientInfo & my_client = context.getProcessListElement()->getClientInfo();
81
82 for (size_t i = 0; i < num_processes; ++i)
83 {
84 auto query_id = query_id_col.getDataAt(i).toString();
85 auto user = user_col.getDataAt(i).toString();
86
87 if (my_client.current_query_id == query_id && my_client.current_user == user)
88 continue;
89
90 if (context.getSettingsRef().readonly && my_client.current_user != user)
91 {
92 throw Exception("Readonly user " + my_client.current_user + " attempts to kill query created by " + user,
93 ErrorCodes::READONLY);
94 }
95
96 res.emplace_back(std::move(query_id), std::move(user), i, false);
97 }
98
99 return res;
100}
101
102
103
104class SyncKillQueryInputStream : public IBlockInputStream
105{
106public:
107 SyncKillQueryInputStream(ProcessList & process_list_, QueryDescriptors && processes_to_stop_, Block && processes_block_,
108 const Block & res_sample_block_)
109 : process_list(process_list_),
110 processes_to_stop(std::move(processes_to_stop_)),
111 processes_block(std::move(processes_block_)),
112 res_sample_block(res_sample_block_)
113 {
114 addTotalRowsApprox(processes_to_stop.size());
115 }
116
117 String getName() const override
118 {
119 return "SynchronousQueryKiller";
120 }
121
122 Block getHeader() const override { return res_sample_block; }
123
124 Block readImpl() override
125 {
126 size_t num_result_queries = processes_to_stop.size();
127
128 if (num_processed_queries >= num_result_queries)
129 return Block();
130
131 MutableColumns columns = res_sample_block.cloneEmptyColumns();
132
133 do
134 {
135 for (auto & curr_process : processes_to_stop)
136 {
137 if (curr_process.processed)
138 continue;
139
140 auto code = process_list.sendCancelToQuery(curr_process.query_id, curr_process.user, true);
141
142 if (code != CancellationCode::QueryIsNotInitializedYet && code != CancellationCode::CancelSent)
143 {
144 curr_process.processed = true;
145 insertResultRow(curr_process.source_num, code, processes_block, res_sample_block, columns);
146 ++num_processed_queries;
147 }
148 /// Wait if CancelSent
149 }
150
151 /// KILL QUERY could be killed also
152 if (isCancelled())
153 break;
154
155 /// Sleep if there are unprocessed queries
156 if (num_processed_queries < num_result_queries)
157 std::this_thread::sleep_for(std::chrono::milliseconds(100));
158
159 /// Don't produce empty block
160 } while (columns.empty() || columns[0]->empty());
161
162 return res_sample_block.cloneWithColumns(std::move(columns));
163 }
164
165 ProcessList & process_list;
166 QueryDescriptors processes_to_stop;
167 Block processes_block;
168 Block res_sample_block;
169 size_t num_processed_queries = 0;
170};
171
172
173BlockIO InterpreterKillQueryQuery::execute()
174{
175 const auto & query = query_ptr->as<ASTKillQueryQuery &>();
176
177 if (!query.cluster.empty())
178 return executeDDLQueryOnCluster(query_ptr, context, {"system"});
179
180 BlockIO res_io;
181 switch (query.type)
182 {
183 case ASTKillQueryQuery::Type::Query:
184 {
185 Block processes_block = getSelectResult("query_id, user, query", "system.processes");
186 if (!processes_block)
187 return res_io;
188
189 ProcessList & process_list = context.getProcessList();
190 QueryDescriptors queries_to_stop = extractQueriesExceptMeAndCheckAccess(processes_block, context);
191
192 auto header = processes_block.cloneEmpty();
193 header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});
194
195 if (!query.sync || query.test)
196 {
197 MutableColumns res_columns = header.cloneEmptyColumns();
198
199 for (const auto & query_desc : queries_to_stop)
200 {
201 auto code = (query.test) ? CancellationCode::Unknown : process_list.sendCancelToQuery(query_desc.query_id, query_desc.user, true);
202 insertResultRow(query_desc.source_num, code, processes_block, header, res_columns);
203 }
204
205 res_io.in = std::make_shared<OneBlockInputStream>(header.cloneWithColumns(std::move(res_columns)));
206 }
207 else
208 {
209 res_io.in = std::make_shared<SyncKillQueryInputStream>(
210 process_list, std::move(queries_to_stop), std::move(processes_block), header);
211 }
212
213 break;
214 }
215 case ASTKillQueryQuery::Type::Mutation:
216 {
217 if (context.getSettingsRef().readonly)
218 throw Exception("Cannot execute query in readonly mode", ErrorCodes::READONLY);
219
220 Block mutations_block = getSelectResult("database, table, mutation_id", "system.mutations");
221 if (!mutations_block)
222 return res_io;
223
224 const ColumnString & database_col = typeid_cast<const ColumnString &>(*mutations_block.getByName("database").column);
225 const ColumnString & table_col = typeid_cast<const ColumnString &>(*mutations_block.getByName("table").column);
226 const ColumnString & mutation_id_col = typeid_cast<const ColumnString &>(*mutations_block.getByName("mutation_id").column);
227
228 auto header = mutations_block.cloneEmpty();
229 header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});
230
231 MutableColumns res_columns = header.cloneEmptyColumns();
232
233 for (size_t i = 0; i < mutations_block.rows(); ++i)
234 {
235 auto database_name = database_col.getDataAt(i).toString();
236 auto table_name = table_col.getDataAt(i).toString();
237 auto mutation_id = mutation_id_col.getDataAt(i).toString();
238
239 CancellationCode code = CancellationCode::Unknown;
240 if (!query.test)
241 {
242 auto storage = context.tryGetTable(database_name, table_name);
243 if (!storage)
244 code = CancellationCode::NotFound;
245 else
246 code = storage->killMutation(mutation_id);
247 }
248
249 insertResultRow(i, code, mutations_block, header, res_columns);
250 }
251
252 res_io.in = std::make_shared<OneBlockInputStream>(header.cloneWithColumns(std::move(res_columns)));
253
254 break;
255 }
256 }
257
258 return res_io;
259}
260
261Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const String & table)
262{
263 String select_query = "SELECT " + columns + " FROM " + table;
264 auto & where_expression = query_ptr->as<ASTKillQueryQuery>()->where_expression;
265 if (where_expression)
266 select_query += " WHERE " + queryToString(where_expression);
267
268 BlockIO block_io = executeQuery(select_query, context, true, QueryProcessingStage::Complete, false, false);
269 Block res = block_io.in->read();
270
271 if (res && block_io.in->read())
272 throw Exception("Expected one block from input stream", ErrorCodes::LOGICAL_ERROR);
273
274 return res;
275}
276
277}
278