| 1 | #include "duckdb/main/appender.hpp" |
| 2 | |
| 3 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
| 4 | #include "duckdb/common/exception.hpp" |
| 5 | #include "duckdb/main/connection.hpp" |
| 6 | #include "duckdb/main/client_context.hpp" |
| 7 | #include "duckdb/main/database.hpp" |
| 8 | #include "duckdb/storage/data_table.hpp" |
| 9 | |
| 10 | #include "duckdb/common/operator/cast_operators.hpp" |
| 11 | |
| 12 | using namespace duckdb; |
| 13 | using namespace std; |
| 14 | |
| 15 | Appender::Appender(Connection &con, string schema_name, string table_name) : con(con), column(0) { |
| 16 | description = con.TableInfo(schema_name, table_name); |
| 17 | if (!description) { |
| 18 | // table could not be found |
| 19 | throw CatalogException( |
| 20 | StringUtil::Format("Table \"%s.%s\" could not be found" , schema_name.c_str(), table_name.c_str())); |
| 21 | } else { |
| 22 | vector<TypeId> types; |
| 23 | for (auto &column : description->columns) { |
| 24 | types.push_back(GetInternalType(column.type)); |
| 25 | } |
| 26 | chunk.Initialize(types); |
| 27 | con.context->RegisterAppender(this); |
| 28 | } |
| 29 | } |
| 30 | |
| 31 | Appender::Appender(Connection &con, string table_name) : Appender(con, DEFAULT_SCHEMA, table_name) { |
| 32 | } |
| 33 | |
| 34 | Appender::~Appender() { |
| 35 | Close(); |
| 36 | } |
| 37 | |
| 38 | void Appender::CheckInvalidated() { |
| 39 | if (!invalidated_msg.empty()) { |
| 40 | throw Exception("Invalid appender: " + invalidated_msg); |
| 41 | } |
| 42 | } |
| 43 | |
| 44 | void Appender::BeginRow() { |
| 45 | CheckInvalidated(); |
| 46 | } |
| 47 | |
| 48 | void Appender::EndRow() { |
| 49 | CheckInvalidated(); |
| 50 | // check that all rows have been appended to |
| 51 | if (column != chunk.column_count()) { |
| 52 | InvalidateException("Call to EndRow before all rows have been appended to!" ); |
| 53 | } |
| 54 | column = 0; |
| 55 | chunk.SetCardinality(chunk.size() + 1); |
| 56 | if (chunk.size() >= STANDARD_VECTOR_SIZE) { |
| 57 | Flush(); |
| 58 | } |
| 59 | } |
| 60 | |
| 61 | template <class SRC, class DST> void Appender::AppendValueInternal(Vector &col, SRC input) { |
| 62 | FlatVector::GetData<DST>(col)[chunk.size()] = Cast::Operation<SRC, DST>(input); |
| 63 | } |
| 64 | |
| 65 | void Appender::InvalidateException(string msg) { |
| 66 | Invalidate(msg); |
| 67 | throw Exception(msg); |
| 68 | } |
| 69 | |
| 70 | template <class T> void Appender::AppendValueInternal(T input) { |
| 71 | CheckInvalidated(); |
| 72 | if (column >= chunk.column_count()) { |
| 73 | InvalidateException("Too many appends for chunk!" ); |
| 74 | } |
| 75 | auto &col = chunk.data[column]; |
| 76 | switch (col.type) { |
| 77 | case TypeId::BOOL: |
| 78 | AppendValueInternal<T, bool>(col, input); |
| 79 | break; |
| 80 | case TypeId::INT8: |
| 81 | AppendValueInternal<T, int8_t>(col, input); |
| 82 | break; |
| 83 | case TypeId::INT16: |
| 84 | AppendValueInternal<T, int16_t>(col, input); |
| 85 | break; |
| 86 | case TypeId::INT32: |
| 87 | AppendValueInternal<T, int32_t>(col, input); |
| 88 | break; |
| 89 | case TypeId::INT64: |
| 90 | AppendValueInternal<T, int64_t>(col, input); |
| 91 | break; |
| 92 | case TypeId::FLOAT: |
| 93 | AppendValueInternal<T, float>(col, input); |
| 94 | break; |
| 95 | case TypeId::DOUBLE: |
| 96 | AppendValueInternal<T, double>(col, input); |
| 97 | break; |
| 98 | default: |
| 99 | AppendValue(Value::CreateValue<T>(input)); |
| 100 | return; |
| 101 | } |
| 102 | column++; |
| 103 | } |
| 104 | |
| 105 | template <> void Appender::Append(bool value) { |
| 106 | AppendValueInternal<bool>(value); |
| 107 | } |
| 108 | |
| 109 | template <> void Appender::Append(int8_t value) { |
| 110 | AppendValueInternal<int8_t>(value); |
| 111 | } |
| 112 | |
| 113 | template <> void Appender::Append(int16_t value) { |
| 114 | AppendValueInternal<int16_t>(value); |
| 115 | } |
| 116 | |
| 117 | template <> void Appender::Append(int32_t value) { |
| 118 | AppendValueInternal<int32_t>(value); |
| 119 | } |
| 120 | |
| 121 | template <> void Appender::Append(int64_t value) { |
| 122 | AppendValueInternal<int64_t>(value); |
| 123 | } |
| 124 | |
| 125 | template <> void Appender::Append(const char *value) { |
| 126 | AppendValueInternal<string_t>(string_t(value)); |
| 127 | } |
| 128 | |
| 129 | template <> void Appender::Append(float value) { |
| 130 | if (!Value::FloatIsValid(value)) { |
| 131 | InvalidateException("Float value is out of range!" ); |
| 132 | } |
| 133 | AppendValueInternal<float>(value); |
| 134 | } |
| 135 | |
| 136 | template <> void Appender::Append(double value) { |
| 137 | if (!Value::DoubleIsValid(value)) { |
| 138 | InvalidateException("Double value is out of range!" ); |
| 139 | } |
| 140 | AppendValueInternal<double>(value); |
| 141 | } |
| 142 | |
| 143 | template <> void Appender::Append(Value value) { |
| 144 | if (column >= chunk.column_count()) { |
| 145 | InvalidateException("Too many appends for chunk!" ); |
| 146 | } |
| 147 | AppendValue(move(value)); |
| 148 | } |
| 149 | |
| 150 | template <> void Appender::Append(nullptr_t value) { |
| 151 | if (column >= chunk.column_count()) { |
| 152 | InvalidateException("Too many appends for chunk!" ); |
| 153 | } |
| 154 | auto &col = chunk.data[column++]; |
| 155 | FlatVector::SetNull(col, chunk.size(), true); |
| 156 | } |
| 157 | |
| 158 | void Appender::AppendValue(Value value) { |
| 159 | chunk.SetValue(column, chunk.size(), value); |
| 160 | column++; |
| 161 | } |
| 162 | |
| 163 | void Appender::Flush() { |
| 164 | CheckInvalidated(); |
| 165 | try { |
| 166 | // check that all vectors have the same length before appending |
| 167 | if (column != 0) { |
| 168 | throw Exception("Failed to Flush appender: incomplete append to row!" ); |
| 169 | } |
| 170 | |
| 171 | if (chunk.size() == 0) { |
| 172 | return; |
| 173 | } |
| 174 | con.Append(*description, chunk); |
| 175 | } catch (Exception &ex) { |
| 176 | Invalidate(ex.what()); |
| 177 | throw ex; |
| 178 | } |
| 179 | chunk.Reset(); |
| 180 | column = 0; |
| 181 | } |
| 182 | |
| 183 | void Appender::Close() { |
| 184 | if (!invalidated_msg.empty()) { |
| 185 | return; |
| 186 | } |
| 187 | if (column == 0 || column == chunk.column_count()) { |
| 188 | Flush(); |
| 189 | } |
| 190 | Invalidate("The appender has been closed!" ); |
| 191 | } |
| 192 | |
| 193 | void Appender::Invalidate(string msg, bool close) { |
| 194 | if (!invalidated_msg.empty()) { |
| 195 | return; |
| 196 | } |
| 197 | if (close) { |
| 198 | con.context->RemoveAppender(this); |
| 199 | } |
| 200 | assert(!msg.empty()); |
| 201 | invalidated_msg = msg; |
| 202 | } |
| 203 | |