1#include "duckdb/main/stream_query_result.hpp"
2
3#include "duckdb/main/client_context.hpp"
4#include "duckdb/main/materialized_query_result.hpp"
5
6using namespace duckdb;
7using namespace std;
8
9StreamQueryResult::StreamQueryResult(StatementType statement_type, ClientContext &context, vector<SQLType> sql_types,
10 vector<TypeId> types, vector<string> names)
11 : QueryResult(QueryResultType::STREAM_RESULT, statement_type, sql_types, types, names), is_open(true),
12 context(context) {
13}
14
15StreamQueryResult::~StreamQueryResult() {
16 Close();
17}
18
19string StreamQueryResult::ToString() {
20 string result;
21 if (success) {
22 result = HeaderToString();
23 result += "[[STREAM RESULT]]";
24 } else {
25 result = "Query Error: " + error + "\n";
26 }
27 return result;
28}
29
30unique_ptr<DataChunk> StreamQueryResult::Fetch() {
31 if (!success || !is_open) {
32 return nullptr;
33 }
34 auto chunk = context.Fetch();
35 if (!chunk || chunk->column_count() == 0 || chunk->size() == 0) {
36 Close();
37 }
38 return chunk;
39}
40
41unique_ptr<MaterializedQueryResult> StreamQueryResult::Materialize() {
42 if (!success) {
43 return make_unique<MaterializedQueryResult>(error);
44 }
45 auto result = make_unique<MaterializedQueryResult>(statement_type, sql_types, types, names);
46 while (true) {
47 auto chunk = Fetch();
48 if (!chunk || chunk->size() == 0) {
49 return result;
50 }
51 result->collection.Append(*chunk);
52 }
53}
54
55void StreamQueryResult::Close() {
56 if (!is_open) {
57 return;
58 }
59 context.Cleanup();
60}
61