1#include "duckdb/main/stream_query_result.hpp"
2
3#include "duckdb/main/client_context.hpp"
4#include "duckdb/main/materialized_query_result.hpp"
5#include "duckdb/common/box_renderer.hpp"
6
7namespace duckdb {
8
9StreamQueryResult::StreamQueryResult(StatementType statement_type, StatementProperties properties,
10 shared_ptr<ClientContext> context_p, vector<LogicalType> types,
11 vector<string> names)
12 : QueryResult(QueryResultType::STREAM_RESULT, statement_type, std::move(properties), std::move(types),
13 std::move(names), context_p->GetClientProperties()),
14 context(std::move(context_p)) {
15 D_ASSERT(context);
16}
17
18StreamQueryResult::~StreamQueryResult() {
19}
20
21string StreamQueryResult::ToString() {
22 string result;
23 if (success) {
24 result = HeaderToString();
25 result += "[[STREAM RESULT]]";
26 } else {
27 result = GetError() + "\n";
28 }
29 return result;
30}
31
32unique_ptr<ClientContextLock> StreamQueryResult::LockContext() {
33 if (!context) {
34 string error_str = "Attempting to execute an unsuccessful or closed pending query result";
35 if (HasError()) {
36 error_str += StringUtil::Format(fmt_str: "\nError: %s", params: GetError());
37 }
38 throw InvalidInputException(error_str);
39 }
40 return context->LockContext();
41}
42
43void StreamQueryResult::CheckExecutableInternal(ClientContextLock &lock) {
44 if (!IsOpenInternal(lock)) {
45 string error_str = "Attempting to execute an unsuccessful or closed pending query result";
46 if (HasError()) {
47 error_str += StringUtil::Format(fmt_str: "\nError: %s", params: GetError());
48 }
49 throw InvalidInputException(error_str);
50 }
51}
52
53unique_ptr<DataChunk> StreamQueryResult::FetchRaw() {
54 unique_ptr<DataChunk> chunk;
55 {
56 auto lock = LockContext();
57 CheckExecutableInternal(lock&: *lock);
58 chunk = context->Fetch(lock&: *lock, result&: *this);
59 }
60 if (!chunk || chunk->ColumnCount() == 0 || chunk->size() == 0) {
61 Close();
62 return nullptr;
63 }
64 return chunk;
65}
66
67unique_ptr<MaterializedQueryResult> StreamQueryResult::Materialize() {
68 if (HasError() || !context) {
69 return make_uniq<MaterializedQueryResult>(args&: GetErrorObject());
70 }
71 auto collection = make_uniq<ColumnDataCollection>(args&: Allocator::DefaultAllocator(), args&: types);
72
73 ColumnDataAppendState append_state;
74 collection->InitializeAppend(state&: append_state);
75 while (true) {
76 auto chunk = Fetch();
77 if (!chunk || chunk->size() == 0) {
78 break;
79 }
80 collection->Append(state&: append_state, new_chunk&: *chunk);
81 }
82 auto result =
83 make_uniq<MaterializedQueryResult>(args&: statement_type, args&: properties, args&: names, args: std::move(collection), args&: client_properties);
84 if (HasError()) {
85 return make_uniq<MaterializedQueryResult>(args&: GetErrorObject());
86 }
87 return result;
88}
89
90bool StreamQueryResult::IsOpenInternal(ClientContextLock &lock) {
91 bool invalidated = !success || !context;
92 if (!invalidated) {
93 invalidated = !context->IsActiveResult(lock, result: this);
94 }
95 return !invalidated;
96}
97
98bool StreamQueryResult::IsOpen() {
99 if (!success || !context) {
100 return false;
101 }
102 auto lock = LockContext();
103 return IsOpenInternal(lock&: *lock);
104}
105
106void StreamQueryResult::Close() {
107 context.reset();
108}
109
110} // namespace duckdb
111