| 1 | #include "duckdb/main/connection.hpp" |
| 2 | |
| 3 | #include "duckdb/common/types/column/column_data_collection.hpp" |
| 4 | #include "duckdb/execution/operator/persistent/parallel_csv_reader.hpp" |
| 5 | #include "duckdb/function/table/read_csv.hpp" |
| 6 | #include "duckdb/main/appender.hpp" |
| 7 | #include "duckdb/main/client_context.hpp" |
| 8 | #include "duckdb/main/connection_manager.hpp" |
| 9 | #include "duckdb/main/database.hpp" |
| 10 | #include "duckdb/main/query_profiler.hpp" |
| 11 | #include "duckdb/main/relation/query_relation.hpp" |
| 12 | #include "duckdb/main/relation/read_csv_relation.hpp" |
| 13 | #include "duckdb/main/relation/table_function_relation.hpp" |
| 14 | #include "duckdb/main/relation/table_relation.hpp" |
| 15 | #include "duckdb/main/relation/value_relation.hpp" |
| 16 | #include "duckdb/main/relation/view_relation.hpp" |
| 17 | #include "duckdb/parser/parser.hpp" |
| 18 | #include "duckdb/planner/logical_operator.hpp" |
| 19 | |
| 20 | namespace duckdb { |
| 21 | |
| 22 | Connection::Connection(DatabaseInstance &database) : context(make_shared<ClientContext>(args: database.shared_from_this())) { |
| 23 | ConnectionManager::Get(db&: database).AddConnection(context&: *context); |
| 24 | #ifdef DEBUG |
| 25 | EnableProfiling(); |
| 26 | context->config.emit_profiler_output = false; |
| 27 | #endif |
| 28 | } |
| 29 | |
| 30 | Connection::Connection(DuckDB &database) : Connection(*database.instance) { |
| 31 | } |
| 32 | |
| 33 | Connection::~Connection() { |
| 34 | ConnectionManager::Get(db&: *context->db).RemoveConnection(context&: *context); |
| 35 | } |
| 36 | |
| 37 | string Connection::GetProfilingInformation(ProfilerPrintFormat format) { |
| 38 | auto &profiler = QueryProfiler::Get(context&: *context); |
| 39 | if (format == ProfilerPrintFormat::JSON) { |
| 40 | return profiler.ToJSON(); |
| 41 | } else { |
| 42 | return profiler.QueryTreeToString(); |
| 43 | } |
| 44 | } |
| 45 | |
| 46 | void Connection::Interrupt() { |
| 47 | context->Interrupt(); |
| 48 | } |
| 49 | |
| 50 | void Connection::EnableProfiling() { |
| 51 | context->EnableProfiling(); |
| 52 | } |
| 53 | |
| 54 | void Connection::DisableProfiling() { |
| 55 | context->DisableProfiling(); |
| 56 | } |
| 57 | |
| 58 | void Connection::EnableQueryVerification() { |
| 59 | ClientConfig::GetConfig(context&: *context).query_verification_enabled = true; |
| 60 | } |
| 61 | |
| 62 | void Connection::DisableQueryVerification() { |
| 63 | ClientConfig::GetConfig(context&: *context).query_verification_enabled = false; |
| 64 | } |
| 65 | |
| 66 | void Connection::ForceParallelism() { |
| 67 | ClientConfig::GetConfig(context&: *context).verify_parallelism = true; |
| 68 | } |
| 69 | |
| 70 | unique_ptr<QueryResult> Connection::SendQuery(const string &query) { |
| 71 | return context->Query(query, allow_stream_result: true); |
| 72 | } |
| 73 | |
| 74 | unique_ptr<MaterializedQueryResult> Connection::Query(const string &query) { |
| 75 | auto result = context->Query(query, allow_stream_result: false); |
| 76 | D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT); |
| 77 | return unique_ptr_cast<QueryResult, MaterializedQueryResult>(src: std::move(result)); |
| 78 | } |
| 79 | |
| 80 | DUCKDB_API string Connection::GetSubstrait(const string &query) { |
| 81 | vector<Value> params; |
| 82 | params.emplace_back(args: query); |
| 83 | auto result = TableFunction(tname: "get_substrait" , values: params)->Execute(); |
| 84 | auto protobuf = result->FetchRaw()->GetValue(col_idx: 0, index: 0); |
| 85 | return protobuf.GetValueUnsafe<string_t>().GetString(); |
| 86 | } |
| 87 | |
| 88 | DUCKDB_API unique_ptr<QueryResult> Connection::FromSubstrait(const string &proto) { |
| 89 | vector<Value> params; |
| 90 | params.emplace_back(args: Value::BLOB_RAW(data: proto)); |
| 91 | return TableFunction(tname: "from_substrait" , values: params)->Execute(); |
| 92 | } |
| 93 | |
| 94 | DUCKDB_API string Connection::GetSubstraitJSON(const string &query) { |
| 95 | vector<Value> params; |
| 96 | params.emplace_back(args: query); |
| 97 | auto result = TableFunction(tname: "get_substrait_json" , values: params)->Execute(); |
| 98 | auto protobuf = result->FetchRaw()->GetValue(col_idx: 0, index: 0); |
| 99 | return protobuf.GetValueUnsafe<string_t>().GetString(); |
| 100 | } |
| 101 | |
| 102 | DUCKDB_API unique_ptr<QueryResult> Connection::FromSubstraitJSON(const string &json) { |
| 103 | vector<Value> params; |
| 104 | params.emplace_back(args: json); |
| 105 | return TableFunction(tname: "from_substrait_json" , values: params)->Execute(); |
| 106 | } |
| 107 | |
| 108 | unique_ptr<MaterializedQueryResult> Connection::Query(unique_ptr<SQLStatement> statement) { |
| 109 | auto result = context->Query(statement: std::move(statement), allow_stream_result: false); |
| 110 | D_ASSERT(result->type == QueryResultType::MATERIALIZED_RESULT); |
| 111 | return unique_ptr_cast<QueryResult, MaterializedQueryResult>(src: std::move(result)); |
| 112 | } |
| 113 | |
| 114 | unique_ptr<PendingQueryResult> Connection::PendingQuery(const string &query, bool allow_stream_result) { |
| 115 | return context->PendingQuery(query, allow_stream_result); |
| 116 | } |
| 117 | |
| 118 | unique_ptr<PendingQueryResult> Connection::PendingQuery(unique_ptr<SQLStatement> statement, bool allow_stream_result) { |
| 119 | return context->PendingQuery(statement: std::move(statement), allow_stream_result); |
| 120 | } |
| 121 | |
| 122 | unique_ptr<PreparedStatement> Connection::Prepare(const string &query) { |
| 123 | return context->Prepare(query); |
| 124 | } |
| 125 | |
| 126 | unique_ptr<PreparedStatement> Connection::Prepare(unique_ptr<SQLStatement> statement) { |
| 127 | return context->Prepare(statement: std::move(statement)); |
| 128 | } |
| 129 | |
| 130 | unique_ptr<QueryResult> Connection::QueryParamsRecursive(const string &query, vector<Value> &values) { |
| 131 | auto statement = Prepare(query); |
| 132 | if (statement->HasError()) { |
| 133 | return make_uniq<MaterializedQueryResult>(args&: statement->error); |
| 134 | } |
| 135 | return statement->Execute(values, allow_stream_result: false); |
| 136 | } |
| 137 | |
| 138 | unique_ptr<TableDescription> Connection::TableInfo(const string &table_name) { |
| 139 | return TableInfo(INVALID_SCHEMA, table_name); |
| 140 | } |
| 141 | |
| 142 | unique_ptr<TableDescription> Connection::TableInfo(const string &schema_name, const string &table_name) { |
| 143 | return context->TableInfo(schema_name, table_name); |
| 144 | } |
| 145 | |
| 146 | vector<unique_ptr<SQLStatement>> Connection::(const string &query) { |
| 147 | return context->ParseStatements(query); |
| 148 | } |
| 149 | |
| 150 | unique_ptr<LogicalOperator> Connection::(const string &query) { |
| 151 | return context->ExtractPlan(query); |
| 152 | } |
| 153 | |
| 154 | void Connection::Append(TableDescription &description, DataChunk &chunk) { |
| 155 | if (chunk.size() == 0) { |
| 156 | return; |
| 157 | } |
| 158 | ColumnDataCollection collection(Allocator::Get(context&: *context), chunk.GetTypes()); |
| 159 | collection.Append(new_chunk&: chunk); |
| 160 | Append(description, collection); |
| 161 | } |
| 162 | |
| 163 | void Connection::Append(TableDescription &description, ColumnDataCollection &collection) { |
| 164 | context->Append(description, collection); |
| 165 | } |
| 166 | |
| 167 | shared_ptr<Relation> Connection::Table(const string &table_name) { |
| 168 | return Table(DEFAULT_SCHEMA, table_name); |
| 169 | } |
| 170 | |
| 171 | shared_ptr<Relation> Connection::Table(const string &schema_name, const string &table_name) { |
| 172 | auto table_info = TableInfo(schema_name, table_name); |
| 173 | if (!table_info) { |
| 174 | throw CatalogException("Table '%s' does not exist!" , table_name); |
| 175 | } |
| 176 | return make_shared<TableRelation>(args&: context, args: std::move(table_info)); |
| 177 | } |
| 178 | |
| 179 | shared_ptr<Relation> Connection::View(const string &tname) { |
| 180 | return View(DEFAULT_SCHEMA, table_name: tname); |
| 181 | } |
| 182 | |
| 183 | shared_ptr<Relation> Connection::View(const string &schema_name, const string &table_name) { |
| 184 | return make_shared<ViewRelation>(args&: context, args: schema_name, args: table_name); |
| 185 | } |
| 186 | |
| 187 | shared_ptr<Relation> Connection::TableFunction(const string &fname) { |
| 188 | vector<Value> values; |
| 189 | named_parameter_map_t named_parameters; |
| 190 | return TableFunction(tname: fname, values, named_parameters); |
| 191 | } |
| 192 | |
| 193 | shared_ptr<Relation> Connection::TableFunction(const string &fname, const vector<Value> &values, |
| 194 | const named_parameter_map_t &named_parameters) { |
| 195 | return make_shared<TableFunctionRelation>(args&: context, args: fname, args: values, args: named_parameters); |
| 196 | } |
| 197 | |
| 198 | shared_ptr<Relation> Connection::TableFunction(const string &fname, const vector<Value> &values) { |
| 199 | return make_shared<TableFunctionRelation>(args&: context, args: fname, args: values); |
| 200 | } |
| 201 | |
| 202 | shared_ptr<Relation> Connection::Values(const vector<vector<Value>> &values) { |
| 203 | vector<string> column_names; |
| 204 | return Values(values, column_names); |
| 205 | } |
| 206 | |
| 207 | shared_ptr<Relation> Connection::Values(const vector<vector<Value>> &values, const vector<string> &column_names, |
| 208 | const string &alias) { |
| 209 | return make_shared<ValueRelation>(args&: context, args: values, args: column_names, args: alias); |
| 210 | } |
| 211 | |
| 212 | shared_ptr<Relation> Connection::Values(const string &values) { |
| 213 | vector<string> column_names; |
| 214 | return Values(values, column_names); |
| 215 | } |
| 216 | |
| 217 | shared_ptr<Relation> Connection::Values(const string &values, const vector<string> &column_names, const string &alias) { |
| 218 | return make_shared<ValueRelation>(args&: context, args: values, args: column_names, args: alias); |
| 219 | } |
| 220 | |
| 221 | shared_ptr<Relation> Connection::ReadCSV(const string &csv_file) { |
| 222 | BufferedCSVReaderOptions options; |
| 223 | return ReadCSV(csv_file, options); |
| 224 | } |
| 225 | |
| 226 | shared_ptr<Relation> Connection::ReadCSV(const string &csv_file, BufferedCSVReaderOptions &options) { |
| 227 | options.file_path = csv_file; |
| 228 | options.auto_detect = true; |
| 229 | return make_shared<ReadCSVRelation>(args&: context, args: csv_file, args&: options); |
| 230 | } |
| 231 | |
| 232 | shared_ptr<Relation> Connection::ReadCSV(const string &csv_file, const vector<string> &columns) { |
| 233 | // parse columns |
| 234 | vector<ColumnDefinition> column_list; |
| 235 | for (auto &column : columns) { |
| 236 | auto col_list = Parser::ParseColumnList(column_list: column, options: context->GetParserOptions()); |
| 237 | if (col_list.LogicalColumnCount() != 1) { |
| 238 | throw ParserException("Expected a single column definition" ); |
| 239 | } |
| 240 | column_list.push_back(x: std::move(col_list.GetColumnMutable(index: LogicalIndex(0)))); |
| 241 | } |
| 242 | return make_shared<ReadCSVRelation>(args&: context, args: csv_file, args: std::move(column_list)); |
| 243 | } |
| 244 | |
| 245 | shared_ptr<Relation> Connection::ReadParquet(const string &parquet_file, bool binary_as_string) { |
| 246 | vector<Value> params; |
| 247 | params.emplace_back(args: parquet_file); |
| 248 | named_parameter_map_t named_parameters({{"binary_as_string" , Value::BOOLEAN(value: binary_as_string)}}); |
| 249 | return TableFunction(fname: "parquet_scan" , values: params, named_parameters)->Alias(alias: parquet_file); |
| 250 | } |
| 251 | |
| 252 | unordered_set<string> Connection::GetTableNames(const string &query) { |
| 253 | return context->GetTableNames(query); |
| 254 | } |
| 255 | |
| 256 | shared_ptr<Relation> Connection::RelationFromQuery(const string &query, const string &alias, const string &error) { |
| 257 | return RelationFromQuery(select_stmt: QueryRelation::ParseStatement(context&: *context, query, error), alias); |
| 258 | } |
| 259 | |
| 260 | shared_ptr<Relation> Connection::RelationFromQuery(unique_ptr<SelectStatement> select_stmt, const string &alias) { |
| 261 | return make_shared<QueryRelation>(args&: context, args: std::move(select_stmt), args: alias); |
| 262 | } |
| 263 | |
| 264 | void Connection::BeginTransaction() { |
| 265 | auto result = Query(query: "BEGIN TRANSACTION" ); |
| 266 | if (result->HasError()) { |
| 267 | result->ThrowError(); |
| 268 | } |
| 269 | } |
| 270 | |
| 271 | void Connection::Commit() { |
| 272 | auto result = Query(query: "COMMIT" ); |
| 273 | if (result->HasError()) { |
| 274 | result->ThrowError(); |
| 275 | } |
| 276 | } |
| 277 | |
| 278 | void Connection::Rollback() { |
| 279 | auto result = Query(query: "ROLLBACK" ); |
| 280 | if (result->HasError()) { |
| 281 | result->ThrowError(); |
| 282 | } |
| 283 | } |
| 284 | |
| 285 | void Connection::SetAutoCommit(bool auto_commit) { |
| 286 | context->transaction.SetAutoCommit(auto_commit); |
| 287 | } |
| 288 | |
| 289 | bool Connection::IsAutoCommit() { |
| 290 | return context->transaction.IsAutoCommit(); |
| 291 | } |
| 292 | bool Connection::HasActiveTransaction() { |
| 293 | return context->transaction.HasActiveTransaction(); |
| 294 | } |
| 295 | |
| 296 | } // namespace duckdb |
| 297 | |