1 | #include "duckdb/main/stream_query_result.hpp" |
2 | |
3 | #include "duckdb/main/client_context.hpp" |
4 | #include "duckdb/main/materialized_query_result.hpp" |
5 | |
6 | using namespace duckdb; |
7 | using namespace std; |
8 | |
9 | StreamQueryResult::StreamQueryResult(StatementType statement_type, ClientContext &context, vector<SQLType> sql_types, |
10 | vector<TypeId> types, vector<string> names) |
11 | : QueryResult(QueryResultType::STREAM_RESULT, statement_type, sql_types, types, names), is_open(true), |
12 | context(context) { |
13 | } |
14 | |
15 | StreamQueryResult::~StreamQueryResult() { |
16 | Close(); |
17 | } |
18 | |
19 | string StreamQueryResult::ToString() { |
20 | string result; |
21 | if (success) { |
22 | result = HeaderToString(); |
23 | result += "[[STREAM RESULT]]" ; |
24 | } else { |
25 | result = "Query Error: " + error + "\n" ; |
26 | } |
27 | return result; |
28 | } |
29 | |
30 | unique_ptr<DataChunk> StreamQueryResult::Fetch() { |
31 | if (!success || !is_open) { |
32 | return nullptr; |
33 | } |
34 | auto chunk = context.Fetch(); |
35 | if (!chunk || chunk->column_count() == 0 || chunk->size() == 0) { |
36 | Close(); |
37 | } |
38 | return chunk; |
39 | } |
40 | |
41 | unique_ptr<MaterializedQueryResult> StreamQueryResult::Materialize() { |
42 | if (!success) { |
43 | return make_unique<MaterializedQueryResult>(error); |
44 | } |
45 | auto result = make_unique<MaterializedQueryResult>(statement_type, sql_types, types, names); |
46 | while (true) { |
47 | auto chunk = Fetch(); |
48 | if (!chunk || chunk->size() == 0) { |
49 | return result; |
50 | } |
51 | result->collection.Append(*chunk); |
52 | } |
53 | } |
54 | |
55 | void StreamQueryResult::Close() { |
56 | if (!is_open) { |
57 | return; |
58 | } |
59 | context.Cleanup(); |
60 | } |
61 | |