1 | #include "duckdb/main/client_context.hpp" |
2 | |
3 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
4 | #include "duckdb/common/serializer/buffered_deserializer.hpp" |
5 | #include "duckdb/common/serializer/buffered_serializer.hpp" |
6 | #include "duckdb/execution/physical_plan_generator.hpp" |
7 | #include "duckdb/main/database.hpp" |
8 | #include "duckdb/main/materialized_query_result.hpp" |
9 | #include "duckdb/main/query_result.hpp" |
10 | #include "duckdb/main/stream_query_result.hpp" |
11 | #include "duckdb/optimizer/optimizer.hpp" |
12 | #include "duckdb/parser/parser.hpp" |
13 | #include "duckdb/parser/expression/constant_expression.hpp" |
14 | #include "duckdb/parser/statement/drop_statement.hpp" |
15 | #include "duckdb/parser/statement/execute_statement.hpp" |
16 | #include "duckdb/parser/statement/explain_statement.hpp" |
17 | #include "duckdb/parser/statement/prepare_statement.hpp" |
18 | #include "duckdb/planner/operator/logical_execute.hpp" |
19 | #include "duckdb/planner/planner.hpp" |
20 | #include "duckdb/transaction/transaction_manager.hpp" |
21 | #include "duckdb/transaction/transaction.hpp" |
22 | #include "duckdb/storage/data_table.hpp" |
23 | #include "duckdb/main/appender.hpp" |
24 | #include "duckdb/main/relation.hpp" |
25 | #include "duckdb/planner/expression_binder/where_binder.hpp" |
26 | #include "duckdb/parser/statement/relation_statement.hpp" |
27 | |
28 | using namespace duckdb; |
29 | using namespace std; |
30 | |
31 | ClientContext::ClientContext(DuckDB &database) |
32 | : db(database), transaction(*database.transaction_manager), interrupted(false), catalog(*database.catalog), |
33 | temporary_objects(make_unique<SchemaCatalogEntry>(db.catalog.get(), TEMP_SCHEMA)), |
34 | prepared_statements(make_unique<CatalogSet>(*db.catalog)), open_result(nullptr) { |
35 | random_device rd; |
36 | random_engine.seed(rd()); |
37 | } |
38 | |
39 | void ClientContext::Cleanup() { |
40 | lock_guard<mutex> client_guard(context_lock); |
41 | if (is_invalidated || !prepared_statements) { |
42 | return; |
43 | } |
44 | if (transaction.HasActiveTransaction()) { |
45 | ActiveTransaction().active_query = MAXIMUM_QUERY_ID; |
46 | if (!transaction.IsAutoCommit()) { |
47 | transaction.Rollback(); |
48 | } |
49 | } |
50 | assert(prepared_statements); |
51 | db.transaction_manager->AddCatalogSet(*this, move(prepared_statements)); |
52 | // invalidate any prepared statements |
53 | for (auto &statement : prepared_statement_objects) { |
54 | statement->is_invalidated = true; |
55 | } |
56 | for (auto &appender : appenders) { |
57 | appender->Invalidate("Connection has been closed!" , false); |
58 | } |
59 | CleanupInternal(); |
60 | } |
61 | |
62 | void ClientContext::RegisterAppender(Appender *appender) { |
63 | lock_guard<mutex> client_guard(context_lock); |
64 | if (is_invalidated) { |
65 | throw Exception("Database that this connection belongs to has been closed!" ); |
66 | } |
67 | appenders.insert(appender); |
68 | } |
69 | |
70 | void ClientContext::RemoveAppender(Appender *appender) { |
71 | lock_guard<mutex> client_guard(context_lock); |
72 | if (is_invalidated) { |
73 | return; |
74 | } |
75 | appenders.erase(appender); |
76 | } |
77 | |
78 | unique_ptr<DataChunk> ClientContext::Fetch() { |
79 | lock_guard<mutex> client_guard(context_lock); |
80 | if (!open_result) { |
81 | // no result to fetch from |
82 | return nullptr; |
83 | } |
84 | if (is_invalidated) { |
85 | // ClientContext is invalidated: database has been closed |
86 | open_result->error = "Database that this connection belongs to has been closed!" ; |
87 | open_result->success = false; |
88 | return nullptr; |
89 | } |
90 | try { |
91 | // fetch the chunk and return it |
92 | auto chunk = FetchInternal(); |
93 | return chunk; |
94 | } catch (Exception &ex) { |
95 | open_result->error = ex.what(); |
96 | } catch (...) { |
97 | open_result->error = "Unhandled exception in Fetch" ; |
98 | } |
99 | open_result->success = false; |
100 | CleanupInternal(); |
101 | return nullptr; |
102 | } |
103 | |
104 | string ClientContext::FinalizeQuery(bool success) { |
105 | profiler.EndQuery(); |
106 | |
107 | execution_context.Reset(); |
108 | |
109 | string error; |
110 | if (transaction.HasActiveTransaction()) { |
111 | ActiveTransaction().active_query = MAXIMUM_QUERY_ID; |
112 | try { |
113 | if (transaction.IsAutoCommit()) { |
114 | if (success) { |
115 | // query was successful: commit |
116 | transaction.Commit(); |
117 | } else { |
118 | // query was unsuccessful: rollback |
119 | transaction.Rollback(); |
120 | } |
121 | } |
122 | } catch (Exception &ex) { |
123 | error = ex.what(); |
124 | } catch (...) { |
125 | error = "Unhandled exception!" ; |
126 | } |
127 | } |
128 | return error; |
129 | } |
130 | |
131 | void ClientContext::CleanupInternal() { |
132 | if (!open_result) { |
133 | // no result currently open |
134 | return; |
135 | } |
136 | |
137 | auto error = FinalizeQuery(open_result->success); |
138 | if (open_result->success) { |
139 | // if an error occurred while committing report it in the result |
140 | open_result->error = error; |
141 | open_result->success = error.empty(); |
142 | } |
143 | |
144 | open_result->is_open = false; |
145 | open_result = nullptr; |
146 | } |
147 | |
148 | unique_ptr<DataChunk> ClientContext::FetchInternal() { |
149 | assert(execution_context.physical_plan); |
150 | auto chunk = make_unique<DataChunk>(); |
151 | // run the plan to get the next chunks |
152 | execution_context.physical_plan->InitializeChunk(*chunk); |
153 | execution_context.physical_plan->GetChunk(*this, *chunk, execution_context.physical_state.get()); |
154 | return chunk; |
155 | } |
156 | |
157 | unique_ptr<PreparedStatementData> ClientContext::CreatePreparedStatement(const string &query, |
158 | unique_ptr<SQLStatement> statement) { |
159 | StatementType statement_type = statement->type; |
160 | auto result = make_unique<PreparedStatementData>(statement_type); |
161 | |
162 | profiler.StartPhase("planner" ); |
163 | Planner planner(*this); |
164 | planner.CreatePlan(move(statement)); |
165 | assert(planner.plan); |
166 | profiler.EndPhase(); |
167 | |
168 | auto plan = move(planner.plan); |
169 | // extract the result column names from the plan |
170 | result->read_only = planner.read_only; |
171 | result->requires_valid_transaction = planner.requires_valid_transaction; |
172 | result->names = planner.names; |
173 | result->sql_types = planner.sql_types; |
174 | result->value_map = move(planner.value_map); |
175 | |
176 | #ifdef DEBUG |
177 | if (enable_optimizer) { |
178 | #endif |
179 | profiler.StartPhase("optimizer" ); |
180 | Optimizer optimizer(planner.binder, *this); |
181 | plan = optimizer.Optimize(move(plan)); |
182 | assert(plan); |
183 | profiler.EndPhase(); |
184 | #ifdef DEBUG |
185 | } |
186 | #endif |
187 | |
188 | profiler.StartPhase("physical_planner" ); |
189 | // now convert logical query plan into a physical query plan |
190 | PhysicalPlanGenerator physical_planner(*this); |
191 | auto physical_plan = physical_planner.CreatePlan(move(plan)); |
192 | profiler.EndPhase(); |
193 | |
194 | result->dependencies = move(physical_planner.dependencies); |
195 | result->types = physical_plan->types; |
196 | result->plan = move(physical_plan); |
197 | return result; |
198 | } |
199 | |
200 | unique_ptr<QueryResult> ClientContext::ExecutePreparedStatement(const string &query, PreparedStatementData &statement, |
201 | vector<Value> bound_values, bool allow_stream_result) { |
202 | if (ActiveTransaction().is_invalidated && statement.requires_valid_transaction) { |
203 | throw Exception("Current transaction is aborted (please ROLLBACK)" ); |
204 | } |
205 | if (db.access_mode == AccessMode::READ_ONLY && !statement.read_only) { |
206 | throw Exception(StringUtil::Format("Cannot execute statement of type \"%s\" in read-only mode!" , |
207 | StatementTypeToString(statement.statement_type).c_str())); |
208 | } |
209 | |
210 | // bind the bound values before execution |
211 | statement.Bind(move(bound_values)); |
212 | |
213 | bool create_stream_result = statement.statement_type == StatementType::SELECT_STATEMENT && allow_stream_result; |
214 | |
215 | // store the physical plan in the context for calls to Fetch() |
216 | execution_context.physical_plan = move(statement.plan); |
217 | execution_context.physical_state = execution_context.physical_plan->GetOperatorState(); |
218 | |
219 | auto types = execution_context.physical_plan->GetTypes(); |
220 | assert(types.size() == statement.sql_types.size()); |
221 | |
222 | if (create_stream_result) { |
223 | // successfully compiled SELECT clause and it is the last statement |
224 | // return a StreamQueryResult so the client can call Fetch() on it and stream the result |
225 | return make_unique<StreamQueryResult>(statement.statement_type, *this, statement.sql_types, types, |
226 | statement.names); |
227 | } |
228 | // create a materialized result by continuously fetching |
229 | auto result = |
230 | make_unique<MaterializedQueryResult>(statement.statement_type, statement.sql_types, types, statement.names); |
231 | while (true) { |
232 | auto chunk = FetchInternal(); |
233 | if (chunk->size() == 0) { |
234 | break; |
235 | } |
236 | #ifdef DEBUG |
237 | for (idx_t i = 0; i < chunk->column_count(); i++) { |
238 | if (statement.sql_types[i].id == SQLTypeId::VARCHAR) { |
239 | chunk->data[i].UTFVerify(chunk->size()); |
240 | } |
241 | } |
242 | #endif |
243 | result->collection.Append(*chunk); |
244 | } |
245 | return move(result); |
246 | } |
247 | |
248 | void ClientContext::InitialCleanup() { |
249 | if (is_invalidated) { |
250 | throw Exception("Database that this connection belongs to has been closed!" ); |
251 | } |
252 | //! Cleanup any open results and reset the interrupted flag |
253 | CleanupInternal(); |
254 | interrupted = false; |
255 | } |
256 | |
257 | unique_ptr<PreparedStatement> ClientContext::Prepare(string query) { |
258 | lock_guard<mutex> client_guard(context_lock); |
259 | // prepare the query |
260 | try { |
261 | InitialCleanup(); |
262 | |
263 | // first parse the query |
264 | Parser parser; |
265 | parser.ParseQuery(query.c_str()); |
266 | if (parser.statements.size() == 0) { |
267 | throw Exception("No statement to prepare!" ); |
268 | } |
269 | if (parser.statements.size() > 1) { |
270 | throw Exception("Cannot prepare multiple statements at once!" ); |
271 | } |
272 | // now write the prepared statement data into the catalog |
273 | string prepare_name = "____duckdb_internal_prepare_" + to_string(prepare_count); |
274 | prepare_count++; |
275 | // create a prepare statement out of the underlying statement |
276 | auto prepare = make_unique<PrepareStatement>(); |
277 | prepare->name = prepare_name; |
278 | prepare->statement = move(parser.statements[0]); |
279 | |
280 | // now perform the actual PREPARE query |
281 | auto result = RunStatement(query, move(prepare), false); |
282 | if (!result->success) { |
283 | throw Exception(result->error); |
284 | } |
285 | auto prepared_catalog = (PreparedStatementCatalogEntry *)prepared_statements->GetRootEntry(prepare_name); |
286 | auto prepared_object = make_unique<PreparedStatement>(this, prepare_name, query, *prepared_catalog->prepared, |
287 | parser.n_prepared_parameters); |
288 | prepared_statement_objects.insert(prepared_object.get()); |
289 | return prepared_object; |
290 | } catch (Exception &ex) { |
291 | return make_unique<PreparedStatement>(ex.what()); |
292 | } |
293 | } |
294 | |
295 | unique_ptr<QueryResult> ClientContext::Execute(string name, vector<Value> &values, bool allow_stream_result, |
296 | string query) { |
297 | lock_guard<mutex> client_guard(context_lock); |
298 | try { |
299 | InitialCleanup(); |
300 | } catch (std::exception &ex) { |
301 | return make_unique<MaterializedQueryResult>(ex.what()); |
302 | } |
303 | |
304 | // create the execute statement |
305 | auto execute = make_unique<ExecuteStatement>(); |
306 | execute->name = name; |
307 | for (auto &val : values) { |
308 | execute->values.push_back(make_unique<ConstantExpression>(val.GetSQLType(), val)); |
309 | } |
310 | |
311 | return RunStatement(query, move(execute), allow_stream_result); |
312 | } |
313 | void ClientContext::RemovePreparedStatement(PreparedStatement *statement) { |
314 | lock_guard<mutex> client_guard(context_lock); |
315 | if (!statement->success || statement->is_invalidated || is_invalidated) { |
316 | return; |
317 | } |
318 | try { |
319 | InitialCleanup(); |
320 | } catch (...) { |
321 | return; |
322 | } |
323 | // erase the object from the list of prepared statements |
324 | prepared_statement_objects.erase(statement); |
325 | // drop it from the catalog |
326 | auto deallocate_statement = make_unique<DropStatement>(); |
327 | deallocate_statement->info->type = CatalogType::PREPARED_STATEMENT; |
328 | deallocate_statement->info->name = statement->name; |
329 | string query = "DEALLOCATE " + statement->name; |
330 | RunStatement(query, move(deallocate_statement), false); |
331 | } |
332 | |
333 | unique_ptr<QueryResult> ClientContext::RunStatementInternal(const string &query, unique_ptr<SQLStatement> statement, |
334 | bool allow_stream_result) { |
335 | // prepare the query for execution |
336 | auto prepared = CreatePreparedStatement(query, move(statement)); |
337 | // by default, no values are bound |
338 | vector<Value> bound_values; |
339 | // execute the prepared statement |
340 | return ExecutePreparedStatement(query, *prepared, move(bound_values), allow_stream_result); |
341 | } |
342 | |
343 | unique_ptr<QueryResult> ClientContext::RunStatement(const string &query, unique_ptr<SQLStatement> statement, |
344 | bool allow_stream_result) { |
345 | unique_ptr<QueryResult> result; |
346 | // check if we are on AutoCommit. In this case we should start a transaction. |
347 | if (transaction.IsAutoCommit()) { |
348 | transaction.BeginTransaction(); |
349 | } |
350 | ActiveTransaction().active_query = db.transaction_manager->GetQueryNumber(); |
351 | if (statement->type == StatementType::SELECT_STATEMENT && query_verification_enabled) { |
352 | // query verification is enabled: |
353 | // create a copy of the statement and verify the original statement |
354 | auto copied_statement = ((SelectStatement &)*statement).Copy(); |
355 | string error = VerifyQuery(query, move(statement)); |
356 | if (!error.empty()) { |
357 | // query failed: abort now |
358 | FinalizeQuery(false); |
359 | // error in verifying query |
360 | return make_unique<MaterializedQueryResult>(error); |
361 | } |
362 | statement = move(copied_statement); |
363 | } |
364 | // start the profiler |
365 | profiler.StartQuery(query, *statement); |
366 | try { |
367 | result = RunStatementInternal(query, move(statement), allow_stream_result); |
368 | } catch (StandardException &ex) { |
369 | // standard exceptions do not invalidate the current transaction |
370 | result = make_unique<MaterializedQueryResult>(ex.what()); |
371 | } catch (std::exception &ex) { |
372 | // other types of exceptions do invalidate the current transaction |
373 | if (transaction.HasActiveTransaction()) { |
374 | ActiveTransaction().is_invalidated = true; |
375 | } |
376 | result = make_unique<MaterializedQueryResult>(ex.what()); |
377 | } |
378 | if (!result->success) { |
379 | // initial failures should always be reported as MaterializedResult |
380 | assert(result->type != QueryResultType::STREAM_RESULT); |
381 | // query failed: abort now |
382 | FinalizeQuery(false); |
383 | return result; |
384 | } |
385 | // query succeeded, append to list of results |
386 | if (result->type == QueryResultType::STREAM_RESULT) { |
387 | // store as currently open result if it is a stream result |
388 | this->open_result = (StreamQueryResult *)result.get(); |
389 | } else { |
390 | // finalize the query if it is not a stream result |
391 | string error = FinalizeQuery(true); |
392 | if (!error.empty()) { |
393 | // failure in committing transaction |
394 | return make_unique<MaterializedQueryResult>(error); |
395 | } |
396 | } |
397 | return result; |
398 | } |
399 | |
400 | unique_ptr<QueryResult> ClientContext::RunStatements(const string &query, vector<unique_ptr<SQLStatement>> &statements, |
401 | bool allow_stream_result) { |
402 | // now we have a list of statements |
403 | // iterate over them and execute them one by one |
404 | unique_ptr<QueryResult> result; |
405 | QueryResult *last_result = nullptr; |
406 | for (idx_t i = 0; i < statements.size(); i++) { |
407 | auto &statement = statements[i]; |
408 | bool is_last_statement = i + 1 == statements.size(); |
409 | auto current_result = RunStatement(query, move(statement), allow_stream_result && is_last_statement); |
410 | // now append the result to the list of results |
411 | if (!last_result) { |
412 | // first result of the query |
413 | result = move(current_result); |
414 | last_result = result.get(); |
415 | } else { |
416 | // later results; attach to the result chain |
417 | last_result->next = move(current_result); |
418 | last_result = last_result->next.get(); |
419 | } |
420 | } |
421 | return result; |
422 | } |
423 | |
424 | unique_ptr<QueryResult> ClientContext::Query(string query, bool allow_stream_result) { |
425 | lock_guard<mutex> client_guard(context_lock); |
426 | |
427 | Parser parser; |
428 | try { |
429 | InitialCleanup(); |
430 | // parse the query and transform it into a set of statements |
431 | parser.ParseQuery(query.c_str()); |
432 | } catch (std::exception &ex) { |
433 | return make_unique<MaterializedQueryResult>(ex.what()); |
434 | } |
435 | |
436 | if (parser.statements.size() == 0) { |
437 | // no statements, return empty successful result |
438 | return make_unique<MaterializedQueryResult>(StatementType::INVALID_STATEMENT); |
439 | } |
440 | |
441 | return RunStatements(query, parser.statements, allow_stream_result); |
442 | } |
443 | |
444 | void ClientContext::Interrupt() { |
445 | interrupted = true; |
446 | } |
447 | |
448 | void ClientContext::EnableProfiling() { |
449 | lock_guard<mutex> client_guard(context_lock); |
450 | profiler.Enable(); |
451 | } |
452 | |
453 | void ClientContext::DisableProfiling() { |
454 | lock_guard<mutex> client_guard(context_lock); |
455 | profiler.Disable(); |
456 | } |
457 | |
458 | void ClientContext::Invalidate() { |
459 | // interrupt any running query before attempting to obtain the lock |
460 | // this way we don't have to wait for the entire query to finish |
461 | Interrupt(); |
462 | // now obtain the context lock |
463 | lock_guard<mutex> client_guard(context_lock); |
464 | // invalidate this context and the TransactionManager |
465 | is_invalidated = true; |
466 | transaction.Invalidate(); |
467 | // also close any open result |
468 | if (open_result) { |
469 | open_result->is_open = false; |
470 | } |
471 | // and close any open appenders and prepared statements |
472 | for (auto &statement : prepared_statement_objects) { |
473 | statement->is_invalidated = true; |
474 | } |
475 | for (auto &appender : appenders) { |
476 | appender->Invalidate("Database that this appender belongs to has been closed!" , false); |
477 | } |
478 | appenders.clear(); |
479 | } |
480 | |
481 | string ClientContext::VerifyQuery(string query, unique_ptr<SQLStatement> statement) { |
482 | assert(statement->type == StatementType::SELECT_STATEMENT); |
483 | // aggressive query verification |
484 | |
485 | // the purpose of this function is to test correctness of otherwise hard to test features: |
486 | // Copy() of statements and expressions |
487 | // Serialize()/Deserialize() of expressions |
488 | // Hash() of expressions |
489 | // Equality() of statements and expressions |
490 | // Correctness of plans both with and without optimizers |
491 | |
492 | // copy the statement |
493 | auto select_stmt = (SelectStatement *)statement.get(); |
494 | auto copied_stmt = select_stmt->Copy(); |
495 | auto unoptimized_stmt = select_stmt->Copy(); |
496 | |
497 | BufferedSerializer serializer; |
498 | select_stmt->Serialize(serializer); |
499 | BufferedDeserializer source(serializer); |
500 | auto deserialized_stmt = SelectStatement::Deserialize(source); |
501 | // all the statements should be equal |
502 | assert(copied_stmt->Equals(statement.get())); |
503 | assert(deserialized_stmt->Equals(statement.get())); |
504 | assert(copied_stmt->Equals(deserialized_stmt.get())); |
505 | |
506 | // now perform checking on the expressions |
507 | #ifdef DEBUG |
508 | auto &orig_expr_list = select_stmt->node->GetSelectList(); |
509 | auto &de_expr_list = deserialized_stmt->node->GetSelectList(); |
510 | auto &cp_expr_list = copied_stmt->node->GetSelectList(); |
511 | assert(orig_expr_list.size() == de_expr_list.size() && cp_expr_list.size() == de_expr_list.size()); |
512 | for (idx_t i = 0; i < orig_expr_list.size(); i++) { |
513 | // run the ToString, to verify that it doesn't crash |
514 | orig_expr_list[i]->ToString(); |
515 | // check that the expressions are equivalent |
516 | assert(orig_expr_list[i]->Equals(de_expr_list[i].get())); |
517 | assert(orig_expr_list[i]->Equals(cp_expr_list[i].get())); |
518 | assert(de_expr_list[i]->Equals(cp_expr_list[i].get())); |
519 | // check that the hashes are equivalent too |
520 | assert(orig_expr_list[i]->Hash() == de_expr_list[i]->Hash()); |
521 | assert(orig_expr_list[i]->Hash() == cp_expr_list[i]->Hash()); |
522 | } |
523 | // now perform additional checking within the expressions |
524 | for (idx_t outer_idx = 0; outer_idx < orig_expr_list.size(); outer_idx++) { |
525 | auto hash = orig_expr_list[outer_idx]->Hash(); |
526 | for (idx_t inner_idx = 0; inner_idx < orig_expr_list.size(); inner_idx++) { |
527 | auto hash2 = orig_expr_list[inner_idx]->Hash(); |
528 | if (hash != hash2) { |
529 | // if the hashes are not equivalent, the expressions should not be equivalent |
530 | assert(!orig_expr_list[outer_idx]->Equals(orig_expr_list[inner_idx].get())); |
531 | } |
532 | } |
533 | } |
534 | #endif |
535 | |
536 | // disable profiling if it is enabled |
537 | bool profiling_is_enabled = profiler.IsEnabled(); |
538 | if (profiling_is_enabled) { |
539 | profiler.Disable(); |
540 | } |
541 | |
542 | // see below |
543 | auto statement_copy_for_explain = select_stmt->Copy(); |
544 | |
545 | auto original_result = make_unique<MaterializedQueryResult>(StatementType::SELECT_STATEMENT), |
546 | copied_result = make_unique<MaterializedQueryResult>(StatementType::SELECT_STATEMENT), |
547 | deserialized_result = make_unique<MaterializedQueryResult>(StatementType::SELECT_STATEMENT), |
548 | unoptimized_result = make_unique<MaterializedQueryResult>(StatementType::SELECT_STATEMENT); |
549 | // execute the original statement |
550 | try { |
551 | auto result = RunStatementInternal(query, move(statement), false); |
552 | original_result = unique_ptr_cast<QueryResult, MaterializedQueryResult>(move(result)); |
553 | } catch (Exception &ex) { |
554 | original_result->error = ex.what(); |
555 | original_result->success = false; |
556 | } |
557 | |
558 | // check explain, only if q does not already contain EXPLAIN |
559 | if (original_result->success) { |
560 | auto explain_q = "EXPLAIN " + query; |
561 | auto explain_stmt = make_unique<ExplainStatement>(move(statement_copy_for_explain)); |
562 | try { |
563 | RunStatementInternal(explain_q, move(explain_stmt), false); |
564 | } catch (std::exception &ex) { |
565 | return "EXPLAIN failed but query did not (" + string(ex.what()) + ")" ; |
566 | } |
567 | } |
568 | |
569 | // now execute the copied statement |
570 | try { |
571 | auto result = RunStatementInternal(query, move(copied_stmt), false); |
572 | copied_result = unique_ptr_cast<QueryResult, MaterializedQueryResult>(move(result)); |
573 | } catch (Exception &ex) { |
574 | copied_result->error = ex.what(); |
575 | } |
576 | // now execute the deserialized statement |
577 | try { |
578 | auto result = RunStatementInternal(query, move(deserialized_stmt), false); |
579 | deserialized_result = unique_ptr_cast<QueryResult, MaterializedQueryResult>(move(result)); |
580 | } catch (Exception &ex) { |
581 | deserialized_result->error = ex.what(); |
582 | } |
583 | // now execute the unoptimized statement |
584 | enable_optimizer = false; |
585 | try { |
586 | auto result = RunStatementInternal(query, move(unoptimized_stmt), false); |
587 | unoptimized_result = unique_ptr_cast<QueryResult, MaterializedQueryResult>(move(result)); |
588 | } catch (Exception &ex) { |
589 | unoptimized_result->error = ex.what(); |
590 | } |
591 | |
592 | enable_optimizer = true; |
593 | if (profiling_is_enabled) { |
594 | profiler.Enable(); |
595 | } |
596 | |
597 | // now compare the results |
598 | // the results of all four runs should be identical |
599 | if (!original_result->collection.Equals(copied_result->collection)) { |
600 | string result = "Copied result differs from original result!\n" ; |
601 | result += "Original Result:\n" + original_result->ToString(); |
602 | result += "Copied Result\n" + copied_result->ToString(); |
603 | return result; |
604 | } |
605 | if (!original_result->collection.Equals(deserialized_result->collection)) { |
606 | string result = "Deserialized result differs from original result!\n" ; |
607 | result += "Original Result:\n" + original_result->ToString(); |
608 | result += "Deserialized Result\n" + deserialized_result->ToString(); |
609 | return result; |
610 | } |
611 | if (!original_result->collection.Equals(unoptimized_result->collection)) { |
612 | string result = "Unoptimized result differs from original result!\n" ; |
613 | result += "Original Result:\n" + original_result->ToString(); |
614 | result += "Unoptimized Result\n" + unoptimized_result->ToString(); |
615 | return result; |
616 | } |
617 | |
618 | return "" ; |
619 | } |
620 | |
621 | template <class T> void ClientContext::RunFunctionInTransaction(T &&fun) { |
622 | lock_guard<mutex> client_guard(context_lock); |
623 | if (is_invalidated) { |
624 | throw Exception("Failed: database has been closed!" ); |
625 | } |
626 | if (transaction.HasActiveTransaction() && transaction.ActiveTransaction().is_invalidated) { |
627 | throw Exception("Failed: transaction has been invalidated!" ); |
628 | } |
629 | // check if we are on AutoCommit. In this case we should start a transaction |
630 | if (transaction.IsAutoCommit()) { |
631 | transaction.BeginTransaction(); |
632 | } |
633 | try { |
634 | fun(); |
635 | } catch (Exception &ex) { |
636 | if (transaction.IsAutoCommit()) { |
637 | transaction.Rollback(); |
638 | } else { |
639 | transaction.Invalidate(); |
640 | } |
641 | throw ex; |
642 | } |
643 | if (transaction.IsAutoCommit()) { |
644 | transaction.Commit(); |
645 | } |
646 | } |
647 | |
648 | unique_ptr<TableDescription> ClientContext::TableInfo(string schema_name, string table_name) { |
649 | unique_ptr<TableDescription> result; |
650 | RunFunctionInTransaction([&]() { |
651 | // obtain the table info |
652 | auto table = db.catalog->GetEntry<TableCatalogEntry>(*this, schema_name, table_name, true); |
653 | if (!table) { |
654 | return; |
655 | } |
656 | // write the table info to the result |
657 | result = make_unique<TableDescription>(); |
658 | result->schema = schema_name; |
659 | result->table = table_name; |
660 | for (auto &column : table->columns) { |
661 | result->columns.push_back(ColumnDefinition(column.name, column.type)); |
662 | } |
663 | }); |
664 | return result; |
665 | } |
666 | |
667 | void ClientContext::Append(TableDescription &description, DataChunk &chunk) { |
668 | RunFunctionInTransaction([&]() { |
669 | auto table_entry = db.catalog->GetEntry<TableCatalogEntry>(*this, description.schema, description.table); |
670 | // verify that the table columns and types match up |
671 | if (description.columns.size() != table_entry->columns.size()) { |
672 | throw Exception("Failed to append: table entry has different number of columns!" ); |
673 | } |
674 | for (idx_t i = 0; i < description.columns.size(); i++) { |
675 | if (description.columns[i].type != table_entry->columns[i].type) { |
676 | throw Exception("Failed to append: table entry has different number of columns!" ); |
677 | } |
678 | } |
679 | table_entry->storage->Append(*table_entry, *this, chunk); |
680 | }); |
681 | } |
682 | |
683 | void ClientContext::TryBindRelation(Relation &relation, vector<ColumnDefinition> &result_columns) { |
684 | RunFunctionInTransaction([&]() { |
685 | // bind the expressions |
686 | Binder binder(*this); |
687 | auto result = relation.Bind(binder); |
688 | assert(result.names.size() == result.types.size()); |
689 | for (idx_t i = 0; i < result.names.size(); i++) { |
690 | result_columns.push_back(ColumnDefinition(result.names[i], result.types[i])); |
691 | } |
692 | }); |
693 | } |
694 | |
695 | unique_ptr<QueryResult> ClientContext::Execute(shared_ptr<Relation> relation) { |
696 | string query; |
697 | if (query_verification_enabled) { |
698 | // run the ToString method of any relation we run, mostly to ensure it doesn't crash |
699 | relation->ToString(); |
700 | if (relation->IsReadOnly()) { |
701 | // verify read only statements by running a select statement |
702 | auto select = make_unique<SelectStatement>(); |
703 | select->node = relation->GetQueryNode(); |
704 | RunStatement(query, move(select), false); |
705 | } |
706 | } |
707 | auto &expected_columns = relation->Columns(); |
708 | auto relation_stmt = make_unique<RelationStatement>(relation); |
709 | auto result = RunStatement(query, move(relation_stmt), false); |
710 | // verify that the result types and result names of the query match the expected result types/names |
711 | if (result->types.size() == expected_columns.size()) { |
712 | bool mismatch = false; |
713 | for (idx_t i = 0; i < result->types.size(); i++) { |
714 | if (result->sql_types[i] != expected_columns[i].type || result->names[i] != expected_columns[i].name) { |
715 | mismatch = true; |
716 | break; |
717 | } |
718 | } |
719 | if (!mismatch) { |
720 | // all is as expected: return the result |
721 | return result; |
722 | } |
723 | } |
724 | // result mismatch |
725 | string err_str = "Result mismatch in query!\nExpected the following columns: " ; |
726 | for (idx_t i = 0; i < expected_columns.size(); i++) { |
727 | err_str += i == 0 ? "[" : ", " ; |
728 | err_str += expected_columns[i].name + " " + SQLTypeToString(expected_columns[i].type); |
729 | } |
730 | err_str += "]\nBut result contained the following: " ; |
731 | for (idx_t i = 0; i < result->types.size(); i++) { |
732 | err_str += i == 0 ? "[" : ", " ; |
733 | err_str += result->names[i] + " " + SQLTypeToString(result->sql_types[i]); |
734 | } |
735 | err_str += "]" ; |
736 | return make_unique<MaterializedQueryResult>(err_str); |
737 | } |
738 | |