| 1 | #include "duckdb/main/appender.hpp" | 
|---|
| 2 |  | 
|---|
| 3 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" | 
|---|
| 4 | #include "duckdb/common/exception.hpp" | 
|---|
| 5 | #include "duckdb/main/connection.hpp" | 
|---|
| 6 | #include "duckdb/main/client_context.hpp" | 
|---|
| 7 | #include "duckdb/main/database.hpp" | 
|---|
| 8 | #include "duckdb/storage/data_table.hpp" | 
|---|
| 9 |  | 
|---|
| 10 | #include "duckdb/common/operator/cast_operators.hpp" | 
|---|
| 11 |  | 
|---|
| 12 | using namespace duckdb; | 
|---|
| 13 | using namespace std; | 
|---|
| 14 |  | 
|---|
| 15 | Appender::Appender(Connection &con, string schema_name, string table_name) : con(con), column(0) { | 
|---|
| 16 | description = con.TableInfo(schema_name, table_name); | 
|---|
| 17 | if (!description) { | 
|---|
| 18 | // table could not be found | 
|---|
| 19 | throw CatalogException( | 
|---|
| 20 | StringUtil::Format( "Table \"%s.%s\" could not be found", schema_name.c_str(), table_name.c_str())); | 
|---|
| 21 | } else { | 
|---|
| 22 | vector<TypeId> types; | 
|---|
| 23 | for (auto &column : description->columns) { | 
|---|
| 24 | types.push_back(GetInternalType(column.type)); | 
|---|
| 25 | } | 
|---|
| 26 | chunk.Initialize(types); | 
|---|
| 27 | con.context->RegisterAppender(this); | 
|---|
| 28 | } | 
|---|
| 29 | } | 
|---|
| 30 |  | 
|---|
| 31 | Appender::Appender(Connection &con, string table_name) : Appender(con, DEFAULT_SCHEMA, table_name) { | 
|---|
| 32 | } | 
|---|
| 33 |  | 
|---|
| 34 | Appender::~Appender() { | 
|---|
| 35 | Close(); | 
|---|
| 36 | } | 
|---|
| 37 |  | 
|---|
| 38 | void Appender::CheckInvalidated() { | 
|---|
| 39 | if (!invalidated_msg.empty()) { | 
|---|
| 40 | throw Exception( "Invalid appender: "+ invalidated_msg); | 
|---|
| 41 | } | 
|---|
| 42 | } | 
|---|
| 43 |  | 
|---|
| 44 | void Appender::BeginRow() { | 
|---|
| 45 | CheckInvalidated(); | 
|---|
| 46 | } | 
|---|
| 47 |  | 
|---|
| 48 | void Appender::EndRow() { | 
|---|
| 49 | CheckInvalidated(); | 
|---|
| 50 | // check that all rows have been appended to | 
|---|
| 51 | if (column != chunk.column_count()) { | 
|---|
| 52 | InvalidateException( "Call to EndRow before all rows have been appended to!"); | 
|---|
| 53 | } | 
|---|
| 54 | column = 0; | 
|---|
| 55 | chunk.SetCardinality(chunk.size() + 1); | 
|---|
| 56 | if (chunk.size() >= STANDARD_VECTOR_SIZE) { | 
|---|
| 57 | Flush(); | 
|---|
| 58 | } | 
|---|
| 59 | } | 
|---|
| 60 |  | 
|---|
| 61 | template <class SRC, class DST> void Appender::AppendValueInternal(Vector &col, SRC input) { | 
|---|
| 62 | FlatVector::GetData<DST>(col)[chunk.size()] = Cast::Operation<SRC, DST>(input); | 
|---|
| 63 | } | 
|---|
| 64 |  | 
|---|
| 65 | void Appender::InvalidateException(string msg) { | 
|---|
| 66 | Invalidate(msg); | 
|---|
| 67 | throw Exception(msg); | 
|---|
| 68 | } | 
|---|
| 69 |  | 
|---|
| 70 | template <class T> void Appender::AppendValueInternal(T input) { | 
|---|
| 71 | CheckInvalidated(); | 
|---|
| 72 | if (column >= chunk.column_count()) { | 
|---|
| 73 | InvalidateException( "Too many appends for chunk!"); | 
|---|
| 74 | } | 
|---|
| 75 | auto &col = chunk.data[column]; | 
|---|
| 76 | switch (col.type) { | 
|---|
| 77 | case TypeId::BOOL: | 
|---|
| 78 | AppendValueInternal<T, bool>(col, input); | 
|---|
| 79 | break; | 
|---|
| 80 | case TypeId::INT8: | 
|---|
| 81 | AppendValueInternal<T, int8_t>(col, input); | 
|---|
| 82 | break; | 
|---|
| 83 | case TypeId::INT16: | 
|---|
| 84 | AppendValueInternal<T, int16_t>(col, input); | 
|---|
| 85 | break; | 
|---|
| 86 | case TypeId::INT32: | 
|---|
| 87 | AppendValueInternal<T, int32_t>(col, input); | 
|---|
| 88 | break; | 
|---|
| 89 | case TypeId::INT64: | 
|---|
| 90 | AppendValueInternal<T, int64_t>(col, input); | 
|---|
| 91 | break; | 
|---|
| 92 | case TypeId::FLOAT: | 
|---|
| 93 | AppendValueInternal<T, float>(col, input); | 
|---|
| 94 | break; | 
|---|
| 95 | case TypeId::DOUBLE: | 
|---|
| 96 | AppendValueInternal<T, double>(col, input); | 
|---|
| 97 | break; | 
|---|
| 98 | default: | 
|---|
| 99 | AppendValue(Value::CreateValue<T>(input)); | 
|---|
| 100 | return; | 
|---|
| 101 | } | 
|---|
| 102 | column++; | 
|---|
| 103 | } | 
|---|
| 104 |  | 
|---|
| 105 | template <> void Appender::Append(bool value) { | 
|---|
| 106 | AppendValueInternal<bool>(value); | 
|---|
| 107 | } | 
|---|
| 108 |  | 
|---|
| 109 | template <> void Appender::Append(int8_t value) { | 
|---|
| 110 | AppendValueInternal<int8_t>(value); | 
|---|
| 111 | } | 
|---|
| 112 |  | 
|---|
| 113 | template <> void Appender::Append(int16_t value) { | 
|---|
| 114 | AppendValueInternal<int16_t>(value); | 
|---|
| 115 | } | 
|---|
| 116 |  | 
|---|
| 117 | template <> void Appender::Append(int32_t value) { | 
|---|
| 118 | AppendValueInternal<int32_t>(value); | 
|---|
| 119 | } | 
|---|
| 120 |  | 
|---|
| 121 | template <> void Appender::Append(int64_t value) { | 
|---|
| 122 | AppendValueInternal<int64_t>(value); | 
|---|
| 123 | } | 
|---|
| 124 |  | 
|---|
| 125 | template <> void Appender::Append(const char *value) { | 
|---|
| 126 | AppendValueInternal<string_t>(string_t(value)); | 
|---|
| 127 | } | 
|---|
| 128 |  | 
|---|
| 129 | template <> void Appender::Append(float value) { | 
|---|
| 130 | if (!Value::FloatIsValid(value)) { | 
|---|
| 131 | InvalidateException( "Float value is out of range!"); | 
|---|
| 132 | } | 
|---|
| 133 | AppendValueInternal<float>(value); | 
|---|
| 134 | } | 
|---|
| 135 |  | 
|---|
| 136 | template <> void Appender::Append(double value) { | 
|---|
| 137 | if (!Value::DoubleIsValid(value)) { | 
|---|
| 138 | InvalidateException( "Double value is out of range!"); | 
|---|
| 139 | } | 
|---|
| 140 | AppendValueInternal<double>(value); | 
|---|
| 141 | } | 
|---|
| 142 |  | 
|---|
| 143 | template <> void Appender::Append(Value value) { | 
|---|
| 144 | if (column >= chunk.column_count()) { | 
|---|
| 145 | InvalidateException( "Too many appends for chunk!"); | 
|---|
| 146 | } | 
|---|
| 147 | AppendValue(move(value)); | 
|---|
| 148 | } | 
|---|
| 149 |  | 
|---|
| 150 | template <> void Appender::Append(nullptr_t value) { | 
|---|
| 151 | if (column >= chunk.column_count()) { | 
|---|
| 152 | InvalidateException( "Too many appends for chunk!"); | 
|---|
| 153 | } | 
|---|
| 154 | auto &col = chunk.data[column++]; | 
|---|
| 155 | FlatVector::SetNull(col, chunk.size(), true); | 
|---|
| 156 | } | 
|---|
| 157 |  | 
|---|
| 158 | void Appender::AppendValue(Value value) { | 
|---|
| 159 | chunk.SetValue(column, chunk.size(), value); | 
|---|
| 160 | column++; | 
|---|
| 161 | } | 
|---|
| 162 |  | 
|---|
| 163 | void Appender::Flush() { | 
|---|
| 164 | CheckInvalidated(); | 
|---|
| 165 | try { | 
|---|
| 166 | // check that all vectors have the same length before appending | 
|---|
| 167 | if (column != 0) { | 
|---|
| 168 | throw Exception( "Failed to Flush appender: incomplete append to row!"); | 
|---|
| 169 | } | 
|---|
| 170 |  | 
|---|
| 171 | if (chunk.size() == 0) { | 
|---|
| 172 | return; | 
|---|
| 173 | } | 
|---|
| 174 | con.Append(*description, chunk); | 
|---|
| 175 | } catch (Exception &ex) { | 
|---|
| 176 | Invalidate(ex.what()); | 
|---|
| 177 | throw ex; | 
|---|
| 178 | } | 
|---|
| 179 | chunk.Reset(); | 
|---|
| 180 | column = 0; | 
|---|
| 181 | } | 
|---|
| 182 |  | 
|---|
| 183 | void Appender::Close() { | 
|---|
| 184 | if (!invalidated_msg.empty()) { | 
|---|
| 185 | return; | 
|---|
| 186 | } | 
|---|
| 187 | if (column == 0 || column == chunk.column_count()) { | 
|---|
| 188 | Flush(); | 
|---|
| 189 | } | 
|---|
| 190 | Invalidate( "The appender has been closed!"); | 
|---|
| 191 | } | 
|---|
| 192 |  | 
|---|
| 193 | void Appender::Invalidate(string msg, bool close) { | 
|---|
| 194 | if (!invalidated_msg.empty()) { | 
|---|
| 195 | return; | 
|---|
| 196 | } | 
|---|
| 197 | if (close) { | 
|---|
| 198 | con.context->RemoveAppender(this); | 
|---|
| 199 | } | 
|---|
| 200 | assert(!msg.empty()); | 
|---|
| 201 | invalidated_msg = msg; | 
|---|
| 202 | } | 
|---|
| 203 |  | 
|---|