1 | #include "duckdb/main/appender.hpp" |
2 | |
3 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
4 | #include "duckdb/common/exception.hpp" |
5 | #include "duckdb/main/connection.hpp" |
6 | #include "duckdb/main/client_context.hpp" |
7 | #include "duckdb/main/database.hpp" |
8 | #include "duckdb/storage/data_table.hpp" |
9 | |
10 | #include "duckdb/common/operator/cast_operators.hpp" |
11 | |
12 | using namespace duckdb; |
13 | using namespace std; |
14 | |
15 | Appender::Appender(Connection &con, string schema_name, string table_name) : con(con), column(0) { |
16 | description = con.TableInfo(schema_name, table_name); |
17 | if (!description) { |
18 | // table could not be found |
19 | throw CatalogException( |
20 | StringUtil::Format("Table \"%s.%s\" could not be found" , schema_name.c_str(), table_name.c_str())); |
21 | } else { |
22 | vector<TypeId> types; |
23 | for (auto &column : description->columns) { |
24 | types.push_back(GetInternalType(column.type)); |
25 | } |
26 | chunk.Initialize(types); |
27 | con.context->RegisterAppender(this); |
28 | } |
29 | } |
30 | |
31 | Appender::Appender(Connection &con, string table_name) : Appender(con, DEFAULT_SCHEMA, table_name) { |
32 | } |
33 | |
34 | Appender::~Appender() { |
35 | Close(); |
36 | } |
37 | |
38 | void Appender::CheckInvalidated() { |
39 | if (!invalidated_msg.empty()) { |
40 | throw Exception("Invalid appender: " + invalidated_msg); |
41 | } |
42 | } |
43 | |
44 | void Appender::BeginRow() { |
45 | CheckInvalidated(); |
46 | } |
47 | |
48 | void Appender::EndRow() { |
49 | CheckInvalidated(); |
50 | // check that all rows have been appended to |
51 | if (column != chunk.column_count()) { |
52 | InvalidateException("Call to EndRow before all rows have been appended to!" ); |
53 | } |
54 | column = 0; |
55 | chunk.SetCardinality(chunk.size() + 1); |
56 | if (chunk.size() >= STANDARD_VECTOR_SIZE) { |
57 | Flush(); |
58 | } |
59 | } |
60 | |
61 | template <class SRC, class DST> void Appender::AppendValueInternal(Vector &col, SRC input) { |
62 | FlatVector::GetData<DST>(col)[chunk.size()] = Cast::Operation<SRC, DST>(input); |
63 | } |
64 | |
65 | void Appender::InvalidateException(string msg) { |
66 | Invalidate(msg); |
67 | throw Exception(msg); |
68 | } |
69 | |
70 | template <class T> void Appender::AppendValueInternal(T input) { |
71 | CheckInvalidated(); |
72 | if (column >= chunk.column_count()) { |
73 | InvalidateException("Too many appends for chunk!" ); |
74 | } |
75 | auto &col = chunk.data[column]; |
76 | switch (col.type) { |
77 | case TypeId::BOOL: |
78 | AppendValueInternal<T, bool>(col, input); |
79 | break; |
80 | case TypeId::INT8: |
81 | AppendValueInternal<T, int8_t>(col, input); |
82 | break; |
83 | case TypeId::INT16: |
84 | AppendValueInternal<T, int16_t>(col, input); |
85 | break; |
86 | case TypeId::INT32: |
87 | AppendValueInternal<T, int32_t>(col, input); |
88 | break; |
89 | case TypeId::INT64: |
90 | AppendValueInternal<T, int64_t>(col, input); |
91 | break; |
92 | case TypeId::FLOAT: |
93 | AppendValueInternal<T, float>(col, input); |
94 | break; |
95 | case TypeId::DOUBLE: |
96 | AppendValueInternal<T, double>(col, input); |
97 | break; |
98 | default: |
99 | AppendValue(Value::CreateValue<T>(input)); |
100 | return; |
101 | } |
102 | column++; |
103 | } |
104 | |
105 | template <> void Appender::Append(bool value) { |
106 | AppendValueInternal<bool>(value); |
107 | } |
108 | |
109 | template <> void Appender::Append(int8_t value) { |
110 | AppendValueInternal<int8_t>(value); |
111 | } |
112 | |
113 | template <> void Appender::Append(int16_t value) { |
114 | AppendValueInternal<int16_t>(value); |
115 | } |
116 | |
117 | template <> void Appender::Append(int32_t value) { |
118 | AppendValueInternal<int32_t>(value); |
119 | } |
120 | |
121 | template <> void Appender::Append(int64_t value) { |
122 | AppendValueInternal<int64_t>(value); |
123 | } |
124 | |
125 | template <> void Appender::Append(const char *value) { |
126 | AppendValueInternal<string_t>(string_t(value)); |
127 | } |
128 | |
129 | template <> void Appender::Append(float value) { |
130 | if (!Value::FloatIsValid(value)) { |
131 | InvalidateException("Float value is out of range!" ); |
132 | } |
133 | AppendValueInternal<float>(value); |
134 | } |
135 | |
136 | template <> void Appender::Append(double value) { |
137 | if (!Value::DoubleIsValid(value)) { |
138 | InvalidateException("Double value is out of range!" ); |
139 | } |
140 | AppendValueInternal<double>(value); |
141 | } |
142 | |
143 | template <> void Appender::Append(Value value) { |
144 | if (column >= chunk.column_count()) { |
145 | InvalidateException("Too many appends for chunk!" ); |
146 | } |
147 | AppendValue(move(value)); |
148 | } |
149 | |
150 | template <> void Appender::Append(nullptr_t value) { |
151 | if (column >= chunk.column_count()) { |
152 | InvalidateException("Too many appends for chunk!" ); |
153 | } |
154 | auto &col = chunk.data[column++]; |
155 | FlatVector::SetNull(col, chunk.size(), true); |
156 | } |
157 | |
158 | void Appender::AppendValue(Value value) { |
159 | chunk.SetValue(column, chunk.size(), value); |
160 | column++; |
161 | } |
162 | |
163 | void Appender::Flush() { |
164 | CheckInvalidated(); |
165 | try { |
166 | // check that all vectors have the same length before appending |
167 | if (column != 0) { |
168 | throw Exception("Failed to Flush appender: incomplete append to row!" ); |
169 | } |
170 | |
171 | if (chunk.size() == 0) { |
172 | return; |
173 | } |
174 | con.Append(*description, chunk); |
175 | } catch (Exception &ex) { |
176 | Invalidate(ex.what()); |
177 | throw ex; |
178 | } |
179 | chunk.Reset(); |
180 | column = 0; |
181 | } |
182 | |
183 | void Appender::Close() { |
184 | if (!invalidated_msg.empty()) { |
185 | return; |
186 | } |
187 | if (column == 0 || column == chunk.column_count()) { |
188 | Flush(); |
189 | } |
190 | Invalidate("The appender has been closed!" ); |
191 | } |
192 | |
193 | void Appender::Invalidate(string msg, bool close) { |
194 | if (!invalidated_msg.empty()) { |
195 | return; |
196 | } |
197 | if (close) { |
198 | con.context->RemoveAppender(this); |
199 | } |
200 | assert(!msg.empty()); |
201 | invalidated_msg = msg; |
202 | } |
203 | |