1 | #include "duckdb/main/client_context.hpp" |
2 | |
3 | #include "duckdb/catalog/catalog_entry/scalar_function_catalog_entry.hpp" |
4 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
5 | #include "duckdb/catalog/catalog_search_path.hpp" |
6 | #include "duckdb/common/file_system.hpp" |
7 | #include "duckdb/common/http_state.hpp" |
8 | #include "duckdb/common/preserved_error.hpp" |
9 | #include "duckdb/common/progress_bar/progress_bar.hpp" |
10 | #include "duckdb/common/serializer/buffered_deserializer.hpp" |
11 | #include "duckdb/common/serializer/buffered_file_writer.hpp" |
12 | #include "duckdb/common/serializer/buffered_serializer.hpp" |
13 | #include "duckdb/common/types/column/column_data_collection.hpp" |
14 | #include "duckdb/execution/column_binding_resolver.hpp" |
15 | #include "duckdb/execution/operator/helper/physical_result_collector.hpp" |
16 | #include "duckdb/execution/physical_plan_generator.hpp" |
17 | #include "duckdb/main/appender.hpp" |
18 | #include "duckdb/main/attached_database.hpp" |
19 | #include "duckdb/main/client_context_file_opener.hpp" |
20 | #include "duckdb/main/client_data.hpp" |
21 | #include "duckdb/main/database.hpp" |
22 | #include "duckdb/main/database_manager.hpp" |
23 | #include "duckdb/main/error_manager.hpp" |
24 | #include "duckdb/main/materialized_query_result.hpp" |
25 | #include "duckdb/main/query_profiler.hpp" |
26 | #include "duckdb/main/query_result.hpp" |
27 | #include "duckdb/main/relation.hpp" |
28 | #include "duckdb/main/stream_query_result.hpp" |
29 | #include "duckdb/optimizer/optimizer.hpp" |
30 | #include "duckdb/parallel/task_scheduler.hpp" |
31 | #include "duckdb/parser/expression/constant_expression.hpp" |
32 | #include "duckdb/parser/expression/parameter_expression.hpp" |
33 | #include "duckdb/parser/parsed_data/create_function_info.hpp" |
34 | #include "duckdb/parser/parsed_expression_iterator.hpp" |
35 | #include "duckdb/parser/parser.hpp" |
36 | #include "duckdb/parser/query_node/select_node.hpp" |
37 | #include "duckdb/parser/statement/drop_statement.hpp" |
38 | #include "duckdb/parser/statement/execute_statement.hpp" |
39 | #include "duckdb/parser/statement/explain_statement.hpp" |
40 | #include "duckdb/parser/statement/prepare_statement.hpp" |
41 | #include "duckdb/parser/statement/relation_statement.hpp" |
42 | #include "duckdb/parser/statement/select_statement.hpp" |
43 | #include "duckdb/planner/operator/logical_execute.hpp" |
44 | #include "duckdb/planner/planner.hpp" |
45 | #include "duckdb/planner/pragma_handler.hpp" |
46 | #include "duckdb/storage/data_table.hpp" |
47 | #include "duckdb/transaction/meta_transaction.hpp" |
48 | #include "duckdb/transaction/transaction.hpp" |
49 | #include "duckdb/transaction/transaction_manager.hpp" |
50 | |
51 | namespace duckdb { |
52 | |
53 | struct ActiveQueryContext { |
54 | //! The query that is currently being executed |
55 | string query; |
56 | //! The currently open result |
57 | BaseQueryResult *open_result = nullptr; |
58 | //! Prepared statement data |
59 | shared_ptr<PreparedStatementData> prepared; |
60 | //! The query executor |
61 | unique_ptr<Executor> executor; |
62 | //! The progress bar |
63 | unique_ptr<ProgressBar> progress_bar; |
64 | }; |
65 | |
66 | ClientContext::ClientContext(shared_ptr<DatabaseInstance> database) |
67 | : db(std::move(database)), interrupted(false), client_data(make_uniq<ClientData>(args&: *this)), transaction(*this) { |
68 | } |
69 | |
70 | ClientContext::~ClientContext() { |
71 | if (Exception::UncaughtException()) { |
72 | return; |
73 | } |
74 | // destroy the client context and rollback if there is an active transaction |
75 | // but only if we are not destroying this client context as part of an exception stack unwind |
76 | Destroy(); |
77 | } |
78 | |
79 | unique_ptr<ClientContextLock> ClientContext::LockContext() { |
80 | return make_uniq<ClientContextLock>(args&: context_lock); |
81 | } |
82 | |
83 | void ClientContext::Destroy() { |
84 | auto lock = LockContext(); |
85 | if (transaction.HasActiveTransaction()) { |
86 | transaction.ResetActiveQuery(); |
87 | if (!transaction.IsAutoCommit()) { |
88 | transaction.Rollback(); |
89 | } |
90 | } |
91 | CleanupInternal(lock&: *lock); |
92 | } |
93 | |
94 | unique_ptr<DataChunk> ClientContext::Fetch(ClientContextLock &lock, StreamQueryResult &result) { |
95 | D_ASSERT(IsActiveResult(lock, &result)); |
96 | D_ASSERT(active_query->executor); |
97 | return FetchInternal(lock, executor&: *active_query->executor, result); |
98 | } |
99 | |
100 | unique_ptr<DataChunk> ClientContext::FetchInternal(ClientContextLock &lock, Executor &executor, |
101 | BaseQueryResult &result) { |
102 | bool invalidate_query = true; |
103 | try { |
104 | // fetch the chunk and return it |
105 | auto chunk = executor.FetchChunk(); |
106 | if (!chunk || chunk->size() == 0) { |
107 | CleanupInternal(lock, result: &result); |
108 | } |
109 | return chunk; |
110 | } catch (StandardException &ex) { |
111 | // standard exceptions do not invalidate the current transaction |
112 | result.SetError(PreservedError(ex)); |
113 | invalidate_query = false; |
114 | } catch (FatalException &ex) { |
115 | // fatal exceptions invalidate the entire database |
116 | result.SetError(PreservedError(ex)); |
117 | auto &db = DatabaseInstance::GetDatabase(context&: *this); |
118 | ValidChecker::Invalidate(o&: db, error: ex.what()); |
119 | } catch (const Exception &ex) { |
120 | result.SetError(PreservedError(ex)); |
121 | } catch (std::exception &ex) { |
122 | result.SetError(PreservedError(ex)); |
123 | } catch (...) { // LCOV_EXCL_START |
124 | result.SetError(PreservedError("Unhandled exception in FetchInternal" )); |
125 | } // LCOV_EXCL_STOP |
126 | CleanupInternal(lock, result: &result, invalidate_transaction: invalidate_query); |
127 | return nullptr; |
128 | } |
129 | |
130 | void ClientContext::BeginTransactionInternal(ClientContextLock &lock, bool requires_valid_transaction) { |
131 | // check if we are on AutoCommit. In this case we should start a transaction |
132 | D_ASSERT(!active_query); |
133 | auto &db = DatabaseInstance::GetDatabase(context&: *this); |
134 | if (ValidChecker::IsInvalidated(o&: db)) { |
135 | throw FatalException(ErrorManager::FormatException(context&: *this, error_type: ErrorType::INVALIDATED_DATABASE, |
136 | params: ValidChecker::InvalidatedMessage(o&: db))); |
137 | } |
138 | if (requires_valid_transaction && transaction.HasActiveTransaction() && |
139 | ValidChecker::IsInvalidated(o&: transaction.ActiveTransaction())) { |
140 | throw Exception(ErrorManager::FormatException(context&: *this, error_type: ErrorType::INVALIDATED_TRANSACTION)); |
141 | } |
142 | active_query = make_uniq<ActiveQueryContext>(); |
143 | if (transaction.IsAutoCommit()) { |
144 | transaction.BeginTransaction(); |
145 | } |
146 | } |
147 | |
148 | void ClientContext::BeginQueryInternal(ClientContextLock &lock, const string &query) { |
149 | BeginTransactionInternal(lock, requires_valid_transaction: false); |
150 | LogQueryInternal(lock, query); |
151 | active_query->query = query; |
152 | query_progress = -1; |
153 | transaction.SetActiveQuery(db->GetDatabaseManager().GetNewQueryNumber()); |
154 | } |
155 | |
156 | PreservedError ClientContext::EndQueryInternal(ClientContextLock &lock, bool success, bool invalidate_transaction) { |
157 | client_data->profiler->EndQuery(); |
158 | |
159 | if (client_data->http_state) { |
160 | client_data->http_state->Reset(); |
161 | } |
162 | |
163 | // Notify any registered state of query end |
164 | for (auto const &s : registered_state) { |
165 | s.second->QueryEnd(); |
166 | } |
167 | |
168 | D_ASSERT(active_query.get()); |
169 | active_query.reset(); |
170 | query_progress = -1; |
171 | PreservedError error; |
172 | try { |
173 | if (transaction.HasActiveTransaction()) { |
174 | // Move the query profiler into the history |
175 | auto &prev_profilers = client_data->query_profiler_history->GetPrevProfilers(); |
176 | prev_profilers.emplace_back(args: transaction.GetActiveQuery(), args: std::move(client_data->profiler)); |
177 | // Reinitialize the query profiler |
178 | client_data->profiler = make_shared<QueryProfiler>(args&: *this); |
179 | // Propagate settings of the saved query into the new profiler. |
180 | client_data->profiler->Propagate(qp&: *prev_profilers.back().second); |
181 | if (prev_profilers.size() >= client_data->query_profiler_history->GetPrevProfilersSize()) { |
182 | prev_profilers.pop_front(); |
183 | } |
184 | |
185 | transaction.ResetActiveQuery(); |
186 | if (transaction.IsAutoCommit()) { |
187 | if (success) { |
188 | transaction.Commit(); |
189 | } else { |
190 | transaction.Rollback(); |
191 | } |
192 | } else if (invalidate_transaction) { |
193 | D_ASSERT(!success); |
194 | ValidChecker::Invalidate(o&: ActiveTransaction(), error: "Failed to commit" ); |
195 | } |
196 | } |
197 | } catch (FatalException &ex) { |
198 | auto &db = DatabaseInstance::GetDatabase(context&: *this); |
199 | ValidChecker::Invalidate(o&: db, error: ex.what()); |
200 | error = PreservedError(ex); |
201 | } catch (const Exception &ex) { |
202 | error = PreservedError(ex); |
203 | } catch (std::exception &ex) { |
204 | error = PreservedError(ex); |
205 | } catch (...) { // LCOV_EXCL_START |
206 | error = PreservedError("Unhandled exception!" ); |
207 | } // LCOV_EXCL_STOP |
208 | return error; |
209 | } |
210 | |
211 | void ClientContext::CleanupInternal(ClientContextLock &lock, BaseQueryResult *result, bool invalidate_transaction) { |
212 | client_data->http_state = make_shared<HTTPState>(); |
213 | if (!active_query) { |
214 | // no query currently active |
215 | return; |
216 | } |
217 | if (active_query->executor) { |
218 | active_query->executor->CancelTasks(); |
219 | } |
220 | active_query->progress_bar.reset(); |
221 | |
222 | auto error = EndQueryInternal(lock, success: result ? !result->HasError() : false, invalidate_transaction); |
223 | if (result && !result->HasError()) { |
224 | // if an error occurred while committing report it in the result |
225 | result->SetError(error); |
226 | } |
227 | D_ASSERT(!active_query); |
228 | } |
229 | |
230 | Executor &ClientContext::GetExecutor() { |
231 | D_ASSERT(active_query); |
232 | D_ASSERT(active_query->executor); |
233 | return *active_query->executor; |
234 | } |
235 | |
236 | const string &ClientContext::GetCurrentQuery() { |
237 | D_ASSERT(active_query); |
238 | return active_query->query; |
239 | } |
240 | |
241 | unique_ptr<QueryResult> ClientContext::FetchResultInternal(ClientContextLock &lock, PendingQueryResult &pending) { |
242 | D_ASSERT(active_query); |
243 | D_ASSERT(active_query->open_result == &pending); |
244 | D_ASSERT(active_query->prepared); |
245 | auto &executor = GetExecutor(); |
246 | auto &prepared = *active_query->prepared; |
247 | bool create_stream_result = prepared.properties.allow_stream_result && pending.allow_stream_result; |
248 | if (create_stream_result) { |
249 | D_ASSERT(!executor.HasResultCollector()); |
250 | active_query->progress_bar.reset(); |
251 | query_progress = -1; |
252 | |
253 | // successfully compiled SELECT clause, and it is the last statement |
254 | // return a StreamQueryResult so the client can call Fetch() on it and stream the result |
255 | auto stream_result = make_uniq<StreamQueryResult>(args&: pending.statement_type, args&: pending.properties, |
256 | args: shared_from_this(), args&: pending.types, args&: pending.names); |
257 | active_query->open_result = stream_result.get(); |
258 | return std::move(stream_result); |
259 | } |
260 | unique_ptr<QueryResult> result; |
261 | if (executor.HasResultCollector()) { |
262 | // we have a result collector - fetch the result directly from the result collector |
263 | result = executor.GetResult(); |
264 | CleanupInternal(lock, result: result.get(), invalidate_transaction: false); |
265 | } else { |
266 | // no result collector - create a materialized result by continuously fetching |
267 | auto result_collection = make_uniq<ColumnDataCollection>(args&: Allocator::DefaultAllocator(), args&: pending.types); |
268 | D_ASSERT(!result_collection->Types().empty()); |
269 | auto materialized_result = |
270 | make_uniq<MaterializedQueryResult>(args&: pending.statement_type, args&: pending.properties, args&: pending.names, |
271 | args: std::move(result_collection), args: GetClientProperties()); |
272 | |
273 | auto &collection = materialized_result->Collection(); |
274 | D_ASSERT(!collection.Types().empty()); |
275 | ColumnDataAppendState append_state; |
276 | collection.InitializeAppend(state&: append_state); |
277 | while (true) { |
278 | auto chunk = FetchInternal(lock, executor&: GetExecutor(), result&: *materialized_result); |
279 | if (!chunk || chunk->size() == 0) { |
280 | break; |
281 | } |
282 | #ifdef DEBUG |
283 | for (idx_t i = 0; i < chunk->ColumnCount(); i++) { |
284 | if (pending.types[i].id() == LogicalTypeId::VARCHAR) { |
285 | chunk->data[i].UTFVerify(chunk->size()); |
286 | } |
287 | } |
288 | #endif |
289 | collection.Append(state&: append_state, new_chunk&: *chunk); |
290 | } |
291 | result = std::move(materialized_result); |
292 | } |
293 | return result; |
294 | } |
295 | |
296 | static bool IsExplainAnalyze(SQLStatement *statement) { |
297 | if (!statement) { |
298 | return false; |
299 | } |
300 | if (statement->type != StatementType::EXPLAIN_STATEMENT) { |
301 | return false; |
302 | } |
303 | auto &explain = statement->Cast<ExplainStatement>(); |
304 | return explain.explain_type == ExplainType::EXPLAIN_ANALYZE; |
305 | } |
306 | |
307 | shared_ptr<PreparedStatementData> ClientContext::CreatePreparedStatement(ClientContextLock &lock, const string &query, |
308 | unique_ptr<SQLStatement> statement, |
309 | vector<Value> *values) { |
310 | StatementType statement_type = statement->type; |
311 | auto result = make_shared<PreparedStatementData>(args&: statement_type); |
312 | |
313 | auto &profiler = QueryProfiler::Get(context&: *this); |
314 | profiler.StartQuery(query, is_explain_analyze: IsExplainAnalyze(statement: statement.get()), start_at_optimizer: true); |
315 | profiler.StartPhase(phase: "planner" ); |
316 | Planner planner(*this); |
317 | if (values) { |
318 | for (auto &value : *values) { |
319 | planner.parameter_data.emplace_back(args&: value); |
320 | } |
321 | } |
322 | |
323 | client_data->http_state = make_shared<HTTPState>(); |
324 | planner.CreatePlan(statement: std::move(statement)); |
325 | D_ASSERT(planner.plan || !planner.properties.bound_all_parameters); |
326 | profiler.EndPhase(); |
327 | |
328 | auto plan = std::move(planner.plan); |
329 | // extract the result column names from the plan |
330 | result->properties = planner.properties; |
331 | result->names = planner.names; |
332 | result->types = planner.types; |
333 | result->value_map = std::move(planner.value_map); |
334 | result->catalog_version = MetaTransaction::Get(context&: *this).catalog_version; |
335 | |
336 | if (!planner.properties.bound_all_parameters) { |
337 | return result; |
338 | } |
339 | #ifdef DEBUG |
340 | plan->Verify(*this); |
341 | #endif |
342 | if (config.enable_optimizer && plan->RequireOptimizer()) { |
343 | profiler.StartPhase(phase: "optimizer" ); |
344 | Optimizer optimizer(*planner.binder, *this); |
345 | plan = optimizer.Optimize(plan: std::move(plan)); |
346 | D_ASSERT(plan); |
347 | profiler.EndPhase(); |
348 | |
349 | #ifdef DEBUG |
350 | plan->Verify(*this); |
351 | #endif |
352 | } |
353 | |
354 | profiler.StartPhase(phase: "physical_planner" ); |
355 | // now convert logical query plan into a physical query plan |
356 | PhysicalPlanGenerator physical_planner(*this); |
357 | auto physical_plan = physical_planner.CreatePlan(logical: std::move(plan)); |
358 | profiler.EndPhase(); |
359 | |
360 | #ifdef DEBUG |
361 | D_ASSERT(!physical_plan->ToString().empty()); |
362 | #endif |
363 | result->plan = std::move(physical_plan); |
364 | return result; |
365 | } |
366 | |
367 | double ClientContext::GetProgress() { |
368 | return query_progress.load(); |
369 | } |
370 | |
371 | unique_ptr<PendingQueryResult> ClientContext::PendingPreparedStatement(ClientContextLock &lock, |
372 | shared_ptr<PreparedStatementData> statement_p, |
373 | PendingQueryParameters parameters) { |
374 | D_ASSERT(active_query); |
375 | auto &statement = *statement_p; |
376 | if (ValidChecker::IsInvalidated(o&: ActiveTransaction()) && statement.properties.requires_valid_transaction) { |
377 | throw Exception(ErrorManager::FormatException(context&: *this, error_type: ErrorType::INVALIDATED_TRANSACTION)); |
378 | } |
379 | auto &transaction = MetaTransaction::Get(context&: *this); |
380 | auto &manager = DatabaseManager::Get(db&: *this); |
381 | for (auto &modified_database : statement.properties.modified_databases) { |
382 | auto entry = manager.GetDatabase(context&: *this, name: modified_database); |
383 | if (!entry) { |
384 | throw InternalException("Database \"%s\" not found" , modified_database); |
385 | } |
386 | if (entry->IsReadOnly()) { |
387 | throw Exception(StringUtil::Format( |
388 | fmt_str: "Cannot execute statement of type \"%s\" on database \"%s\" which is attached in read-only mode!" , |
389 | params: StatementTypeToString(type: statement.statement_type), params: modified_database)); |
390 | } |
391 | transaction.ModifyDatabase(db&: *entry); |
392 | } |
393 | |
394 | // bind the bound values before execution |
395 | statement.Bind(values: parameters.parameters ? *parameters.parameters : vector<Value>()); |
396 | |
397 | active_query->executor = make_uniq<Executor>(args&: *this); |
398 | auto &executor = *active_query->executor; |
399 | if (config.enable_progress_bar) { |
400 | progress_bar_display_create_func_t display_create_func = nullptr; |
401 | if (config.print_progress_bar) { |
402 | // If a custom display is set, use that, otherwise just use the default |
403 | display_create_func = |
404 | config.display_create_func ? config.display_create_func : ProgressBar::DefaultProgressBarDisplay; |
405 | } |
406 | active_query->progress_bar = make_uniq<ProgressBar>(args&: executor, args&: config.wait_time, args&: display_create_func); |
407 | active_query->progress_bar->Start(); |
408 | query_progress = 0; |
409 | } |
410 | auto stream_result = parameters.allow_stream_result && statement.properties.allow_stream_result; |
411 | if (!stream_result && statement.properties.return_type == StatementReturnType::QUERY_RESULT) { |
412 | unique_ptr<PhysicalResultCollector> collector; |
413 | auto &config = ClientConfig::GetConfig(context&: *this); |
414 | auto get_method = |
415 | config.result_collector ? config.result_collector : PhysicalResultCollector::GetResultCollector; |
416 | collector = get_method(*this, statement); |
417 | D_ASSERT(collector->type == PhysicalOperatorType::RESULT_COLLECTOR); |
418 | executor.Initialize(physical_plan: std::move(collector)); |
419 | } else { |
420 | executor.Initialize(physical_plan&: *statement.plan); |
421 | } |
422 | auto types = executor.GetTypes(); |
423 | D_ASSERT(types == statement.types); |
424 | D_ASSERT(!active_query->open_result); |
425 | |
426 | auto pending_result = |
427 | make_uniq<PendingQueryResult>(args: shared_from_this(), args&: *statement_p, args: std::move(types), args&: stream_result); |
428 | active_query->prepared = std::move(statement_p); |
429 | active_query->open_result = pending_result.get(); |
430 | return pending_result; |
431 | } |
432 | |
433 | PendingExecutionResult ClientContext::ExecuteTaskInternal(ClientContextLock &lock, PendingQueryResult &result) { |
434 | D_ASSERT(active_query); |
435 | D_ASSERT(active_query->open_result == &result); |
436 | try { |
437 | auto result = active_query->executor->ExecuteTask(); |
438 | if (active_query->progress_bar) { |
439 | active_query->progress_bar->Update(final: result == PendingExecutionResult::RESULT_READY); |
440 | query_progress = active_query->progress_bar->GetCurrentPercentage(); |
441 | } |
442 | return result; |
443 | } catch (FatalException &ex) { |
444 | // fatal exceptions invalidate the entire database |
445 | result.SetError(PreservedError(ex)); |
446 | auto &db = DatabaseInstance::GetDatabase(context&: *this); |
447 | ValidChecker::Invalidate(o&: db, error: ex.what()); |
448 | } catch (const Exception &ex) { |
449 | result.SetError(PreservedError(ex)); |
450 | } catch (std::exception &ex) { |
451 | result.SetError(PreservedError(ex)); |
452 | } catch (...) { // LCOV_EXCL_START |
453 | result.SetError(PreservedError("Unhandled exception in ExecuteTaskInternal" )); |
454 | } // LCOV_EXCL_STOP |
455 | EndQueryInternal(lock, success: false, invalidate_transaction: true); |
456 | return PendingExecutionResult::EXECUTION_ERROR; |
457 | } |
458 | |
459 | void ClientContext::InitialCleanup(ClientContextLock &lock) { |
460 | //! Cleanup any open results and reset the interrupted flag |
461 | CleanupInternal(lock); |
462 | interrupted = false; |
463 | } |
464 | |
465 | vector<unique_ptr<SQLStatement>> ClientContext::ParseStatements(const string &query) { |
466 | auto lock = LockContext(); |
467 | return ParseStatementsInternal(lock&: *lock, query); |
468 | } |
469 | |
470 | vector<unique_ptr<SQLStatement>> ClientContext::ParseStatementsInternal(ClientContextLock &lock, const string &query) { |
471 | Parser parser(GetParserOptions()); |
472 | parser.ParseQuery(query); |
473 | |
474 | PragmaHandler handler(*this); |
475 | handler.HandlePragmaStatements(lock, statements&: parser.statements); |
476 | |
477 | return std::move(parser.statements); |
478 | } |
479 | |
480 | void ClientContext::HandlePragmaStatements(vector<unique_ptr<SQLStatement>> &statements) { |
481 | auto lock = LockContext(); |
482 | |
483 | PragmaHandler handler(*this); |
484 | handler.HandlePragmaStatements(lock&: *lock, statements); |
485 | } |
486 | |
487 | unique_ptr<LogicalOperator> ClientContext::(const string &query) { |
488 | auto lock = LockContext(); |
489 | |
490 | auto statements = ParseStatementsInternal(lock&: *lock, query); |
491 | if (statements.size() != 1) { |
492 | throw Exception("ExtractPlan can only prepare a single statement" ); |
493 | } |
494 | |
495 | unique_ptr<LogicalOperator> plan; |
496 | client_data->http_state = make_shared<HTTPState>(); |
497 | RunFunctionInTransactionInternal(lock&: *lock, fun: [&]() { |
498 | Planner planner(*this); |
499 | planner.CreatePlan(statement: std::move(statements[0])); |
500 | D_ASSERT(planner.plan); |
501 | |
502 | plan = std::move(planner.plan); |
503 | |
504 | if (config.enable_optimizer) { |
505 | Optimizer optimizer(*planner.binder, *this); |
506 | plan = optimizer.Optimize(plan: std::move(plan)); |
507 | } |
508 | |
509 | ColumnBindingResolver resolver; |
510 | resolver.Verify(op&: *plan); |
511 | resolver.VisitOperator(op&: *plan); |
512 | |
513 | plan->ResolveOperatorTypes(); |
514 | }); |
515 | return plan; |
516 | } |
517 | |
518 | unique_ptr<PreparedStatement> ClientContext::PrepareInternal(ClientContextLock &lock, |
519 | unique_ptr<SQLStatement> statement) { |
520 | auto n_param = statement->n_param; |
521 | auto named_param_map = std::move(statement->named_param_map); |
522 | auto statement_query = statement->query; |
523 | shared_ptr<PreparedStatementData> prepared_data; |
524 | auto unbound_statement = statement->Copy(); |
525 | RunFunctionInTransactionInternal( |
526 | lock, fun: [&]() { prepared_data = CreatePreparedStatement(lock, query: statement_query, statement: std::move(statement)); }, requires_valid_transaction: false); |
527 | prepared_data->unbound_statement = std::move(unbound_statement); |
528 | return make_uniq<PreparedStatement>(args: shared_from_this(), args: std::move(prepared_data), args: std::move(statement_query), |
529 | args&: n_param, args: std::move(named_param_map)); |
530 | } |
531 | |
532 | unique_ptr<PreparedStatement> ClientContext::Prepare(unique_ptr<SQLStatement> statement) { |
533 | auto lock = LockContext(); |
534 | // prepare the query |
535 | try { |
536 | InitialCleanup(lock&: *lock); |
537 | return PrepareInternal(lock&: *lock, statement: std::move(statement)); |
538 | } catch (const Exception &ex) { |
539 | return make_uniq<PreparedStatement>(args: PreservedError(ex)); |
540 | } catch (std::exception &ex) { |
541 | return make_uniq<PreparedStatement>(args: PreservedError(ex)); |
542 | } |
543 | } |
544 | |
545 | unique_ptr<PreparedStatement> ClientContext::Prepare(const string &query) { |
546 | auto lock = LockContext(); |
547 | // prepare the query |
548 | try { |
549 | InitialCleanup(lock&: *lock); |
550 | |
551 | // first parse the query |
552 | auto statements = ParseStatementsInternal(lock&: *lock, query); |
553 | if (statements.empty()) { |
554 | throw Exception("No statement to prepare!" ); |
555 | } |
556 | if (statements.size() > 1) { |
557 | throw Exception("Cannot prepare multiple statements at once!" ); |
558 | } |
559 | return PrepareInternal(lock&: *lock, statement: std::move(statements[0])); |
560 | } catch (const Exception &ex) { |
561 | return make_uniq<PreparedStatement>(args: PreservedError(ex)); |
562 | } catch (std::exception &ex) { |
563 | return make_uniq<PreparedStatement>(args: PreservedError(ex)); |
564 | } |
565 | } |
566 | |
567 | unique_ptr<PendingQueryResult> ClientContext::PendingQueryPreparedInternal(ClientContextLock &lock, const string &query, |
568 | shared_ptr<PreparedStatementData> &prepared, |
569 | PendingQueryParameters parameters) { |
570 | try { |
571 | InitialCleanup(lock); |
572 | } catch (const Exception &ex) { |
573 | return make_uniq<PendingQueryResult>(args: PreservedError(ex)); |
574 | } catch (std::exception &ex) { |
575 | return make_uniq<PendingQueryResult>(args: PreservedError(ex)); |
576 | } |
577 | return PendingStatementOrPreparedStatementInternal(lock, query, statement: nullptr, prepared, parameters); |
578 | } |
579 | |
580 | unique_ptr<PendingQueryResult> ClientContext::PendingQuery(const string &query, |
581 | shared_ptr<PreparedStatementData> &prepared, |
582 | PendingQueryParameters parameters) { |
583 | auto lock = LockContext(); |
584 | return PendingQueryPreparedInternal(lock&: *lock, query, prepared, parameters); |
585 | } |
586 | |
587 | unique_ptr<QueryResult> ClientContext::Execute(const string &query, shared_ptr<PreparedStatementData> &prepared, |
588 | PendingQueryParameters parameters) { |
589 | auto lock = LockContext(); |
590 | auto pending = PendingQueryPreparedInternal(lock&: *lock, query, prepared, parameters); |
591 | if (pending->HasError()) { |
592 | return make_uniq<MaterializedQueryResult>(args&: pending->GetErrorObject()); |
593 | } |
594 | return pending->ExecuteInternal(lock&: *lock); |
595 | } |
596 | |
597 | unique_ptr<QueryResult> ClientContext::Execute(const string &query, shared_ptr<PreparedStatementData> &prepared, |
598 | vector<Value> &values, bool allow_stream_result) { |
599 | PendingQueryParameters parameters; |
600 | parameters.parameters = &values; |
601 | parameters.allow_stream_result = allow_stream_result; |
602 | return Execute(query, prepared, parameters); |
603 | } |
604 | |
605 | unique_ptr<PendingQueryResult> ClientContext::PendingStatementInternal(ClientContextLock &lock, const string &query, |
606 | unique_ptr<SQLStatement> statement, |
607 | PendingQueryParameters parameters) { |
608 | // prepare the query for execution |
609 | auto prepared = CreatePreparedStatement(lock, query, statement: std::move(statement), values: parameters.parameters); |
610 | if (prepared->properties.parameter_count > 0 && !parameters.parameters) { |
611 | string error_message = StringUtil::Format(fmt_str: "Expected %lld parameters, but none were supplied" , |
612 | params: prepared->properties.parameter_count); |
613 | return make_uniq<PendingQueryResult>(args: PreservedError(error_message)); |
614 | } |
615 | if (!prepared->properties.bound_all_parameters) { |
616 | return make_uniq<PendingQueryResult>(args: PreservedError("Not all parameters were bound" )); |
617 | } |
618 | // execute the prepared statement |
619 | return PendingPreparedStatement(lock, statement_p: std::move(prepared), parameters); |
620 | } |
621 | |
622 | unique_ptr<QueryResult> ClientContext::RunStatementInternal(ClientContextLock &lock, const string &query, |
623 | unique_ptr<SQLStatement> statement, |
624 | bool allow_stream_result, bool verify) { |
625 | PendingQueryParameters parameters; |
626 | parameters.allow_stream_result = allow_stream_result; |
627 | auto pending = PendingQueryInternal(lock, statement: std::move(statement), parameters, verify); |
628 | if (pending->HasError()) { |
629 | return make_uniq<MaterializedQueryResult>(args&: pending->GetErrorObject()); |
630 | } |
631 | return ExecutePendingQueryInternal(lock, query&: *pending); |
632 | } |
633 | |
634 | bool ClientContext::IsActiveResult(ClientContextLock &lock, BaseQueryResult *result) { |
635 | if (!active_query) { |
636 | return false; |
637 | } |
638 | return active_query->open_result == result; |
639 | } |
640 | |
641 | unique_ptr<PendingQueryResult> ClientContext::PendingStatementOrPreparedStatementInternal( |
642 | ClientContextLock &lock, const string &query, unique_ptr<SQLStatement> statement, |
643 | shared_ptr<PreparedStatementData> &prepared, PendingQueryParameters parameters) { |
644 | // check if we are on AutoCommit. In this case we should start a transaction. |
645 | if (statement && config.AnyVerification()) { |
646 | // query verification is enabled |
647 | // create a copy of the statement, and use the copy |
648 | // this way we verify that the copy correctly copies all properties |
649 | auto copied_statement = statement->Copy(); |
650 | switch (statement->type) { |
651 | case StatementType::SELECT_STATEMENT: { |
652 | // in case this is a select query, we verify the original statement |
653 | PreservedError error; |
654 | try { |
655 | error = VerifyQuery(lock, query, statement: std::move(statement)); |
656 | } catch (const Exception &ex) { |
657 | error = PreservedError(ex); |
658 | } catch (std::exception &ex) { |
659 | error = PreservedError(ex); |
660 | } |
661 | if (error) { |
662 | // error in verifying query |
663 | return make_uniq<PendingQueryResult>(args&: error); |
664 | } |
665 | statement = std::move(copied_statement); |
666 | break; |
667 | } |
668 | #ifndef DUCKDB_ALTERNATIVE_VERIFY |
669 | case StatementType::COPY_STATEMENT: |
670 | case StatementType::INSERT_STATEMENT: |
671 | case StatementType::DELETE_STATEMENT: |
672 | case StatementType::UPDATE_STATEMENT: { |
673 | Parser parser; |
674 | PreservedError error; |
675 | try { |
676 | parser.ParseQuery(query: statement->ToString()); |
677 | } catch (const Exception &ex) { |
678 | error = PreservedError(ex); |
679 | } catch (std::exception &ex) { |
680 | error = PreservedError(ex); |
681 | } |
682 | if (error) { |
683 | // error in verifying query |
684 | return make_uniq<PendingQueryResult>(args&: error); |
685 | } |
686 | statement = std::move(parser.statements[0]); |
687 | break; |
688 | } |
689 | #endif |
690 | default: |
691 | statement = std::move(copied_statement); |
692 | break; |
693 | } |
694 | } |
695 | return PendingStatementOrPreparedStatement(lock, query, statement: std::move(statement), prepared, parameters); |
696 | } |
697 | |
698 | unique_ptr<PendingQueryResult> ClientContext::PendingStatementOrPreparedStatement( |
699 | ClientContextLock &lock, const string &query, unique_ptr<SQLStatement> statement, |
700 | shared_ptr<PreparedStatementData> &prepared, PendingQueryParameters parameters) { |
701 | unique_ptr<PendingQueryResult> result; |
702 | |
703 | try { |
704 | BeginQueryInternal(lock, query); |
705 | } catch (FatalException &ex) { |
706 | // fatal exceptions invalidate the entire database |
707 | auto &db = DatabaseInstance::GetDatabase(context&: *this); |
708 | ValidChecker::Invalidate(o&: db, error: ex.what()); |
709 | result = make_uniq<PendingQueryResult>(args: PreservedError(ex)); |
710 | return result; |
711 | } catch (const Exception &ex) { |
712 | return make_uniq<PendingQueryResult>(args: PreservedError(ex)); |
713 | } catch (std::exception &ex) { |
714 | return make_uniq<PendingQueryResult>(args: PreservedError(ex)); |
715 | } |
716 | // start the profiler |
717 | auto &profiler = QueryProfiler::Get(context&: *this); |
718 | profiler.StartQuery(query, is_explain_analyze: IsExplainAnalyze(statement: statement ? statement.get() : prepared->unbound_statement.get())); |
719 | |
720 | bool invalidate_query = true; |
721 | try { |
722 | if (statement) { |
723 | result = PendingStatementInternal(lock, query, statement: std::move(statement), parameters); |
724 | } else { |
725 | if (prepared->RequireRebind(context&: *this, values: *parameters.parameters)) { |
726 | // catalog was modified: rebind the statement before execution |
727 | auto new_prepared = |
728 | CreatePreparedStatement(lock, query, statement: prepared->unbound_statement->Copy(), values: parameters.parameters); |
729 | D_ASSERT(new_prepared->properties.bound_all_parameters); |
730 | new_prepared->unbound_statement = std::move(prepared->unbound_statement); |
731 | prepared = std::move(new_prepared); |
732 | prepared->properties.bound_all_parameters = false; |
733 | } |
734 | result = PendingPreparedStatement(lock, statement_p: prepared, parameters); |
735 | } |
736 | } catch (StandardException &ex) { |
737 | // standard exceptions do not invalidate the current transaction |
738 | result = make_uniq<PendingQueryResult>(args: PreservedError(ex)); |
739 | invalidate_query = false; |
740 | } catch (FatalException &ex) { |
741 | // fatal exceptions invalidate the entire database |
742 | if (!config.query_verification_enabled) { |
743 | auto &db = DatabaseInstance::GetDatabase(context&: *this); |
744 | ValidChecker::Invalidate(o&: db, error: ex.what()); |
745 | } |
746 | result = make_uniq<PendingQueryResult>(args: PreservedError(ex)); |
747 | } catch (const Exception &ex) { |
748 | // other types of exceptions do invalidate the current transaction |
749 | result = make_uniq<PendingQueryResult>(args: PreservedError(ex)); |
750 | } catch (std::exception &ex) { |
751 | // other types of exceptions do invalidate the current transaction |
752 | result = make_uniq<PendingQueryResult>(args: PreservedError(ex)); |
753 | } |
754 | if (result->HasError()) { |
755 | // query failed: abort now |
756 | EndQueryInternal(lock, success: false, invalidate_transaction: invalidate_query); |
757 | return result; |
758 | } |
759 | D_ASSERT(active_query->open_result == result.get()); |
760 | return result; |
761 | } |
762 | |
763 | void ClientContext::LogQueryInternal(ClientContextLock &, const string &query) { |
764 | if (!client_data->log_query_writer) { |
765 | #ifdef DUCKDB_FORCE_QUERY_LOG |
766 | try { |
767 | string log_path(DUCKDB_FORCE_QUERY_LOG); |
768 | client_data->log_query_writer = |
769 | make_uniq<BufferedFileWriter>(FileSystem::GetFileSystem(*this), log_path, |
770 | BufferedFileWriter::DEFAULT_OPEN_FLAGS, client_data->file_opener.get()); |
771 | } catch (...) { |
772 | return; |
773 | } |
774 | #else |
775 | return; |
776 | #endif |
777 | } |
778 | // log query path is set: log the query |
779 | client_data->log_query_writer->WriteData(buffer: const_data_ptr_cast(src: query.c_str()), write_size: query.size()); |
780 | client_data->log_query_writer->WriteData(buffer: const_data_ptr_cast(src: "\n" ), write_size: 1); |
781 | client_data->log_query_writer->Flush(); |
782 | client_data->log_query_writer->Sync(); |
783 | } |
784 | |
785 | unique_ptr<QueryResult> ClientContext::Query(unique_ptr<SQLStatement> statement, bool allow_stream_result) { |
786 | auto pending_query = PendingQuery(statement: std::move(statement), allow_stream_result); |
787 | if (pending_query->HasError()) { |
788 | return make_uniq<MaterializedQueryResult>(args&: pending_query->GetErrorObject()); |
789 | } |
790 | return pending_query->Execute(); |
791 | } |
792 | |
793 | unique_ptr<QueryResult> ClientContext::Query(const string &query, bool allow_stream_result) { |
794 | auto lock = LockContext(); |
795 | |
796 | PreservedError error; |
797 | vector<unique_ptr<SQLStatement>> statements; |
798 | if (!ParseStatements(lock&: *lock, query, result&: statements, error)) { |
799 | return make_uniq<MaterializedQueryResult>(args: std::move(error)); |
800 | } |
801 | if (statements.empty()) { |
802 | // no statements, return empty successful result |
803 | StatementProperties properties; |
804 | vector<string> names; |
805 | auto collection = make_uniq<ColumnDataCollection>(args&: Allocator::DefaultAllocator()); |
806 | return make_uniq<MaterializedQueryResult>(args: StatementType::INVALID_STATEMENT, args&: properties, args: std::move(names), |
807 | args: std::move(collection), args: GetClientProperties()); |
808 | } |
809 | |
810 | unique_ptr<QueryResult> result; |
811 | QueryResult *last_result = nullptr; |
812 | bool last_had_result = false; |
813 | for (idx_t i = 0; i < statements.size(); i++) { |
814 | auto &statement = statements[i]; |
815 | bool is_last_statement = i + 1 == statements.size(); |
816 | PendingQueryParameters parameters; |
817 | parameters.allow_stream_result = allow_stream_result && is_last_statement; |
818 | auto pending_query = PendingQueryInternal(lock&: *lock, statement: std::move(statement), parameters); |
819 | auto has_result = pending_query->properties.return_type == StatementReturnType::QUERY_RESULT; |
820 | unique_ptr<QueryResult> current_result; |
821 | if (pending_query->HasError()) { |
822 | current_result = make_uniq<MaterializedQueryResult>(args&: pending_query->GetErrorObject()); |
823 | } else { |
824 | current_result = ExecutePendingQueryInternal(lock&: *lock, query&: *pending_query); |
825 | } |
826 | // now append the result to the list of results |
827 | if (!last_result || !last_had_result) { |
828 | // first result of the query |
829 | result = std::move(current_result); |
830 | last_result = result.get(); |
831 | last_had_result = has_result; |
832 | } else { |
833 | // later results; attach to the result chain |
834 | // but only if there is a result |
835 | if (!has_result) { |
836 | continue; |
837 | } |
838 | last_result->next = std::move(current_result); |
839 | last_result = last_result->next.get(); |
840 | } |
841 | } |
842 | return result; |
843 | } |
844 | |
845 | bool ClientContext::ParseStatements(ClientContextLock &lock, const string &query, |
846 | vector<unique_ptr<SQLStatement>> &result, PreservedError &error) { |
847 | try { |
848 | InitialCleanup(lock); |
849 | // parse the query and transform it into a set of statements |
850 | result = ParseStatementsInternal(lock, query); |
851 | return true; |
852 | } catch (const Exception &ex) { |
853 | error = PreservedError(ex); |
854 | return false; |
855 | } catch (std::exception &ex) { |
856 | error = PreservedError(ex); |
857 | return false; |
858 | } |
859 | } |
860 | |
861 | unique_ptr<PendingQueryResult> ClientContext::PendingQuery(const string &query, bool allow_stream_result) { |
862 | auto lock = LockContext(); |
863 | |
864 | PreservedError error; |
865 | vector<unique_ptr<SQLStatement>> statements; |
866 | if (!ParseStatements(lock&: *lock, query, result&: statements, error)) { |
867 | return make_uniq<PendingQueryResult>(args: std::move(error)); |
868 | } |
869 | if (statements.size() != 1) { |
870 | return make_uniq<PendingQueryResult>(args: PreservedError("PendingQuery can only take a single statement" )); |
871 | } |
872 | PendingQueryParameters parameters; |
873 | parameters.allow_stream_result = allow_stream_result; |
874 | return PendingQueryInternal(lock&: *lock, statement: std::move(statements[0]), parameters); |
875 | } |
876 | |
877 | unique_ptr<PendingQueryResult> ClientContext::PendingQuery(unique_ptr<SQLStatement> statement, |
878 | bool allow_stream_result) { |
879 | auto lock = LockContext(); |
880 | PendingQueryParameters parameters; |
881 | parameters.allow_stream_result = allow_stream_result; |
882 | return PendingQueryInternal(lock&: *lock, statement: std::move(statement), parameters); |
883 | } |
884 | |
885 | unique_ptr<PendingQueryResult> ClientContext::PendingQueryInternal(ClientContextLock &lock, |
886 | unique_ptr<SQLStatement> statement, |
887 | PendingQueryParameters parameters, bool verify) { |
888 | auto query = statement->query; |
889 | shared_ptr<PreparedStatementData> prepared; |
890 | if (verify) { |
891 | return PendingStatementOrPreparedStatementInternal(lock, query, statement: std::move(statement), prepared, parameters); |
892 | } else { |
893 | return PendingStatementOrPreparedStatement(lock, query, statement: std::move(statement), prepared, parameters); |
894 | } |
895 | } |
896 | |
897 | unique_ptr<QueryResult> ClientContext::ExecutePendingQueryInternal(ClientContextLock &lock, PendingQueryResult &query) { |
898 | return query.ExecuteInternal(lock); |
899 | } |
900 | |
901 | void ClientContext::Interrupt() { |
902 | interrupted = true; |
903 | } |
904 | |
905 | void ClientContext::EnableProfiling() { |
906 | auto lock = LockContext(); |
907 | auto &config = ClientConfig::GetConfig(context&: *this); |
908 | config.enable_profiler = true; |
909 | config.emit_profiler_output = true; |
910 | } |
911 | |
912 | void ClientContext::DisableProfiling() { |
913 | auto lock = LockContext(); |
914 | auto &config = ClientConfig::GetConfig(context&: *this); |
915 | config.enable_profiler = false; |
916 | } |
917 | |
918 | void ClientContext::RegisterFunction(CreateFunctionInfo &info) { |
919 | RunFunctionInTransaction(fun: [&]() { |
920 | auto existing_function = Catalog::GetEntry<ScalarFunctionCatalogEntry>(context&: *this, INVALID_CATALOG, schema_name: info.schema, |
921 | name: info.name, if_not_found: OnEntryNotFound::RETURN_NULL); |
922 | if (existing_function) { |
923 | auto &new_info = info.Cast<CreateScalarFunctionInfo>(); |
924 | if (new_info.functions.MergeFunctionSet(new_functions: existing_function->functions)) { |
925 | // function info was updated from catalog entry, rewrite is needed |
926 | info.on_conflict = OnCreateConflict::REPLACE_ON_CONFLICT; |
927 | } |
928 | } |
929 | // create function |
930 | auto &catalog = Catalog::GetSystemCatalog(context&: *this); |
931 | catalog.CreateFunction(context&: *this, info); |
932 | }); |
933 | } |
934 | |
935 | void ClientContext::RunFunctionInTransactionInternal(ClientContextLock &lock, const std::function<void(void)> &fun, |
936 | bool requires_valid_transaction) { |
937 | if (requires_valid_transaction && transaction.HasActiveTransaction() && |
938 | ValidChecker::IsInvalidated(o&: ActiveTransaction())) { |
939 | throw TransactionException(ErrorManager::FormatException(context&: *this, error_type: ErrorType::INVALIDATED_TRANSACTION)); |
940 | } |
941 | // check if we are on AutoCommit. In this case we should start a transaction |
942 | bool require_new_transaction = transaction.IsAutoCommit() && !transaction.HasActiveTransaction(); |
943 | if (require_new_transaction) { |
944 | D_ASSERT(!active_query); |
945 | transaction.BeginTransaction(); |
946 | } |
947 | try { |
948 | fun(); |
949 | } catch (StandardException &ex) { |
950 | if (require_new_transaction) { |
951 | transaction.Rollback(); |
952 | } |
953 | throw; |
954 | } catch (FatalException &ex) { |
955 | auto &db = DatabaseInstance::GetDatabase(context&: *this); |
956 | ValidChecker::Invalidate(o&: db, error: ex.what()); |
957 | throw; |
958 | } catch (std::exception &ex) { |
959 | if (require_new_transaction) { |
960 | transaction.Rollback(); |
961 | } else { |
962 | ValidChecker::Invalidate(o&: ActiveTransaction(), error: ex.what()); |
963 | } |
964 | throw; |
965 | } |
966 | if (require_new_transaction) { |
967 | transaction.Commit(); |
968 | } |
969 | } |
970 | |
971 | void ClientContext::RunFunctionInTransaction(const std::function<void(void)> &fun, bool requires_valid_transaction) { |
972 | auto lock = LockContext(); |
973 | RunFunctionInTransactionInternal(lock&: *lock, fun, requires_valid_transaction); |
974 | } |
975 | |
976 | unique_ptr<TableDescription> ClientContext::TableInfo(const string &schema_name, const string &table_name) { |
977 | unique_ptr<TableDescription> result; |
978 | RunFunctionInTransaction(fun: [&]() { |
979 | // obtain the table info |
980 | auto table = Catalog::GetEntry<TableCatalogEntry>(context&: *this, INVALID_CATALOG, schema_name, name: table_name, |
981 | if_not_found: OnEntryNotFound::RETURN_NULL); |
982 | if (!table) { |
983 | return; |
984 | } |
985 | // write the table info to the result |
986 | result = make_uniq<TableDescription>(); |
987 | result->schema = schema_name; |
988 | result->table = table_name; |
989 | for (auto &column : table->GetColumns().Logical()) { |
990 | result->columns.emplace_back(args: column.Name(), args: column.Type()); |
991 | } |
992 | }); |
993 | return result; |
994 | } |
995 | |
996 | void ClientContext::Append(TableDescription &description, ColumnDataCollection &collection) { |
997 | RunFunctionInTransaction(fun: [&]() { |
998 | auto &table_entry = |
999 | Catalog::GetEntry<TableCatalogEntry>(context&: *this, INVALID_CATALOG, schema_name: description.schema, name: description.table); |
1000 | // verify that the table columns and types match up |
1001 | if (description.columns.size() != table_entry.GetColumns().PhysicalColumnCount()) { |
1002 | throw Exception("Failed to append: table entry has different number of columns!" ); |
1003 | } |
1004 | for (idx_t i = 0; i < description.columns.size(); i++) { |
1005 | if (description.columns[i].Type() != table_entry.GetColumns().GetColumn(index: PhysicalIndex(i)).Type()) { |
1006 | throw Exception("Failed to append: table entry has different number of columns!" ); |
1007 | } |
1008 | } |
1009 | table_entry.GetStorage().LocalAppend(table&: table_entry, context&: *this, collection); |
1010 | }); |
1011 | } |
1012 | |
1013 | void ClientContext::TryBindRelation(Relation &relation, vector<ColumnDefinition> &result_columns) { |
1014 | #ifdef DEBUG |
1015 | D_ASSERT(!relation.GetAlias().empty()); |
1016 | D_ASSERT(!relation.ToString().empty()); |
1017 | #endif |
1018 | client_data->http_state = make_shared<HTTPState>(); |
1019 | RunFunctionInTransaction(fun: [&]() { |
1020 | // bind the expressions |
1021 | auto binder = Binder::CreateBinder(context&: *this); |
1022 | auto result = relation.Bind(binder&: *binder); |
1023 | D_ASSERT(result.names.size() == result.types.size()); |
1024 | |
1025 | result_columns.reserve(n: result_columns.size() + result.names.size()); |
1026 | for (idx_t i = 0; i < result.names.size(); i++) { |
1027 | result_columns.emplace_back(args&: result.names[i], args&: result.types[i]); |
1028 | } |
1029 | }); |
1030 | } |
1031 | |
1032 | unordered_set<string> ClientContext::GetTableNames(const string &query) { |
1033 | auto lock = LockContext(); |
1034 | |
1035 | auto statements = ParseStatementsInternal(lock&: *lock, query); |
1036 | if (statements.size() != 1) { |
1037 | throw InvalidInputException("Expected a single statement" ); |
1038 | } |
1039 | |
1040 | unordered_set<string> result; |
1041 | RunFunctionInTransactionInternal(lock&: *lock, fun: [&]() { |
1042 | // bind the expressions |
1043 | auto binder = Binder::CreateBinder(context&: *this); |
1044 | binder->SetBindingMode(BindingMode::EXTRACT_NAMES); |
1045 | binder->Bind(statement&: *statements[0]); |
1046 | result = binder->GetTableNames(); |
1047 | }); |
1048 | return result; |
1049 | } |
1050 | |
1051 | unique_ptr<PendingQueryResult> ClientContext::PendingQueryInternal(ClientContextLock &lock, |
1052 | const shared_ptr<Relation> &relation, |
1053 | bool allow_stream_result) { |
1054 | InitialCleanup(lock); |
1055 | |
1056 | string query; |
1057 | if (config.query_verification_enabled) { |
1058 | // run the ToString method of any relation we run, mostly to ensure it doesn't crash |
1059 | relation->ToString(); |
1060 | relation->GetAlias(); |
1061 | if (relation->IsReadOnly()) { |
1062 | // verify read only statements by running a select statement |
1063 | auto select = make_uniq<SelectStatement>(); |
1064 | select->node = relation->GetQueryNode(); |
1065 | RunStatementInternal(lock, query, statement: std::move(select), allow_stream_result: false); |
1066 | } |
1067 | } |
1068 | |
1069 | auto relation_stmt = make_uniq<RelationStatement>(args: relation); |
1070 | PendingQueryParameters parameters; |
1071 | parameters.allow_stream_result = allow_stream_result; |
1072 | return PendingQueryInternal(lock, statement: std::move(relation_stmt), parameters); |
1073 | } |
1074 | |
1075 | unique_ptr<PendingQueryResult> ClientContext::PendingQuery(const shared_ptr<Relation> &relation, |
1076 | bool allow_stream_result) { |
1077 | auto lock = LockContext(); |
1078 | return PendingQueryInternal(lock&: *lock, relation, allow_stream_result); |
1079 | } |
1080 | |
1081 | unique_ptr<QueryResult> ClientContext::Execute(const shared_ptr<Relation> &relation) { |
1082 | auto lock = LockContext(); |
1083 | auto &expected_columns = relation->Columns(); |
1084 | auto pending = PendingQueryInternal(lock&: *lock, relation, allow_stream_result: false); |
1085 | if (!pending->success) { |
1086 | return make_uniq<MaterializedQueryResult>(args&: pending->GetErrorObject()); |
1087 | } |
1088 | |
1089 | unique_ptr<QueryResult> result; |
1090 | result = ExecutePendingQueryInternal(lock&: *lock, query&: *pending); |
1091 | if (result->HasError()) { |
1092 | return result; |
1093 | } |
1094 | // verify that the result types and result names of the query match the expected result types/names |
1095 | if (result->types.size() == expected_columns.size()) { |
1096 | bool mismatch = false; |
1097 | for (idx_t i = 0; i < result->types.size(); i++) { |
1098 | if (result->types[i] != expected_columns[i].Type() || result->names[i] != expected_columns[i].Name()) { |
1099 | mismatch = true; |
1100 | break; |
1101 | } |
1102 | } |
1103 | if (!mismatch) { |
1104 | // all is as expected: return the result |
1105 | return result; |
1106 | } |
1107 | } |
1108 | // result mismatch |
1109 | string err_str = "Result mismatch in query!\nExpected the following columns: [" ; |
1110 | for (idx_t i = 0; i < expected_columns.size(); i++) { |
1111 | if (i > 0) { |
1112 | err_str += ", " ; |
1113 | } |
1114 | err_str += expected_columns[i].Name() + " " + expected_columns[i].Type().ToString(); |
1115 | } |
1116 | err_str += "]\nBut result contained the following: " ; |
1117 | for (idx_t i = 0; i < result->types.size(); i++) { |
1118 | err_str += i == 0 ? "[" : ", " ; |
1119 | err_str += result->names[i] + " " + result->types[i].ToString(); |
1120 | } |
1121 | err_str += "]" ; |
1122 | return make_uniq<MaterializedQueryResult>(args: PreservedError(err_str)); |
1123 | } |
1124 | |
1125 | bool ClientContext::TryGetCurrentSetting(const std::string &key, Value &result) { |
1126 | // first check the built-in settings |
1127 | auto &db_config = DBConfig::GetConfig(context&: *this); |
1128 | auto option = db_config.GetOptionByName(name: key); |
1129 | if (option) { |
1130 | result = option->get_setting(*this); |
1131 | return true; |
1132 | } |
1133 | |
1134 | // check the client session values |
1135 | const auto &session_config_map = config.set_variables; |
1136 | |
1137 | auto session_value = session_config_map.find(x: key); |
1138 | bool found_session_value = session_value != session_config_map.end(); |
1139 | if (found_session_value) { |
1140 | result = session_value->second; |
1141 | return true; |
1142 | } |
1143 | // finally check the global session values |
1144 | return db->TryGetCurrentSetting(key, result); |
1145 | } |
1146 | |
1147 | ParserOptions ClientContext::GetParserOptions() const { |
1148 | auto &client_config = ClientConfig::GetConfig(context: *this); |
1149 | ParserOptions options; |
1150 | options.preserve_identifier_case = client_config.preserve_identifier_case; |
1151 | options.integer_division = client_config.integer_division; |
1152 | options.max_expression_depth = client_config.max_expression_depth; |
1153 | options.extensions = &DBConfig::GetConfig(context: *this).parser_extensions; |
1154 | return options; |
1155 | } |
1156 | |
1157 | ClientProperties ClientContext::GetClientProperties() const { |
1158 | auto client_context = ClientConfig::GetConfig(context: *this); |
1159 | return {client_context.ExtractTimezone(), db->config.options.arrow_offset_size}; |
1160 | } |
1161 | |
1162 | bool ClientContext::ExecutionIsFinished() { |
1163 | if (!active_query || !active_query->executor) { |
1164 | return false; |
1165 | } |
1166 | return active_query->executor->ExecutionIsFinished(); |
1167 | } |
1168 | |
1169 | } // namespace duckdb |
1170 | |