1#include "duckdb/main/client_context.hpp"
2
3#include "duckdb/catalog/catalog_entry/scalar_function_catalog_entry.hpp"
4#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
5#include "duckdb/catalog/catalog_search_path.hpp"
6#include "duckdb/common/file_system.hpp"
7#include "duckdb/common/http_state.hpp"
8#include "duckdb/common/preserved_error.hpp"
9#include "duckdb/common/progress_bar/progress_bar.hpp"
10#include "duckdb/common/serializer/buffered_deserializer.hpp"
11#include "duckdb/common/serializer/buffered_file_writer.hpp"
12#include "duckdb/common/serializer/buffered_serializer.hpp"
13#include "duckdb/common/types/column/column_data_collection.hpp"
14#include "duckdb/execution/column_binding_resolver.hpp"
15#include "duckdb/execution/operator/helper/physical_result_collector.hpp"
16#include "duckdb/execution/physical_plan_generator.hpp"
17#include "duckdb/main/appender.hpp"
18#include "duckdb/main/attached_database.hpp"
19#include "duckdb/main/client_context_file_opener.hpp"
20#include "duckdb/main/client_data.hpp"
21#include "duckdb/main/database.hpp"
22#include "duckdb/main/database_manager.hpp"
23#include "duckdb/main/error_manager.hpp"
24#include "duckdb/main/materialized_query_result.hpp"
25#include "duckdb/main/query_profiler.hpp"
26#include "duckdb/main/query_result.hpp"
27#include "duckdb/main/relation.hpp"
28#include "duckdb/main/stream_query_result.hpp"
29#include "duckdb/optimizer/optimizer.hpp"
30#include "duckdb/parallel/task_scheduler.hpp"
31#include "duckdb/parser/expression/constant_expression.hpp"
32#include "duckdb/parser/expression/parameter_expression.hpp"
33#include "duckdb/parser/parsed_data/create_function_info.hpp"
34#include "duckdb/parser/parsed_expression_iterator.hpp"
35#include "duckdb/parser/parser.hpp"
36#include "duckdb/parser/query_node/select_node.hpp"
37#include "duckdb/parser/statement/drop_statement.hpp"
38#include "duckdb/parser/statement/execute_statement.hpp"
39#include "duckdb/parser/statement/explain_statement.hpp"
40#include "duckdb/parser/statement/prepare_statement.hpp"
41#include "duckdb/parser/statement/relation_statement.hpp"
42#include "duckdb/parser/statement/select_statement.hpp"
43#include "duckdb/planner/operator/logical_execute.hpp"
44#include "duckdb/planner/planner.hpp"
45#include "duckdb/planner/pragma_handler.hpp"
46#include "duckdb/storage/data_table.hpp"
47#include "duckdb/transaction/meta_transaction.hpp"
48#include "duckdb/transaction/transaction.hpp"
49#include "duckdb/transaction/transaction_manager.hpp"
50
51namespace duckdb {
52
53struct ActiveQueryContext {
54 //! The query that is currently being executed
55 string query;
56 //! The currently open result
57 BaseQueryResult *open_result = nullptr;
58 //! Prepared statement data
59 shared_ptr<PreparedStatementData> prepared;
60 //! The query executor
61 unique_ptr<Executor> executor;
62 //! The progress bar
63 unique_ptr<ProgressBar> progress_bar;
64};
65
66ClientContext::ClientContext(shared_ptr<DatabaseInstance> database)
67 : db(std::move(database)), interrupted(false), client_data(make_uniq<ClientData>(args&: *this)), transaction(*this) {
68}
69
70ClientContext::~ClientContext() {
71 if (Exception::UncaughtException()) {
72 return;
73 }
74 // destroy the client context and rollback if there is an active transaction
75 // but only if we are not destroying this client context as part of an exception stack unwind
76 Destroy();
77}
78
79unique_ptr<ClientContextLock> ClientContext::LockContext() {
80 return make_uniq<ClientContextLock>(args&: context_lock);
81}
82
83void ClientContext::Destroy() {
84 auto lock = LockContext();
85 if (transaction.HasActiveTransaction()) {
86 transaction.ResetActiveQuery();
87 if (!transaction.IsAutoCommit()) {
88 transaction.Rollback();
89 }
90 }
91 CleanupInternal(lock&: *lock);
92}
93
94unique_ptr<DataChunk> ClientContext::Fetch(ClientContextLock &lock, StreamQueryResult &result) {
95 D_ASSERT(IsActiveResult(lock, &result));
96 D_ASSERT(active_query->executor);
97 return FetchInternal(lock, executor&: *active_query->executor, result);
98}
99
100unique_ptr<DataChunk> ClientContext::FetchInternal(ClientContextLock &lock, Executor &executor,
101 BaseQueryResult &result) {
102 bool invalidate_query = true;
103 try {
104 // fetch the chunk and return it
105 auto chunk = executor.FetchChunk();
106 if (!chunk || chunk->size() == 0) {
107 CleanupInternal(lock, result: &result);
108 }
109 return chunk;
110 } catch (StandardException &ex) {
111 // standard exceptions do not invalidate the current transaction
112 result.SetError(PreservedError(ex));
113 invalidate_query = false;
114 } catch (FatalException &ex) {
115 // fatal exceptions invalidate the entire database
116 result.SetError(PreservedError(ex));
117 auto &db = DatabaseInstance::GetDatabase(context&: *this);
118 ValidChecker::Invalidate(o&: db, error: ex.what());
119 } catch (const Exception &ex) {
120 result.SetError(PreservedError(ex));
121 } catch (std::exception &ex) {
122 result.SetError(PreservedError(ex));
123 } catch (...) { // LCOV_EXCL_START
124 result.SetError(PreservedError("Unhandled exception in FetchInternal"));
125 } // LCOV_EXCL_STOP
126 CleanupInternal(lock, result: &result, invalidate_transaction: invalidate_query);
127 return nullptr;
128}
129
130void ClientContext::BeginTransactionInternal(ClientContextLock &lock, bool requires_valid_transaction) {
131 // check if we are on AutoCommit. In this case we should start a transaction
132 D_ASSERT(!active_query);
133 auto &db = DatabaseInstance::GetDatabase(context&: *this);
134 if (ValidChecker::IsInvalidated(o&: db)) {
135 throw FatalException(ErrorManager::FormatException(context&: *this, error_type: ErrorType::INVALIDATED_DATABASE,
136 params: ValidChecker::InvalidatedMessage(o&: db)));
137 }
138 if (requires_valid_transaction && transaction.HasActiveTransaction() &&
139 ValidChecker::IsInvalidated(o&: transaction.ActiveTransaction())) {
140 throw Exception(ErrorManager::FormatException(context&: *this, error_type: ErrorType::INVALIDATED_TRANSACTION));
141 }
142 active_query = make_uniq<ActiveQueryContext>();
143 if (transaction.IsAutoCommit()) {
144 transaction.BeginTransaction();
145 }
146}
147
148void ClientContext::BeginQueryInternal(ClientContextLock &lock, const string &query) {
149 BeginTransactionInternal(lock, requires_valid_transaction: false);
150 LogQueryInternal(lock, query);
151 active_query->query = query;
152 query_progress = -1;
153 transaction.SetActiveQuery(db->GetDatabaseManager().GetNewQueryNumber());
154}
155
156PreservedError ClientContext::EndQueryInternal(ClientContextLock &lock, bool success, bool invalidate_transaction) {
157 client_data->profiler->EndQuery();
158
159 if (client_data->http_state) {
160 client_data->http_state->Reset();
161 }
162
163 // Notify any registered state of query end
164 for (auto const &s : registered_state) {
165 s.second->QueryEnd();
166 }
167
168 D_ASSERT(active_query.get());
169 active_query.reset();
170 query_progress = -1;
171 PreservedError error;
172 try {
173 if (transaction.HasActiveTransaction()) {
174 // Move the query profiler into the history
175 auto &prev_profilers = client_data->query_profiler_history->GetPrevProfilers();
176 prev_profilers.emplace_back(args: transaction.GetActiveQuery(), args: std::move(client_data->profiler));
177 // Reinitialize the query profiler
178 client_data->profiler = make_shared<QueryProfiler>(args&: *this);
179 // Propagate settings of the saved query into the new profiler.
180 client_data->profiler->Propagate(qp&: *prev_profilers.back().second);
181 if (prev_profilers.size() >= client_data->query_profiler_history->GetPrevProfilersSize()) {
182 prev_profilers.pop_front();
183 }
184
185 transaction.ResetActiveQuery();
186 if (transaction.IsAutoCommit()) {
187 if (success) {
188 transaction.Commit();
189 } else {
190 transaction.Rollback();
191 }
192 } else if (invalidate_transaction) {
193 D_ASSERT(!success);
194 ValidChecker::Invalidate(o&: ActiveTransaction(), error: "Failed to commit");
195 }
196 }
197 } catch (FatalException &ex) {
198 auto &db = DatabaseInstance::GetDatabase(context&: *this);
199 ValidChecker::Invalidate(o&: db, error: ex.what());
200 error = PreservedError(ex);
201 } catch (const Exception &ex) {
202 error = PreservedError(ex);
203 } catch (std::exception &ex) {
204 error = PreservedError(ex);
205 } catch (...) { // LCOV_EXCL_START
206 error = PreservedError("Unhandled exception!");
207 } // LCOV_EXCL_STOP
208 return error;
209}
210
211void ClientContext::CleanupInternal(ClientContextLock &lock, BaseQueryResult *result, bool invalidate_transaction) {
212 client_data->http_state = make_shared<HTTPState>();
213 if (!active_query) {
214 // no query currently active
215 return;
216 }
217 if (active_query->executor) {
218 active_query->executor->CancelTasks();
219 }
220 active_query->progress_bar.reset();
221
222 auto error = EndQueryInternal(lock, success: result ? !result->HasError() : false, invalidate_transaction);
223 if (result && !result->HasError()) {
224 // if an error occurred while committing report it in the result
225 result->SetError(error);
226 }
227 D_ASSERT(!active_query);
228}
229
230Executor &ClientContext::GetExecutor() {
231 D_ASSERT(active_query);
232 D_ASSERT(active_query->executor);
233 return *active_query->executor;
234}
235
236const string &ClientContext::GetCurrentQuery() {
237 D_ASSERT(active_query);
238 return active_query->query;
239}
240
241unique_ptr<QueryResult> ClientContext::FetchResultInternal(ClientContextLock &lock, PendingQueryResult &pending) {
242 D_ASSERT(active_query);
243 D_ASSERT(active_query->open_result == &pending);
244 D_ASSERT(active_query->prepared);
245 auto &executor = GetExecutor();
246 auto &prepared = *active_query->prepared;
247 bool create_stream_result = prepared.properties.allow_stream_result && pending.allow_stream_result;
248 if (create_stream_result) {
249 D_ASSERT(!executor.HasResultCollector());
250 active_query->progress_bar.reset();
251 query_progress = -1;
252
253 // successfully compiled SELECT clause, and it is the last statement
254 // return a StreamQueryResult so the client can call Fetch() on it and stream the result
255 auto stream_result = make_uniq<StreamQueryResult>(args&: pending.statement_type, args&: pending.properties,
256 args: shared_from_this(), args&: pending.types, args&: pending.names);
257 active_query->open_result = stream_result.get();
258 return std::move(stream_result);
259 }
260 unique_ptr<QueryResult> result;
261 if (executor.HasResultCollector()) {
262 // we have a result collector - fetch the result directly from the result collector
263 result = executor.GetResult();
264 CleanupInternal(lock, result: result.get(), invalidate_transaction: false);
265 } else {
266 // no result collector - create a materialized result by continuously fetching
267 auto result_collection = make_uniq<ColumnDataCollection>(args&: Allocator::DefaultAllocator(), args&: pending.types);
268 D_ASSERT(!result_collection->Types().empty());
269 auto materialized_result =
270 make_uniq<MaterializedQueryResult>(args&: pending.statement_type, args&: pending.properties, args&: pending.names,
271 args: std::move(result_collection), args: GetClientProperties());
272
273 auto &collection = materialized_result->Collection();
274 D_ASSERT(!collection.Types().empty());
275 ColumnDataAppendState append_state;
276 collection.InitializeAppend(state&: append_state);
277 while (true) {
278 auto chunk = FetchInternal(lock, executor&: GetExecutor(), result&: *materialized_result);
279 if (!chunk || chunk->size() == 0) {
280 break;
281 }
282#ifdef DEBUG
283 for (idx_t i = 0; i < chunk->ColumnCount(); i++) {
284 if (pending.types[i].id() == LogicalTypeId::VARCHAR) {
285 chunk->data[i].UTFVerify(chunk->size());
286 }
287 }
288#endif
289 collection.Append(state&: append_state, new_chunk&: *chunk);
290 }
291 result = std::move(materialized_result);
292 }
293 return result;
294}
295
296static bool IsExplainAnalyze(SQLStatement *statement) {
297 if (!statement) {
298 return false;
299 }
300 if (statement->type != StatementType::EXPLAIN_STATEMENT) {
301 return false;
302 }
303 auto &explain = statement->Cast<ExplainStatement>();
304 return explain.explain_type == ExplainType::EXPLAIN_ANALYZE;
305}
306
307shared_ptr<PreparedStatementData> ClientContext::CreatePreparedStatement(ClientContextLock &lock, const string &query,
308 unique_ptr<SQLStatement> statement,
309 vector<Value> *values) {
310 StatementType statement_type = statement->type;
311 auto result = make_shared<PreparedStatementData>(args&: statement_type);
312
313 auto &profiler = QueryProfiler::Get(context&: *this);
314 profiler.StartQuery(query, is_explain_analyze: IsExplainAnalyze(statement: statement.get()), start_at_optimizer: true);
315 profiler.StartPhase(phase: "planner");
316 Planner planner(*this);
317 if (values) {
318 for (auto &value : *values) {
319 planner.parameter_data.emplace_back(args&: value);
320 }
321 }
322
323 client_data->http_state = make_shared<HTTPState>();
324 planner.CreatePlan(statement: std::move(statement));
325 D_ASSERT(planner.plan || !planner.properties.bound_all_parameters);
326 profiler.EndPhase();
327
328 auto plan = std::move(planner.plan);
329 // extract the result column names from the plan
330 result->properties = planner.properties;
331 result->names = planner.names;
332 result->types = planner.types;
333 result->value_map = std::move(planner.value_map);
334 result->catalog_version = MetaTransaction::Get(context&: *this).catalog_version;
335
336 if (!planner.properties.bound_all_parameters) {
337 return result;
338 }
339#ifdef DEBUG
340 plan->Verify(*this);
341#endif
342 if (config.enable_optimizer && plan->RequireOptimizer()) {
343 profiler.StartPhase(phase: "optimizer");
344 Optimizer optimizer(*planner.binder, *this);
345 plan = optimizer.Optimize(plan: std::move(plan));
346 D_ASSERT(plan);
347 profiler.EndPhase();
348
349#ifdef DEBUG
350 plan->Verify(*this);
351#endif
352 }
353
354 profiler.StartPhase(phase: "physical_planner");
355 // now convert logical query plan into a physical query plan
356 PhysicalPlanGenerator physical_planner(*this);
357 auto physical_plan = physical_planner.CreatePlan(logical: std::move(plan));
358 profiler.EndPhase();
359
360#ifdef DEBUG
361 D_ASSERT(!physical_plan->ToString().empty());
362#endif
363 result->plan = std::move(physical_plan);
364 return result;
365}
366
367double ClientContext::GetProgress() {
368 return query_progress.load();
369}
370
371unique_ptr<PendingQueryResult> ClientContext::PendingPreparedStatement(ClientContextLock &lock,
372 shared_ptr<PreparedStatementData> statement_p,
373 PendingQueryParameters parameters) {
374 D_ASSERT(active_query);
375 auto &statement = *statement_p;
376 if (ValidChecker::IsInvalidated(o&: ActiveTransaction()) && statement.properties.requires_valid_transaction) {
377 throw Exception(ErrorManager::FormatException(context&: *this, error_type: ErrorType::INVALIDATED_TRANSACTION));
378 }
379 auto &transaction = MetaTransaction::Get(context&: *this);
380 auto &manager = DatabaseManager::Get(db&: *this);
381 for (auto &modified_database : statement.properties.modified_databases) {
382 auto entry = manager.GetDatabase(context&: *this, name: modified_database);
383 if (!entry) {
384 throw InternalException("Database \"%s\" not found", modified_database);
385 }
386 if (entry->IsReadOnly()) {
387 throw Exception(StringUtil::Format(
388 fmt_str: "Cannot execute statement of type \"%s\" on database \"%s\" which is attached in read-only mode!",
389 params: StatementTypeToString(type: statement.statement_type), params: modified_database));
390 }
391 transaction.ModifyDatabase(db&: *entry);
392 }
393
394 // bind the bound values before execution
395 statement.Bind(values: parameters.parameters ? *parameters.parameters : vector<Value>());
396
397 active_query->executor = make_uniq<Executor>(args&: *this);
398 auto &executor = *active_query->executor;
399 if (config.enable_progress_bar) {
400 progress_bar_display_create_func_t display_create_func = nullptr;
401 if (config.print_progress_bar) {
402 // If a custom display is set, use that, otherwise just use the default
403 display_create_func =
404 config.display_create_func ? config.display_create_func : ProgressBar::DefaultProgressBarDisplay;
405 }
406 active_query->progress_bar = make_uniq<ProgressBar>(args&: executor, args&: config.wait_time, args&: display_create_func);
407 active_query->progress_bar->Start();
408 query_progress = 0;
409 }
410 auto stream_result = parameters.allow_stream_result && statement.properties.allow_stream_result;
411 if (!stream_result && statement.properties.return_type == StatementReturnType::QUERY_RESULT) {
412 unique_ptr<PhysicalResultCollector> collector;
413 auto &config = ClientConfig::GetConfig(context&: *this);
414 auto get_method =
415 config.result_collector ? config.result_collector : PhysicalResultCollector::GetResultCollector;
416 collector = get_method(*this, statement);
417 D_ASSERT(collector->type == PhysicalOperatorType::RESULT_COLLECTOR);
418 executor.Initialize(physical_plan: std::move(collector));
419 } else {
420 executor.Initialize(physical_plan&: *statement.plan);
421 }
422 auto types = executor.GetTypes();
423 D_ASSERT(types == statement.types);
424 D_ASSERT(!active_query->open_result);
425
426 auto pending_result =
427 make_uniq<PendingQueryResult>(args: shared_from_this(), args&: *statement_p, args: std::move(types), args&: stream_result);
428 active_query->prepared = std::move(statement_p);
429 active_query->open_result = pending_result.get();
430 return pending_result;
431}
432
433PendingExecutionResult ClientContext::ExecuteTaskInternal(ClientContextLock &lock, PendingQueryResult &result) {
434 D_ASSERT(active_query);
435 D_ASSERT(active_query->open_result == &result);
436 try {
437 auto result = active_query->executor->ExecuteTask();
438 if (active_query->progress_bar) {
439 active_query->progress_bar->Update(final: result == PendingExecutionResult::RESULT_READY);
440 query_progress = active_query->progress_bar->GetCurrentPercentage();
441 }
442 return result;
443 } catch (FatalException &ex) {
444 // fatal exceptions invalidate the entire database
445 result.SetError(PreservedError(ex));
446 auto &db = DatabaseInstance::GetDatabase(context&: *this);
447 ValidChecker::Invalidate(o&: db, error: ex.what());
448 } catch (const Exception &ex) {
449 result.SetError(PreservedError(ex));
450 } catch (std::exception &ex) {
451 result.SetError(PreservedError(ex));
452 } catch (...) { // LCOV_EXCL_START
453 result.SetError(PreservedError("Unhandled exception in ExecuteTaskInternal"));
454 } // LCOV_EXCL_STOP
455 EndQueryInternal(lock, success: false, invalidate_transaction: true);
456 return PendingExecutionResult::EXECUTION_ERROR;
457}
458
459void ClientContext::InitialCleanup(ClientContextLock &lock) {
460 //! Cleanup any open results and reset the interrupted flag
461 CleanupInternal(lock);
462 interrupted = false;
463}
464
465vector<unique_ptr<SQLStatement>> ClientContext::ParseStatements(const string &query) {
466 auto lock = LockContext();
467 return ParseStatementsInternal(lock&: *lock, query);
468}
469
470vector<unique_ptr<SQLStatement>> ClientContext::ParseStatementsInternal(ClientContextLock &lock, const string &query) {
471 Parser parser(GetParserOptions());
472 parser.ParseQuery(query);
473
474 PragmaHandler handler(*this);
475 handler.HandlePragmaStatements(lock, statements&: parser.statements);
476
477 return std::move(parser.statements);
478}
479
480void ClientContext::HandlePragmaStatements(vector<unique_ptr<SQLStatement>> &statements) {
481 auto lock = LockContext();
482
483 PragmaHandler handler(*this);
484 handler.HandlePragmaStatements(lock&: *lock, statements);
485}
486
487unique_ptr<LogicalOperator> ClientContext::ExtractPlan(const string &query) {
488 auto lock = LockContext();
489
490 auto statements = ParseStatementsInternal(lock&: *lock, query);
491 if (statements.size() != 1) {
492 throw Exception("ExtractPlan can only prepare a single statement");
493 }
494
495 unique_ptr<LogicalOperator> plan;
496 client_data->http_state = make_shared<HTTPState>();
497 RunFunctionInTransactionInternal(lock&: *lock, fun: [&]() {
498 Planner planner(*this);
499 planner.CreatePlan(statement: std::move(statements[0]));
500 D_ASSERT(planner.plan);
501
502 plan = std::move(planner.plan);
503
504 if (config.enable_optimizer) {
505 Optimizer optimizer(*planner.binder, *this);
506 plan = optimizer.Optimize(plan: std::move(plan));
507 }
508
509 ColumnBindingResolver resolver;
510 resolver.Verify(op&: *plan);
511 resolver.VisitOperator(op&: *plan);
512
513 plan->ResolveOperatorTypes();
514 });
515 return plan;
516}
517
518unique_ptr<PreparedStatement> ClientContext::PrepareInternal(ClientContextLock &lock,
519 unique_ptr<SQLStatement> statement) {
520 auto n_param = statement->n_param;
521 auto named_param_map = std::move(statement->named_param_map);
522 auto statement_query = statement->query;
523 shared_ptr<PreparedStatementData> prepared_data;
524 auto unbound_statement = statement->Copy();
525 RunFunctionInTransactionInternal(
526 lock, fun: [&]() { prepared_data = CreatePreparedStatement(lock, query: statement_query, statement: std::move(statement)); }, requires_valid_transaction: false);
527 prepared_data->unbound_statement = std::move(unbound_statement);
528 return make_uniq<PreparedStatement>(args: shared_from_this(), args: std::move(prepared_data), args: std::move(statement_query),
529 args&: n_param, args: std::move(named_param_map));
530}
531
532unique_ptr<PreparedStatement> ClientContext::Prepare(unique_ptr<SQLStatement> statement) {
533 auto lock = LockContext();
534 // prepare the query
535 try {
536 InitialCleanup(lock&: *lock);
537 return PrepareInternal(lock&: *lock, statement: std::move(statement));
538 } catch (const Exception &ex) {
539 return make_uniq<PreparedStatement>(args: PreservedError(ex));
540 } catch (std::exception &ex) {
541 return make_uniq<PreparedStatement>(args: PreservedError(ex));
542 }
543}
544
545unique_ptr<PreparedStatement> ClientContext::Prepare(const string &query) {
546 auto lock = LockContext();
547 // prepare the query
548 try {
549 InitialCleanup(lock&: *lock);
550
551 // first parse the query
552 auto statements = ParseStatementsInternal(lock&: *lock, query);
553 if (statements.empty()) {
554 throw Exception("No statement to prepare!");
555 }
556 if (statements.size() > 1) {
557 throw Exception("Cannot prepare multiple statements at once!");
558 }
559 return PrepareInternal(lock&: *lock, statement: std::move(statements[0]));
560 } catch (const Exception &ex) {
561 return make_uniq<PreparedStatement>(args: PreservedError(ex));
562 } catch (std::exception &ex) {
563 return make_uniq<PreparedStatement>(args: PreservedError(ex));
564 }
565}
566
567unique_ptr<PendingQueryResult> ClientContext::PendingQueryPreparedInternal(ClientContextLock &lock, const string &query,
568 shared_ptr<PreparedStatementData> &prepared,
569 PendingQueryParameters parameters) {
570 try {
571 InitialCleanup(lock);
572 } catch (const Exception &ex) {
573 return make_uniq<PendingQueryResult>(args: PreservedError(ex));
574 } catch (std::exception &ex) {
575 return make_uniq<PendingQueryResult>(args: PreservedError(ex));
576 }
577 return PendingStatementOrPreparedStatementInternal(lock, query, statement: nullptr, prepared, parameters);
578}
579
580unique_ptr<PendingQueryResult> ClientContext::PendingQuery(const string &query,
581 shared_ptr<PreparedStatementData> &prepared,
582 PendingQueryParameters parameters) {
583 auto lock = LockContext();
584 return PendingQueryPreparedInternal(lock&: *lock, query, prepared, parameters);
585}
586
587unique_ptr<QueryResult> ClientContext::Execute(const string &query, shared_ptr<PreparedStatementData> &prepared,
588 PendingQueryParameters parameters) {
589 auto lock = LockContext();
590 auto pending = PendingQueryPreparedInternal(lock&: *lock, query, prepared, parameters);
591 if (pending->HasError()) {
592 return make_uniq<MaterializedQueryResult>(args&: pending->GetErrorObject());
593 }
594 return pending->ExecuteInternal(lock&: *lock);
595}
596
597unique_ptr<QueryResult> ClientContext::Execute(const string &query, shared_ptr<PreparedStatementData> &prepared,
598 vector<Value> &values, bool allow_stream_result) {
599 PendingQueryParameters parameters;
600 parameters.parameters = &values;
601 parameters.allow_stream_result = allow_stream_result;
602 return Execute(query, prepared, parameters);
603}
604
605unique_ptr<PendingQueryResult> ClientContext::PendingStatementInternal(ClientContextLock &lock, const string &query,
606 unique_ptr<SQLStatement> statement,
607 PendingQueryParameters parameters) {
608 // prepare the query for execution
609 auto prepared = CreatePreparedStatement(lock, query, statement: std::move(statement), values: parameters.parameters);
610 if (prepared->properties.parameter_count > 0 && !parameters.parameters) {
611 string error_message = StringUtil::Format(fmt_str: "Expected %lld parameters, but none were supplied",
612 params: prepared->properties.parameter_count);
613 return make_uniq<PendingQueryResult>(args: PreservedError(error_message));
614 }
615 if (!prepared->properties.bound_all_parameters) {
616 return make_uniq<PendingQueryResult>(args: PreservedError("Not all parameters were bound"));
617 }
618 // execute the prepared statement
619 return PendingPreparedStatement(lock, statement_p: std::move(prepared), parameters);
620}
621
622unique_ptr<QueryResult> ClientContext::RunStatementInternal(ClientContextLock &lock, const string &query,
623 unique_ptr<SQLStatement> statement,
624 bool allow_stream_result, bool verify) {
625 PendingQueryParameters parameters;
626 parameters.allow_stream_result = allow_stream_result;
627 auto pending = PendingQueryInternal(lock, statement: std::move(statement), parameters, verify);
628 if (pending->HasError()) {
629 return make_uniq<MaterializedQueryResult>(args&: pending->GetErrorObject());
630 }
631 return ExecutePendingQueryInternal(lock, query&: *pending);
632}
633
634bool ClientContext::IsActiveResult(ClientContextLock &lock, BaseQueryResult *result) {
635 if (!active_query) {
636 return false;
637 }
638 return active_query->open_result == result;
639}
640
641unique_ptr<PendingQueryResult> ClientContext::PendingStatementOrPreparedStatementInternal(
642 ClientContextLock &lock, const string &query, unique_ptr<SQLStatement> statement,
643 shared_ptr<PreparedStatementData> &prepared, PendingQueryParameters parameters) {
644 // check if we are on AutoCommit. In this case we should start a transaction.
645 if (statement && config.AnyVerification()) {
646 // query verification is enabled
647 // create a copy of the statement, and use the copy
648 // this way we verify that the copy correctly copies all properties
649 auto copied_statement = statement->Copy();
650 switch (statement->type) {
651 case StatementType::SELECT_STATEMENT: {
652 // in case this is a select query, we verify the original statement
653 PreservedError error;
654 try {
655 error = VerifyQuery(lock, query, statement: std::move(statement));
656 } catch (const Exception &ex) {
657 error = PreservedError(ex);
658 } catch (std::exception &ex) {
659 error = PreservedError(ex);
660 }
661 if (error) {
662 // error in verifying query
663 return make_uniq<PendingQueryResult>(args&: error);
664 }
665 statement = std::move(copied_statement);
666 break;
667 }
668#ifndef DUCKDB_ALTERNATIVE_VERIFY
669 case StatementType::COPY_STATEMENT:
670 case StatementType::INSERT_STATEMENT:
671 case StatementType::DELETE_STATEMENT:
672 case StatementType::UPDATE_STATEMENT: {
673 Parser parser;
674 PreservedError error;
675 try {
676 parser.ParseQuery(query: statement->ToString());
677 } catch (const Exception &ex) {
678 error = PreservedError(ex);
679 } catch (std::exception &ex) {
680 error = PreservedError(ex);
681 }
682 if (error) {
683 // error in verifying query
684 return make_uniq<PendingQueryResult>(args&: error);
685 }
686 statement = std::move(parser.statements[0]);
687 break;
688 }
689#endif
690 default:
691 statement = std::move(copied_statement);
692 break;
693 }
694 }
695 return PendingStatementOrPreparedStatement(lock, query, statement: std::move(statement), prepared, parameters);
696}
697
698unique_ptr<PendingQueryResult> ClientContext::PendingStatementOrPreparedStatement(
699 ClientContextLock &lock, const string &query, unique_ptr<SQLStatement> statement,
700 shared_ptr<PreparedStatementData> &prepared, PendingQueryParameters parameters) {
701 unique_ptr<PendingQueryResult> result;
702
703 try {
704 BeginQueryInternal(lock, query);
705 } catch (FatalException &ex) {
706 // fatal exceptions invalidate the entire database
707 auto &db = DatabaseInstance::GetDatabase(context&: *this);
708 ValidChecker::Invalidate(o&: db, error: ex.what());
709 result = make_uniq<PendingQueryResult>(args: PreservedError(ex));
710 return result;
711 } catch (const Exception &ex) {
712 return make_uniq<PendingQueryResult>(args: PreservedError(ex));
713 } catch (std::exception &ex) {
714 return make_uniq<PendingQueryResult>(args: PreservedError(ex));
715 }
716 // start the profiler
717 auto &profiler = QueryProfiler::Get(context&: *this);
718 profiler.StartQuery(query, is_explain_analyze: IsExplainAnalyze(statement: statement ? statement.get() : prepared->unbound_statement.get()));
719
720 bool invalidate_query = true;
721 try {
722 if (statement) {
723 result = PendingStatementInternal(lock, query, statement: std::move(statement), parameters);
724 } else {
725 if (prepared->RequireRebind(context&: *this, values: *parameters.parameters)) {
726 // catalog was modified: rebind the statement before execution
727 auto new_prepared =
728 CreatePreparedStatement(lock, query, statement: prepared->unbound_statement->Copy(), values: parameters.parameters);
729 D_ASSERT(new_prepared->properties.bound_all_parameters);
730 new_prepared->unbound_statement = std::move(prepared->unbound_statement);
731 prepared = std::move(new_prepared);
732 prepared->properties.bound_all_parameters = false;
733 }
734 result = PendingPreparedStatement(lock, statement_p: prepared, parameters);
735 }
736 } catch (StandardException &ex) {
737 // standard exceptions do not invalidate the current transaction
738 result = make_uniq<PendingQueryResult>(args: PreservedError(ex));
739 invalidate_query = false;
740 } catch (FatalException &ex) {
741 // fatal exceptions invalidate the entire database
742 if (!config.query_verification_enabled) {
743 auto &db = DatabaseInstance::GetDatabase(context&: *this);
744 ValidChecker::Invalidate(o&: db, error: ex.what());
745 }
746 result = make_uniq<PendingQueryResult>(args: PreservedError(ex));
747 } catch (const Exception &ex) {
748 // other types of exceptions do invalidate the current transaction
749 result = make_uniq<PendingQueryResult>(args: PreservedError(ex));
750 } catch (std::exception &ex) {
751 // other types of exceptions do invalidate the current transaction
752 result = make_uniq<PendingQueryResult>(args: PreservedError(ex));
753 }
754 if (result->HasError()) {
755 // query failed: abort now
756 EndQueryInternal(lock, success: false, invalidate_transaction: invalidate_query);
757 return result;
758 }
759 D_ASSERT(active_query->open_result == result.get());
760 return result;
761}
762
763void ClientContext::LogQueryInternal(ClientContextLock &, const string &query) {
764 if (!client_data->log_query_writer) {
765#ifdef DUCKDB_FORCE_QUERY_LOG
766 try {
767 string log_path(DUCKDB_FORCE_QUERY_LOG);
768 client_data->log_query_writer =
769 make_uniq<BufferedFileWriter>(FileSystem::GetFileSystem(*this), log_path,
770 BufferedFileWriter::DEFAULT_OPEN_FLAGS, client_data->file_opener.get());
771 } catch (...) {
772 return;
773 }
774#else
775 return;
776#endif
777 }
778 // log query path is set: log the query
779 client_data->log_query_writer->WriteData(buffer: const_data_ptr_cast(src: query.c_str()), write_size: query.size());
780 client_data->log_query_writer->WriteData(buffer: const_data_ptr_cast(src: "\n"), write_size: 1);
781 client_data->log_query_writer->Flush();
782 client_data->log_query_writer->Sync();
783}
784
785unique_ptr<QueryResult> ClientContext::Query(unique_ptr<SQLStatement> statement, bool allow_stream_result) {
786 auto pending_query = PendingQuery(statement: std::move(statement), allow_stream_result);
787 if (pending_query->HasError()) {
788 return make_uniq<MaterializedQueryResult>(args&: pending_query->GetErrorObject());
789 }
790 return pending_query->Execute();
791}
792
793unique_ptr<QueryResult> ClientContext::Query(const string &query, bool allow_stream_result) {
794 auto lock = LockContext();
795
796 PreservedError error;
797 vector<unique_ptr<SQLStatement>> statements;
798 if (!ParseStatements(lock&: *lock, query, result&: statements, error)) {
799 return make_uniq<MaterializedQueryResult>(args: std::move(error));
800 }
801 if (statements.empty()) {
802 // no statements, return empty successful result
803 StatementProperties properties;
804 vector<string> names;
805 auto collection = make_uniq<ColumnDataCollection>(args&: Allocator::DefaultAllocator());
806 return make_uniq<MaterializedQueryResult>(args: StatementType::INVALID_STATEMENT, args&: properties, args: std::move(names),
807 args: std::move(collection), args: GetClientProperties());
808 }
809
810 unique_ptr<QueryResult> result;
811 QueryResult *last_result = nullptr;
812 bool last_had_result = false;
813 for (idx_t i = 0; i < statements.size(); i++) {
814 auto &statement = statements[i];
815 bool is_last_statement = i + 1 == statements.size();
816 PendingQueryParameters parameters;
817 parameters.allow_stream_result = allow_stream_result && is_last_statement;
818 auto pending_query = PendingQueryInternal(lock&: *lock, statement: std::move(statement), parameters);
819 auto has_result = pending_query->properties.return_type == StatementReturnType::QUERY_RESULT;
820 unique_ptr<QueryResult> current_result;
821 if (pending_query->HasError()) {
822 current_result = make_uniq<MaterializedQueryResult>(args&: pending_query->GetErrorObject());
823 } else {
824 current_result = ExecutePendingQueryInternal(lock&: *lock, query&: *pending_query);
825 }
826 // now append the result to the list of results
827 if (!last_result || !last_had_result) {
828 // first result of the query
829 result = std::move(current_result);
830 last_result = result.get();
831 last_had_result = has_result;
832 } else {
833 // later results; attach to the result chain
834 // but only if there is a result
835 if (!has_result) {
836 continue;
837 }
838 last_result->next = std::move(current_result);
839 last_result = last_result->next.get();
840 }
841 }
842 return result;
843}
844
845bool ClientContext::ParseStatements(ClientContextLock &lock, const string &query,
846 vector<unique_ptr<SQLStatement>> &result, PreservedError &error) {
847 try {
848 InitialCleanup(lock);
849 // parse the query and transform it into a set of statements
850 result = ParseStatementsInternal(lock, query);
851 return true;
852 } catch (const Exception &ex) {
853 error = PreservedError(ex);
854 return false;
855 } catch (std::exception &ex) {
856 error = PreservedError(ex);
857 return false;
858 }
859}
860
861unique_ptr<PendingQueryResult> ClientContext::PendingQuery(const string &query, bool allow_stream_result) {
862 auto lock = LockContext();
863
864 PreservedError error;
865 vector<unique_ptr<SQLStatement>> statements;
866 if (!ParseStatements(lock&: *lock, query, result&: statements, error)) {
867 return make_uniq<PendingQueryResult>(args: std::move(error));
868 }
869 if (statements.size() != 1) {
870 return make_uniq<PendingQueryResult>(args: PreservedError("PendingQuery can only take a single statement"));
871 }
872 PendingQueryParameters parameters;
873 parameters.allow_stream_result = allow_stream_result;
874 return PendingQueryInternal(lock&: *lock, statement: std::move(statements[0]), parameters);
875}
876
877unique_ptr<PendingQueryResult> ClientContext::PendingQuery(unique_ptr<SQLStatement> statement,
878 bool allow_stream_result) {
879 auto lock = LockContext();
880 PendingQueryParameters parameters;
881 parameters.allow_stream_result = allow_stream_result;
882 return PendingQueryInternal(lock&: *lock, statement: std::move(statement), parameters);
883}
884
885unique_ptr<PendingQueryResult> ClientContext::PendingQueryInternal(ClientContextLock &lock,
886 unique_ptr<SQLStatement> statement,
887 PendingQueryParameters parameters, bool verify) {
888 auto query = statement->query;
889 shared_ptr<PreparedStatementData> prepared;
890 if (verify) {
891 return PendingStatementOrPreparedStatementInternal(lock, query, statement: std::move(statement), prepared, parameters);
892 } else {
893 return PendingStatementOrPreparedStatement(lock, query, statement: std::move(statement), prepared, parameters);
894 }
895}
896
897unique_ptr<QueryResult> ClientContext::ExecutePendingQueryInternal(ClientContextLock &lock, PendingQueryResult &query) {
898 return query.ExecuteInternal(lock);
899}
900
901void ClientContext::Interrupt() {
902 interrupted = true;
903}
904
905void ClientContext::EnableProfiling() {
906 auto lock = LockContext();
907 auto &config = ClientConfig::GetConfig(context&: *this);
908 config.enable_profiler = true;
909 config.emit_profiler_output = true;
910}
911
912void ClientContext::DisableProfiling() {
913 auto lock = LockContext();
914 auto &config = ClientConfig::GetConfig(context&: *this);
915 config.enable_profiler = false;
916}
917
918void ClientContext::RegisterFunction(CreateFunctionInfo &info) {
919 RunFunctionInTransaction(fun: [&]() {
920 auto existing_function = Catalog::GetEntry<ScalarFunctionCatalogEntry>(context&: *this, INVALID_CATALOG, schema_name: info.schema,
921 name: info.name, if_not_found: OnEntryNotFound::RETURN_NULL);
922 if (existing_function) {
923 auto &new_info = info.Cast<CreateScalarFunctionInfo>();
924 if (new_info.functions.MergeFunctionSet(new_functions: existing_function->functions)) {
925 // function info was updated from catalog entry, rewrite is needed
926 info.on_conflict = OnCreateConflict::REPLACE_ON_CONFLICT;
927 }
928 }
929 // create function
930 auto &catalog = Catalog::GetSystemCatalog(context&: *this);
931 catalog.CreateFunction(context&: *this, info);
932 });
933}
934
935void ClientContext::RunFunctionInTransactionInternal(ClientContextLock &lock, const std::function<void(void)> &fun,
936 bool requires_valid_transaction) {
937 if (requires_valid_transaction && transaction.HasActiveTransaction() &&
938 ValidChecker::IsInvalidated(o&: ActiveTransaction())) {
939 throw TransactionException(ErrorManager::FormatException(context&: *this, error_type: ErrorType::INVALIDATED_TRANSACTION));
940 }
941 // check if we are on AutoCommit. In this case we should start a transaction
942 bool require_new_transaction = transaction.IsAutoCommit() && !transaction.HasActiveTransaction();
943 if (require_new_transaction) {
944 D_ASSERT(!active_query);
945 transaction.BeginTransaction();
946 }
947 try {
948 fun();
949 } catch (StandardException &ex) {
950 if (require_new_transaction) {
951 transaction.Rollback();
952 }
953 throw;
954 } catch (FatalException &ex) {
955 auto &db = DatabaseInstance::GetDatabase(context&: *this);
956 ValidChecker::Invalidate(o&: db, error: ex.what());
957 throw;
958 } catch (std::exception &ex) {
959 if (require_new_transaction) {
960 transaction.Rollback();
961 } else {
962 ValidChecker::Invalidate(o&: ActiveTransaction(), error: ex.what());
963 }
964 throw;
965 }
966 if (require_new_transaction) {
967 transaction.Commit();
968 }
969}
970
971void ClientContext::RunFunctionInTransaction(const std::function<void(void)> &fun, bool requires_valid_transaction) {
972 auto lock = LockContext();
973 RunFunctionInTransactionInternal(lock&: *lock, fun, requires_valid_transaction);
974}
975
976unique_ptr<TableDescription> ClientContext::TableInfo(const string &schema_name, const string &table_name) {
977 unique_ptr<TableDescription> result;
978 RunFunctionInTransaction(fun: [&]() {
979 // obtain the table info
980 auto table = Catalog::GetEntry<TableCatalogEntry>(context&: *this, INVALID_CATALOG, schema_name, name: table_name,
981 if_not_found: OnEntryNotFound::RETURN_NULL);
982 if (!table) {
983 return;
984 }
985 // write the table info to the result
986 result = make_uniq<TableDescription>();
987 result->schema = schema_name;
988 result->table = table_name;
989 for (auto &column : table->GetColumns().Logical()) {
990 result->columns.emplace_back(args: column.Name(), args: column.Type());
991 }
992 });
993 return result;
994}
995
996void ClientContext::Append(TableDescription &description, ColumnDataCollection &collection) {
997 RunFunctionInTransaction(fun: [&]() {
998 auto &table_entry =
999 Catalog::GetEntry<TableCatalogEntry>(context&: *this, INVALID_CATALOG, schema_name: description.schema, name: description.table);
1000 // verify that the table columns and types match up
1001 if (description.columns.size() != table_entry.GetColumns().PhysicalColumnCount()) {
1002 throw Exception("Failed to append: table entry has different number of columns!");
1003 }
1004 for (idx_t i = 0; i < description.columns.size(); i++) {
1005 if (description.columns[i].Type() != table_entry.GetColumns().GetColumn(index: PhysicalIndex(i)).Type()) {
1006 throw Exception("Failed to append: table entry has different number of columns!");
1007 }
1008 }
1009 table_entry.GetStorage().LocalAppend(table&: table_entry, context&: *this, collection);
1010 });
1011}
1012
1013void ClientContext::TryBindRelation(Relation &relation, vector<ColumnDefinition> &result_columns) {
1014#ifdef DEBUG
1015 D_ASSERT(!relation.GetAlias().empty());
1016 D_ASSERT(!relation.ToString().empty());
1017#endif
1018 client_data->http_state = make_shared<HTTPState>();
1019 RunFunctionInTransaction(fun: [&]() {
1020 // bind the expressions
1021 auto binder = Binder::CreateBinder(context&: *this);
1022 auto result = relation.Bind(binder&: *binder);
1023 D_ASSERT(result.names.size() == result.types.size());
1024
1025 result_columns.reserve(n: result_columns.size() + result.names.size());
1026 for (idx_t i = 0; i < result.names.size(); i++) {
1027 result_columns.emplace_back(args&: result.names[i], args&: result.types[i]);
1028 }
1029 });
1030}
1031
1032unordered_set<string> ClientContext::GetTableNames(const string &query) {
1033 auto lock = LockContext();
1034
1035 auto statements = ParseStatementsInternal(lock&: *lock, query);
1036 if (statements.size() != 1) {
1037 throw InvalidInputException("Expected a single statement");
1038 }
1039
1040 unordered_set<string> result;
1041 RunFunctionInTransactionInternal(lock&: *lock, fun: [&]() {
1042 // bind the expressions
1043 auto binder = Binder::CreateBinder(context&: *this);
1044 binder->SetBindingMode(BindingMode::EXTRACT_NAMES);
1045 binder->Bind(statement&: *statements[0]);
1046 result = binder->GetTableNames();
1047 });
1048 return result;
1049}
1050
1051unique_ptr<PendingQueryResult> ClientContext::PendingQueryInternal(ClientContextLock &lock,
1052 const shared_ptr<Relation> &relation,
1053 bool allow_stream_result) {
1054 InitialCleanup(lock);
1055
1056 string query;
1057 if (config.query_verification_enabled) {
1058 // run the ToString method of any relation we run, mostly to ensure it doesn't crash
1059 relation->ToString();
1060 relation->GetAlias();
1061 if (relation->IsReadOnly()) {
1062 // verify read only statements by running a select statement
1063 auto select = make_uniq<SelectStatement>();
1064 select->node = relation->GetQueryNode();
1065 RunStatementInternal(lock, query, statement: std::move(select), allow_stream_result: false);
1066 }
1067 }
1068
1069 auto relation_stmt = make_uniq<RelationStatement>(args: relation);
1070 PendingQueryParameters parameters;
1071 parameters.allow_stream_result = allow_stream_result;
1072 return PendingQueryInternal(lock, statement: std::move(relation_stmt), parameters);
1073}
1074
1075unique_ptr<PendingQueryResult> ClientContext::PendingQuery(const shared_ptr<Relation> &relation,
1076 bool allow_stream_result) {
1077 auto lock = LockContext();
1078 return PendingQueryInternal(lock&: *lock, relation, allow_stream_result);
1079}
1080
1081unique_ptr<QueryResult> ClientContext::Execute(const shared_ptr<Relation> &relation) {
1082 auto lock = LockContext();
1083 auto &expected_columns = relation->Columns();
1084 auto pending = PendingQueryInternal(lock&: *lock, relation, allow_stream_result: false);
1085 if (!pending->success) {
1086 return make_uniq<MaterializedQueryResult>(args&: pending->GetErrorObject());
1087 }
1088
1089 unique_ptr<QueryResult> result;
1090 result = ExecutePendingQueryInternal(lock&: *lock, query&: *pending);
1091 if (result->HasError()) {
1092 return result;
1093 }
1094 // verify that the result types and result names of the query match the expected result types/names
1095 if (result->types.size() == expected_columns.size()) {
1096 bool mismatch = false;
1097 for (idx_t i = 0; i < result->types.size(); i++) {
1098 if (result->types[i] != expected_columns[i].Type() || result->names[i] != expected_columns[i].Name()) {
1099 mismatch = true;
1100 break;
1101 }
1102 }
1103 if (!mismatch) {
1104 // all is as expected: return the result
1105 return result;
1106 }
1107 }
1108 // result mismatch
1109 string err_str = "Result mismatch in query!\nExpected the following columns: [";
1110 for (idx_t i = 0; i < expected_columns.size(); i++) {
1111 if (i > 0) {
1112 err_str += ", ";
1113 }
1114 err_str += expected_columns[i].Name() + " " + expected_columns[i].Type().ToString();
1115 }
1116 err_str += "]\nBut result contained the following: ";
1117 for (idx_t i = 0; i < result->types.size(); i++) {
1118 err_str += i == 0 ? "[" : ", ";
1119 err_str += result->names[i] + " " + result->types[i].ToString();
1120 }
1121 err_str += "]";
1122 return make_uniq<MaterializedQueryResult>(args: PreservedError(err_str));
1123}
1124
1125bool ClientContext::TryGetCurrentSetting(const std::string &key, Value &result) {
1126 // first check the built-in settings
1127 auto &db_config = DBConfig::GetConfig(context&: *this);
1128 auto option = db_config.GetOptionByName(name: key);
1129 if (option) {
1130 result = option->get_setting(*this);
1131 return true;
1132 }
1133
1134 // check the client session values
1135 const auto &session_config_map = config.set_variables;
1136
1137 auto session_value = session_config_map.find(x: key);
1138 bool found_session_value = session_value != session_config_map.end();
1139 if (found_session_value) {
1140 result = session_value->second;
1141 return true;
1142 }
1143 // finally check the global session values
1144 return db->TryGetCurrentSetting(key, result);
1145}
1146
1147ParserOptions ClientContext::GetParserOptions() const {
1148 auto &client_config = ClientConfig::GetConfig(context: *this);
1149 ParserOptions options;
1150 options.preserve_identifier_case = client_config.preserve_identifier_case;
1151 options.integer_division = client_config.integer_division;
1152 options.max_expression_depth = client_config.max_expression_depth;
1153 options.extensions = &DBConfig::GetConfig(context: *this).parser_extensions;
1154 return options;
1155}
1156
1157ClientProperties ClientContext::GetClientProperties() const {
1158 auto client_context = ClientConfig::GetConfig(context: *this);
1159 return {client_context.ExtractTimezone(), db->config.options.arrow_offset_size};
1160}
1161
1162bool ClientContext::ExecutionIsFinished() {
1163 if (!active_query || !active_query->executor) {
1164 return false;
1165 }
1166 return active_query->executor->ExecutionIsFinished();
1167}
1168
1169} // namespace duckdb
1170