#include "duckdb/main/client_context.hpp"

#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
#include "duckdb/common/serializer/buffered_deserializer.hpp"
#include "duckdb/common/serializer/buffered_serializer.hpp"
#include "duckdb/execution/physical_plan_generator.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/main/materialized_query_result.hpp"
#include "duckdb/main/query_result.hpp"
#include "duckdb/main/stream_query_result.hpp"
#include "duckdb/optimizer/optimizer.hpp"
#include "duckdb/parser/parser.hpp"
#include "duckdb/parser/expression/constant_expression.hpp"
#include "duckdb/parser/statement/drop_statement.hpp"
#include "duckdb/parser/statement/execute_statement.hpp"
#include "duckdb/parser/statement/explain_statement.hpp"
#include "duckdb/parser/statement/prepare_statement.hpp"
#include "duckdb/planner/operator/logical_execute.hpp"
#include "duckdb/planner/planner.hpp"
#include "duckdb/transaction/transaction_manager.hpp"
#include "duckdb/transaction/transaction.hpp"
#include "duckdb/storage/data_table.hpp"
#include "duckdb/main/appender.hpp"
#include "duckdb/main/relation.hpp"
#include "duckdb/planner/expression_binder/where_binder.hpp"
#include "duckdb/parser/statement/relation_statement.hpp"

using namespace duckdb;
using namespace std;

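// A ClientContext holds the per-connection state: the active transaction, the session-local (temporary) schema,
// the prepared statements registered on this connection, and the currently open streaming result, if any.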
ClientContext::ClientContext(DuckDB &database)
    : db(database), transaction(*database.transaction_manager), interrupted(false), catalog(*database.catalog),
      temporary_objects(make_unique<SchemaCatalogEntry>(db.catalog.get(), TEMP_SCHEMA)),
      prepared_statements(make_unique<CatalogSet>(*db.catalog)), open_result(nullptr) {
    random_device rd;
    random_engine.seed(rd());
}

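// Cleanup is run when the connection is closed: it rolls back any explicit open transaction, hands the prepared
// statement catalog set over to the transaction manager for deferred cleanup, and invalidates all prepared statement
// and appender objects that still reference this context.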
void ClientContext::Cleanup() {
    lock_guard<mutex> client_guard(context_lock);
    if (is_invalidated || !prepared_statements) {
        return;
    }
    if (transaction.HasActiveTransaction()) {
        ActiveTransaction().active_query = MAXIMUM_QUERY_ID;
        if (!transaction.IsAutoCommit()) {
            transaction.Rollback();
        }
    }
    assert(prepared_statements);
    db.transaction_manager->AddCatalogSet(*this, move(prepared_statements));
    // invalidate any prepared statements
    for (auto &statement : prepared_statement_objects) {
        statement->is_invalidated = true;
    }
    for (auto &appender : appenders) {
        appender->Invalidate("Connection has been closed!", false);
    }
    CleanupInternal();
}

void ClientContext::RegisterAppender(Appender *appender) {
    lock_guard<mutex> client_guard(context_lock);
    if (is_invalidated) {
        throw Exception("Database that this connection belongs to has been closed!");
    }
    appenders.insert(appender);
}

void ClientContext::RemoveAppender(Appender *appender) {
    lock_guard<mutex> client_guard(context_lock);
    if (is_invalidated) {
        return;
    }
    appenders.erase(appender);
}

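// Fetch returns the next chunk of the currently open streaming result, or nullptr if there is no open result,
// the database has been closed, or an error occurred (in which case the error is stored on the open result).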
unique_ptr<DataChunk> ClientContext::Fetch() {
    lock_guard<mutex> client_guard(context_lock);
    if (!open_result) {
        // no result to fetch from
        return nullptr;
    }
    if (is_invalidated) {
        // ClientContext is invalidated: database has been closed
        open_result->error = "Database that this connection belongs to has been closed!";
        open_result->success = false;
        return nullptr;
    }
    try {
        // fetch the chunk and return it
        auto chunk = FetchInternal();
        return chunk;
    } catch (Exception &ex) {
        open_result->error = ex.what();
    } catch (...) {
        open_result->error = "Unhandled exception in Fetch";
    }
    open_result->success = false;
    CleanupInternal();
    return nullptr;
}

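// FinalizeQuery ends profiling and resets the execution context; in autocommit mode it also commits or rolls back
// the transaction depending on whether the query succeeded. Any error raised while finishing the transaction is
// returned as a string.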
string ClientContext::FinalizeQuery(bool success) {
    profiler.EndQuery();

    execution_context.Reset();

    string error;
    if (transaction.HasActiveTransaction()) {
        ActiveTransaction().active_query = MAXIMUM_QUERY_ID;
        try {
            if (transaction.IsAutoCommit()) {
                if (success) {
                    // query was successful: commit
                    transaction.Commit();
                } else {
                    // query was unsuccessful: rollback
                    transaction.Rollback();
                }
            }
        } catch (Exception &ex) {
            error = ex.what();
        } catch (...) {
            error = "Unhandled exception!";
        }
    }
    return error;
}

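// CleanupInternal finalizes and closes the currently open result, if any. If the query itself succeeded but
// committing failed, the commit error is reported on the result instead.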
void ClientContext::CleanupInternal() {
    if (!open_result) {
        // no result currently open
        return;
    }

    auto error = FinalizeQuery(open_result->success);
    if (open_result->success) {
        // if an error occurred while committing, report it in the result
        open_result->error = error;
        open_result->success = error.empty();
    }

    open_result->is_open = false;
    open_result = nullptr;
}

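// FetchInternal pulls the next chunk from the physical plan stored in the execution context.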
unique_ptr<DataChunk> ClientContext::FetchInternal() {
    assert(execution_context.physical_plan);
    auto chunk = make_unique<DataChunk>();
    // run the plan to get the next chunks
    execution_context.physical_plan->InitializeChunk(*chunk);
    execution_context.physical_plan->GetChunk(*this, *chunk, execution_context.physical_state.get());
    return chunk;
}

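// CreatePreparedStatement turns a parsed statement into an executable plan: the Planner binds the statement and
// produces a logical plan, the Optimizer rewrites it (always in release builds, optionally in debug builds), and the
// PhysicalPlanGenerator converts it into the physical plan stored in the returned PreparedStatementData.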
unique_ptr<PreparedStatementData> ClientContext::CreatePreparedStatement(const string &query,
                                                                         unique_ptr<SQLStatement> statement) {
    StatementType statement_type = statement->type;
    auto result = make_unique<PreparedStatementData>(statement_type);

    profiler.StartPhase("planner");
    Planner planner(*this);
    planner.CreatePlan(move(statement));
    assert(planner.plan);
    profiler.EndPhase();

    auto plan = move(planner.plan);
    // extract the result column names from the plan
    result->read_only = planner.read_only;
    result->requires_valid_transaction = planner.requires_valid_transaction;
    result->names = planner.names;
    result->sql_types = planner.sql_types;
    result->value_map = move(planner.value_map);

#ifdef DEBUG
    if (enable_optimizer) {
#endif
        profiler.StartPhase("optimizer");
        Optimizer optimizer(planner.binder, *this);
        plan = optimizer.Optimize(move(plan));
        assert(plan);
        profiler.EndPhase();
#ifdef DEBUG
    }
#endif

    profiler.StartPhase("physical_planner");
    // now convert logical query plan into a physical query plan
    PhysicalPlanGenerator physical_planner(*this);
    auto physical_plan = physical_planner.CreatePlan(move(plan));
    profiler.EndPhase();

    result->dependencies = move(physical_planner.dependencies);
    result->types = physical_plan->types;
    result->plan = move(physical_plan);
    return result;
}

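// ExecutePreparedStatement binds the given parameter values and executes the prepared plan. For SELECT statements,
// when streaming is allowed, it returns a StreamQueryResult that fetches lazily; otherwise it materializes the full
// result by fetching chunks until the plan is exhausted.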
unique_ptr<QueryResult> ClientContext::ExecutePreparedStatement(const string &query, PreparedStatementData &statement,
                                                                vector<Value> bound_values, bool allow_stream_result) {
    if (ActiveTransaction().is_invalidated && statement.requires_valid_transaction) {
        throw Exception("Current transaction is aborted (please ROLLBACK)");
    }
    if (db.access_mode == AccessMode::READ_ONLY && !statement.read_only) {
        throw Exception(StringUtil::Format("Cannot execute statement of type \"%s\" in read-only mode!",
                                           StatementTypeToString(statement.statement_type).c_str()));
    }

    // bind the bound values before execution
    statement.Bind(move(bound_values));

    bool create_stream_result = statement.statement_type == StatementType::SELECT_STATEMENT && allow_stream_result;

    // store the physical plan in the context for calls to Fetch()
    execution_context.physical_plan = move(statement.plan);
    execution_context.physical_state = execution_context.physical_plan->GetOperatorState();

    auto types = execution_context.physical_plan->GetTypes();
    assert(types.size() == statement.sql_types.size());

    if (create_stream_result) {
        // successfully compiled SELECT statement and streaming of the result is allowed
        // return a StreamQueryResult so the client can call Fetch() on it and stream the result
        return make_unique<StreamQueryResult>(statement.statement_type, *this, statement.sql_types, types,
                                              statement.names);
    }
    // create a materialized result by continuously fetching
    auto result =
        make_unique<MaterializedQueryResult>(statement.statement_type, statement.sql_types, types, statement.names);
    while (true) {
        auto chunk = FetchInternal();
        if (chunk->size() == 0) {
            break;
        }
#ifdef DEBUG
        for (idx_t i = 0; i < chunk->column_count(); i++) {
            if (statement.sql_types[i].id == SQLTypeId::VARCHAR) {
                chunk->data[i].UTFVerify(chunk->size());
            }
        }
#endif
        result->collection.Append(*chunk);
    }
    return move(result);
}

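// InitialCleanup is run at the start of every query/prepare call: it closes any still-open result and resets the
// interrupted flag, throwing if the database has already been closed.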
void ClientContext::InitialCleanup() {
    if (is_invalidated) {
        throw Exception("Database that this connection belongs to has been closed!");
    }
    //! Cleanup any open results and reset the interrupted flag
    CleanupInternal();
    interrupted = false;
}

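// Prepare parses a single statement, wraps it in a PREPARE statement that registers the prepared plan in the
// catalog under an internal name, and returns a PreparedStatement handle bound to this connection. On failure the
// returned PreparedStatement carries the error instead.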
unique_ptr<PreparedStatement> ClientContext::Prepare(string query) {
    lock_guard<mutex> client_guard(context_lock);
    // prepare the query
    try {
        InitialCleanup();

        // first parse the query
        Parser parser;
        parser.ParseQuery(query.c_str());
        if (parser.statements.size() == 0) {
            throw Exception("No statement to prepare!");
        }
        if (parser.statements.size() > 1) {
            throw Exception("Cannot prepare multiple statements at once!");
        }
        // now write the prepared statement data into the catalog
        string prepare_name = "____duckdb_internal_prepare_" + to_string(prepare_count);
        prepare_count++;
        // create a prepare statement out of the underlying statement
        auto prepare = make_unique<PrepareStatement>();
        prepare->name = prepare_name;
        prepare->statement = move(parser.statements[0]);

        // now perform the actual PREPARE query
        auto result = RunStatement(query, move(prepare), false);
        if (!result->success) {
            throw Exception(result->error);
        }
        auto prepared_catalog = (PreparedStatementCatalogEntry *)prepared_statements->GetRootEntry(prepare_name);
        auto prepared_object = make_unique<PreparedStatement>(this, prepare_name, query, *prepared_catalog->prepared,
                                                              parser.n_prepared_parameters);
        prepared_statement_objects.insert(prepared_object.get());
        return prepared_object;
    } catch (Exception &ex) {
        return make_unique<PreparedStatement>(ex.what());
    }
}

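// Execute runs a previously prepared statement by name with the given parameter values, by constructing and
// running an EXECUTE statement.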
unique_ptr<QueryResult> ClientContext::Execute(string name, vector<Value> &values, bool allow_stream_result,
                                               string query) {
    lock_guard<mutex> client_guard(context_lock);
    try {
        InitialCleanup();
    } catch (std::exception &ex) {
        return make_unique<MaterializedQueryResult>(ex.what());
    }

    // create the execute statement
    auto execute = make_unique<ExecuteStatement>();
    execute->name = name;
    for (auto &val : values) {
        execute->values.push_back(make_unique<ConstantExpression>(val.GetSQLType(), val));
    }

    return RunStatement(query, move(execute), allow_stream_result);
}

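// RemovePreparedStatement deregisters a prepared statement object from this context and issues a DEALLOCATE to drop
// its catalog entry, unless the connection or statement has already been invalidated.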
void ClientContext::RemovePreparedStatement(PreparedStatement *statement) {
    lock_guard<mutex> client_guard(context_lock);
    if (!statement->success || statement->is_invalidated || is_invalidated) {
        return;
    }
    try {
        InitialCleanup();
    } catch (...) {
        return;
    }
    // erase the object from the list of prepared statements
    prepared_statement_objects.erase(statement);
    // drop it from the catalog
    auto deallocate_statement = make_unique<DropStatement>();
    deallocate_statement->info->type = CatalogType::PREPARED_STATEMENT;
    deallocate_statement->info->name = statement->name;
    string query = "DEALLOCATE " + statement->name;
    RunStatement(query, move(deallocate_statement), false);
}

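// RunStatementInternal plans and executes a single statement without any parameter values bound.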
unique_ptr<QueryResult> ClientContext::RunStatementInternal(const string &query, unique_ptr<SQLStatement> statement,
                                                            bool allow_stream_result) {
    // prepare the query for execution
    auto prepared = CreatePreparedStatement(query, move(statement));
    // by default, no values are bound
    vector<Value> bound_values;
    // execute the prepared statement
    return ExecutePreparedStatement(query, *prepared, move(bound_values), allow_stream_result);
}

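// RunStatement executes a single statement inside a transaction: it begins a transaction when in autocommit mode,
// optionally performs aggressive query verification for SELECT statements, and finalizes the transaction unless a
// streaming result keeps it open. Errors invalidate the transaction, except for StandardExceptions.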
unique_ptr<QueryResult> ClientContext::RunStatement(const string &query, unique_ptr<SQLStatement> statement,
                                                    bool allow_stream_result) {
    unique_ptr<QueryResult> result;
    // check if we are on AutoCommit. In this case we should start a transaction.
    if (transaction.IsAutoCommit()) {
        transaction.BeginTransaction();
    }
    ActiveTransaction().active_query = db.transaction_manager->GetQueryNumber();
    if (statement->type == StatementType::SELECT_STATEMENT && query_verification_enabled) {
        // query verification is enabled:
        // create a copy of the statement and verify the original statement
        auto copied_statement = ((SelectStatement &)*statement).Copy();
        string error = VerifyQuery(query, move(statement));
        if (!error.empty()) {
            // query failed: abort now
            FinalizeQuery(false);
            // error in verifying query
            return make_unique<MaterializedQueryResult>(error);
        }
        statement = move(copied_statement);
    }
    // start the profiler
    profiler.StartQuery(query, *statement);
    try {
        result = RunStatementInternal(query, move(statement), allow_stream_result);
    } catch (StandardException &ex) {
        // standard exceptions do not invalidate the current transaction
        result = make_unique<MaterializedQueryResult>(ex.what());
    } catch (std::exception &ex) {
        // other types of exceptions do invalidate the current transaction
        if (transaction.HasActiveTransaction()) {
            ActiveTransaction().is_invalidated = true;
        }
        result = make_unique<MaterializedQueryResult>(ex.what());
    }
    if (!result->success) {
        // initial failures should always be reported as MaterializedResult
        assert(result->type != QueryResultType::STREAM_RESULT);
        // query failed: abort now
        FinalizeQuery(false);
        return result;
    }
    // query succeeded, append to list of results
    if (result->type == QueryResultType::STREAM_RESULT) {
        // store as currently open result if it is a stream result
        this->open_result = (StreamQueryResult *)result.get();
    } else {
        // finalize the query if it is not a stream result
        string error = FinalizeQuery(true);
        if (!error.empty()) {
            // failure in committing transaction
            return make_unique<MaterializedQueryResult>(error);
        }
    }
    return result;
}

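// RunStatements executes a list of parsed statements one by one and chains their results together; only the last
// statement may produce a streaming result.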
unique_ptr<QueryResult> ClientContext::RunStatements(const string &query, vector<unique_ptr<SQLStatement>> &statements,
                                                     bool allow_stream_result) {
    // now we have a list of statements
    // iterate over them and execute them one by one
    unique_ptr<QueryResult> result;
    QueryResult *last_result = nullptr;
    for (idx_t i = 0; i < statements.size(); i++) {
        auto &statement = statements[i];
        bool is_last_statement = i + 1 == statements.size();
        auto current_result = RunStatement(query, move(statement), allow_stream_result && is_last_statement);
        // now append the result to the list of results
        if (!last_result) {
            // first result of the query
            result = move(current_result);
            last_result = result.get();
        } else {
            // later results; attach to the result chain
            last_result->next = move(current_result);
            last_result = last_result->next.get();
        }
    }
    return result;
}

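// Query is the main entry point for running SQL text on this connection: it parses the string into statements and
// executes them sequentially.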
unique_ptr<QueryResult> ClientContext::Query(string query, bool allow_stream_result) {
    lock_guard<mutex> client_guard(context_lock);

    Parser parser;
    try {
        InitialCleanup();
        // parse the query and transform it into a set of statements
        parser.ParseQuery(query.c_str());
    } catch (std::exception &ex) {
        return make_unique<MaterializedQueryResult>(ex.what());
    }

    if (parser.statements.size() == 0) {
        // no statements, return empty successful result
        return make_unique<MaterializedQueryResult>(StatementType::INVALID_STATEMENT);
    }

    return RunStatements(query, parser.statements, allow_stream_result);
}

void ClientContext::Interrupt() {
    interrupted = true;
}

void ClientContext::EnableProfiling() {
    lock_guard<mutex> client_guard(context_lock);
    profiler.Enable();
}

void ClientContext::DisableProfiling() {
    lock_guard<mutex> client_guard(context_lock);
    profiler.Disable();
}

void ClientContext::Invalidate() {
    // interrupt any running query before attempting to obtain the lock
    // this way we don't have to wait for the entire query to finish
    Interrupt();
    // now obtain the context lock
    lock_guard<mutex> client_guard(context_lock);
    // invalidate this context and the TransactionManager
    is_invalidated = true;
    transaction.Invalidate();
    // also close any open result
    if (open_result) {
        open_result->is_open = false;
    }
    // and close any open appenders and prepared statements
    for (auto &statement : prepared_statement_objects) {
        statement->is_invalidated = true;
    }
    for (auto &appender : appenders) {
        appender->Invalidate("Database that this appender belongs to has been closed!", false);
    }
    appenders.clear();
}

string ClientContext::VerifyQuery(string query, unique_ptr<SQLStatement> statement) {
    assert(statement->type == StatementType::SELECT_STATEMENT);
    // aggressive query verification

    // the purpose of this function is to test correctness of otherwise hard to test features:
    // Copy() of statements and expressions
    // Serialize()/Deserialize() of expressions
    // Hash() of expressions
    // Equality() of statements and expressions
    // Correctness of plans both with and without optimizers

    // copy the statement
    auto select_stmt = (SelectStatement *)statement.get();
    auto copied_stmt = select_stmt->Copy();
    auto unoptimized_stmt = select_stmt->Copy();

    BufferedSerializer serializer;
    select_stmt->Serialize(serializer);
    BufferedDeserializer source(serializer);
    auto deserialized_stmt = SelectStatement::Deserialize(source);
    // all the statements should be equal
    assert(copied_stmt->Equals(statement.get()));
    assert(deserialized_stmt->Equals(statement.get()));
    assert(copied_stmt->Equals(deserialized_stmt.get()));

    // now perform checking on the expressions
#ifdef DEBUG
    auto &orig_expr_list = select_stmt->node->GetSelectList();
    auto &de_expr_list = deserialized_stmt->node->GetSelectList();
    auto &cp_expr_list = copied_stmt->node->GetSelectList();
    assert(orig_expr_list.size() == de_expr_list.size() && cp_expr_list.size() == de_expr_list.size());
    for (idx_t i = 0; i < orig_expr_list.size(); i++) {
        // run the ToString, to verify that it doesn't crash
        orig_expr_list[i]->ToString();
        // check that the expressions are equivalent
        assert(orig_expr_list[i]->Equals(de_expr_list[i].get()));
        assert(orig_expr_list[i]->Equals(cp_expr_list[i].get()));
        assert(de_expr_list[i]->Equals(cp_expr_list[i].get()));
        // check that the hashes are equivalent too
        assert(orig_expr_list[i]->Hash() == de_expr_list[i]->Hash());
        assert(orig_expr_list[i]->Hash() == cp_expr_list[i]->Hash());
    }
    // now perform additional checking within the expressions
    for (idx_t outer_idx = 0; outer_idx < orig_expr_list.size(); outer_idx++) {
        auto hash = orig_expr_list[outer_idx]->Hash();
        for (idx_t inner_idx = 0; inner_idx < orig_expr_list.size(); inner_idx++) {
            auto hash2 = orig_expr_list[inner_idx]->Hash();
            if (hash != hash2) {
                // if the hashes are not equivalent, the expressions should not be equivalent
                assert(!orig_expr_list[outer_idx]->Equals(orig_expr_list[inner_idx].get()));
            }
        }
    }
#endif

    // disable profiling if it is enabled
    bool profiling_is_enabled = profiler.IsEnabled();
    if (profiling_is_enabled) {
        profiler.Disable();
    }

    // keep a copy of the statement for the EXPLAIN check below
    auto statement_copy_for_explain = select_stmt->Copy();

    auto original_result = make_unique<MaterializedQueryResult>(StatementType::SELECT_STATEMENT),
         copied_result = make_unique<MaterializedQueryResult>(StatementType::SELECT_STATEMENT),
         deserialized_result = make_unique<MaterializedQueryResult>(StatementType::SELECT_STATEMENT),
         unoptimized_result = make_unique<MaterializedQueryResult>(StatementType::SELECT_STATEMENT);
    // execute the original statement
    try {
        auto result = RunStatementInternal(query, move(statement), false);
        original_result = unique_ptr_cast<QueryResult, MaterializedQueryResult>(move(result));
    } catch (Exception &ex) {
        original_result->error = ex.what();
        original_result->success = false;
    }

    // verify that EXPLAIN works on the query as well, but only if the original query succeeded
    if (original_result->success) {
        auto explain_q = "EXPLAIN " + query;
        auto explain_stmt = make_unique<ExplainStatement>(move(statement_copy_for_explain));
        try {
            RunStatementInternal(explain_q, move(explain_stmt), false);
        } catch (std::exception &ex) {
            return "EXPLAIN failed but query did not (" + string(ex.what()) + ")";
        }
    }

    // now execute the copied statement
    try {
        auto result = RunStatementInternal(query, move(copied_stmt), false);
        copied_result = unique_ptr_cast<QueryResult, MaterializedQueryResult>(move(result));
    } catch (Exception &ex) {
        copied_result->error = ex.what();
    }
    // now execute the deserialized statement
    try {
        auto result = RunStatementInternal(query, move(deserialized_stmt), false);
        deserialized_result = unique_ptr_cast<QueryResult, MaterializedQueryResult>(move(result));
    } catch (Exception &ex) {
        deserialized_result->error = ex.what();
    }
    // now execute the unoptimized statement
    enable_optimizer = false;
    try {
        auto result = RunStatementInternal(query, move(unoptimized_stmt), false);
        unoptimized_result = unique_ptr_cast<QueryResult, MaterializedQueryResult>(move(result));
    } catch (Exception &ex) {
        unoptimized_result->error = ex.what();
    }

    enable_optimizer = true;
    if (profiling_is_enabled) {
        profiler.Enable();
    }

    // now compare the results
    // the results of all four runs should be identical
    if (!original_result->collection.Equals(copied_result->collection)) {
        string result = "Copied result differs from original result!\n";
        result += "Original Result:\n" + original_result->ToString();
        result += "Copied Result:\n" + copied_result->ToString();
        return result;
    }
    if (!original_result->collection.Equals(deserialized_result->collection)) {
        string result = "Deserialized result differs from original result!\n";
        result += "Original Result:\n" + original_result->ToString();
        result += "Deserialized Result:\n" + deserialized_result->ToString();
        return result;
    }
    if (!original_result->collection.Equals(unoptimized_result->collection)) {
        string result = "Unoptimized result differs from original result!\n";
        result += "Original Result:\n" + original_result->ToString();
        result += "Unoptimized Result:\n" + unoptimized_result->ToString();
        return result;
    }

    return "";
}

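// RunFunctionInTransaction runs an internal operation inside a transaction: in autocommit mode it begins a fresh
// transaction and commits it on success; on failure it rolls back (autocommit) or invalidates the explicit
// transaction and rethrows the exception.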
template <class T> void ClientContext::RunFunctionInTransaction(T &&fun) {
    lock_guard<mutex> client_guard(context_lock);
    if (is_invalidated) {
        throw Exception("Failed: database has been closed!");
    }
    if (transaction.HasActiveTransaction() && transaction.ActiveTransaction().is_invalidated) {
        throw Exception("Failed: transaction has been invalidated!");
    }
    // check if we are on AutoCommit. In this case we should start a transaction
    if (transaction.IsAutoCommit()) {
        transaction.BeginTransaction();
    }
    try {
        fun();
    } catch (Exception &ex) {
        if (transaction.IsAutoCommit()) {
            transaction.Rollback();
        } else {
            transaction.Invalidate();
        }
        throw ex;
    }
    if (transaction.IsAutoCommit()) {
        transaction.Commit();
    }
}

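// TableInfo looks up schema_name.table_name in the catalog and returns its column definitions, or nullptr if the
// table does not exist.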
unique_ptr<TableDescription> ClientContext::TableInfo(string schema_name, string table_name) {
    unique_ptr<TableDescription> result;
    RunFunctionInTransaction([&]() {
        // obtain the table info
        auto table = db.catalog->GetEntry<TableCatalogEntry>(*this, schema_name, table_name, true);
        if (!table) {
            return;
        }
        // write the table info to the result
        result = make_unique<TableDescription>();
        result->schema = schema_name;
        result->table = table_name;
        for (auto &column : table->columns) {
            result->columns.push_back(ColumnDefinition(column.name, column.type));
        }
    });
    return result;
}

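// Append appends a DataChunk directly to a table's storage, after verifying that the supplied description matches
// the table's column count and types.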
void ClientContext::Append(TableDescription &description, DataChunk &chunk) {
    RunFunctionInTransaction([&]() {
        auto table_entry = db.catalog->GetEntry<TableCatalogEntry>(*this, description.schema, description.table);
        // verify that the table columns and types match up
        if (description.columns.size() != table_entry->columns.size()) {
            throw Exception("Failed to append: table entry has different number of columns!");
        }
        for (idx_t i = 0; i < description.columns.size(); i++) {
            if (description.columns[i].type != table_entry->columns[i].type) {
                throw Exception("Failed to append: table entry has different column types!");
            }
        }
        table_entry->storage->Append(*table_entry, *this, chunk);
    });
}

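// TryBindRelation binds a relation to determine its result column names and types without executing it.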
void ClientContext::TryBindRelation(Relation &relation, vector<ColumnDefinition> &result_columns) {
    RunFunctionInTransaction([&]() {
        // bind the expressions
        Binder binder(*this);
        auto result = relation.Bind(binder);
        assert(result.names.size() == result.types.size());
        for (idx_t i = 0; i < result.names.size(); i++) {
            result_columns.push_back(ColumnDefinition(result.names[i], result.types[i]));
        }
    });
}

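// Execute runs a Relation by wrapping it in a RelationStatement, then verifies that the names and types of the
// actual result match the columns the relation reported when it was bound; on a mismatch a descriptive error result
// is returned instead.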
unique_ptr<QueryResult> ClientContext::Execute(shared_ptr<Relation> relation) {
    string query;
    if (query_verification_enabled) {
        // run the ToString method of any relation we run, mostly to ensure it doesn't crash
        relation->ToString();
        if (relation->IsReadOnly()) {
            // verify read only statements by running a select statement
            auto select = make_unique<SelectStatement>();
            select->node = relation->GetQueryNode();
            RunStatement(query, move(select), false);
        }
    }
    auto &expected_columns = relation->Columns();
    auto relation_stmt = make_unique<RelationStatement>(relation);
    auto result = RunStatement(query, move(relation_stmt), false);
    // verify that the result types and result names of the query match the expected result types/names
    if (result->types.size() == expected_columns.size()) {
        bool mismatch = false;
        for (idx_t i = 0; i < result->types.size(); i++) {
            if (result->sql_types[i] != expected_columns[i].type || result->names[i] != expected_columns[i].name) {
                mismatch = true;
                break;
            }
        }
        if (!mismatch) {
            // all is as expected: return the result
            return result;
        }
    }
    // result mismatch
    string err_str = "Result mismatch in query!\nExpected the following columns: ";
    for (idx_t i = 0; i < expected_columns.size(); i++) {
        err_str += i == 0 ? "[" : ", ";
        err_str += expected_columns[i].name + " " + SQLTypeToString(expected_columns[i].type);
    }
    err_str += "]\nBut result contained the following: ";
    for (idx_t i = 0; i < result->types.size(); i++) {
        err_str += i == 0 ? "[" : ", ";
        err_str += result->names[i] + " " + SQLTypeToString(result->sql_types[i]);
    }
    err_str += "]";
    return make_unique<MaterializedQueryResult>(err_str);
}