1#include "sqlite_transfer.hpp"
2
3#include "duckdb/common/types/date.hpp"
4
5using namespace duckdb;
6using namespace std;
7
8namespace sqlite {
9
10bool TransferDatabase(Connection &con, sqlite3 *sqlite) {
11 char *error;
12 // start the SQLite transaction
13 if (sqlite3_exec(sqlite, "BEGIN TRANSACTION", nullptr, nullptr, &error) != SQLITE_OK) {
14 return false;
15 }
16
17 // query the list of tables
18 auto table_list = con.Query("SELECT name, sql FROM sqlite_master();");
19
20 for (size_t i = 0; i < table_list->collection.count; i++) {
21 auto name = table_list->GetValue(0, i).ToString();
22 auto sql = table_list->GetValue(1, i).ToString();
23
24 // for each table, first create the table in sqlite
25 if (sqlite3_exec(sqlite, sql.c_str(), nullptr, nullptr, &error) != SQLITE_OK) {
26 return false;
27 }
28
29 // now transfer the actual data
30 // first get the data from DuckDB
31 auto result = con.Query("SELECT * FROM " + name);
32 // create the prepared statement based on the result
33 stringstream prepared;
34 prepared << "INSERT INTO " << name << " (";
35 for (size_t j = 0; j < result->types.size(); j++) {
36 prepared << result->names[j];
37 if (j + 1 != result->types.size()) {
38 prepared << ",";
39 }
40 }
41 prepared << ") VALUES (";
42 for (size_t j = 0; j < result->types.size(); j++) {
43 prepared << "?";
44 if (j + 1 != result->types.size()) {
45 prepared << ",";
46 }
47 }
48 prepared << ");";
49
50 auto insert_statement = prepared.str();
51 sqlite3_stmt *stmt;
52 if (sqlite3_prepare_v2(sqlite, insert_statement.c_str(), -1, &stmt, nullptr) != SQLITE_OK) {
53 return false;
54 }
55
56 auto &types = result->sql_types;
57 for (size_t k = 0; k < result->collection.count; k++) {
58 int rc = SQLITE_ERROR;
59 for (size_t j = 0; j < types.size(); j++) {
60 size_t bind_index = j + 1;
61 auto value = result->GetValue(j, k);
62 if (value.is_null) {
63 rc = sqlite3_bind_null(stmt, bind_index);
64 } else {
65 // bind based on the type
66 switch (types[j].id) {
67 case SQLTypeId::BOOLEAN:
68 case SQLTypeId::TINYINT:
69 case SQLTypeId::SMALLINT:
70 case SQLTypeId::INTEGER:
71 rc = sqlite3_bind_int(stmt, bind_index, (int)value.GetValue<int64_t>());
72 break;
73 case SQLTypeId::BIGINT:
74 rc = sqlite3_bind_int64(stmt, bind_index, (sqlite3_int64)value.GetValue<int64_t>());
75 break;
76 case SQLTypeId::DATE: {
77 auto date_str = value.ToString() + " 00:00:00";
78 rc = sqlite3_bind_text(stmt, bind_index, date_str.c_str(), -1, SQLITE_TRANSIENT);
79 break;
80 }
81 case SQLTypeId::TIMESTAMP:
82 // TODO
83 throw NotImplementedException("Transferring timestamps is not supported yet");
84 case SQLTypeId::DECIMAL:
85 rc = sqlite3_bind_double(stmt, bind_index, value.value_.double_);
86 break;
87 case SQLTypeId::VARCHAR:
88 rc = sqlite3_bind_text(stmt, bind_index, value.ToString().c_str(), -1, SQLITE_TRANSIENT);
89 break;
90 default:
91 break;
92 }
93 }
94 if (rc != SQLITE_OK) {
95 return false;
96 }
97 }
98 rc = sqlite3_step(stmt);
99 if (rc != SQLITE_DONE) {
100 return false;
101 }
102 if (sqlite3_reset(stmt) != SQLITE_OK) {
103 return false;
104 }
105 }
106 sqlite3_finalize(stmt);
107 }
108 // commit the SQLite transaction
109 if (sqlite3_exec(sqlite, "COMMIT", nullptr, nullptr, &error) != SQLITE_OK) {
110 return false;
111 }
112 return true;
113}
114
115unique_ptr<QueryResult> QueryDatabase(vector<SQLType> result_types, sqlite3 *sqlite, std::string query,
116 volatile int &interrupt) {
117 if (!sqlite) {
118 return nullptr;
119 }
120 // prepare the SQL statement
121 sqlite3_stmt *stmt;
122 if (sqlite3_prepare_v2(sqlite, query.c_str(), -1, &stmt, nullptr) != SQLITE_OK) {
123 return make_unique<MaterializedQueryResult>(sqlite3_errmsg(sqlite));
124 }
125 int col_count = sqlite3_column_count(stmt);
126 vector<string> names;
127 for (int i = 0; i < col_count; i++) {
128 names.push_back(sqlite3_column_name(stmt, i));
129 }
130 // figure out the types of the columns
131 // construct the types of the result
132 vector<TypeId> typeids;
133 for (auto &tp : result_types) {
134 typeids.push_back(GetInternalType(tp));
135 }
136
137 // construct the result
138 auto result = make_unique<MaterializedQueryResult>(StatementType::SELECT_STATEMENT, result_types, typeids, std::move(names));
139 DataChunk result_chunk;
140 result_chunk.Initialize(typeids);
141 int rc = SQLITE_ERROR;
142 while ((rc = sqlite3_step(stmt)) == SQLITE_ROW && interrupt == 0) {
143 // get the value for each of the columns
144 idx_t result_idx = result_chunk.size();
145 for (int i = 0; i < col_count; i++) {
146 if (sqlite3_column_type(stmt, i) == SQLITE_NULL) {
147 // NULL value
148 FlatVector::Nullmask(result_chunk.data[i])[result_idx] = true;
149 } else {
150 auto dataptr = FlatVector::GetData(result_chunk.data[i]);
151 // normal value, convert type
152 switch (result_types[i].id) {
153 case SQLTypeId::BOOLEAN:
154 ((int8_t *)dataptr)[result_idx] = sqlite3_column_int(stmt, i) == 0 ? 0 : 1;
155 break;
156 case SQLTypeId::TINYINT:
157 ((int8_t *)dataptr)[result_idx] = (int8_t)sqlite3_column_int(stmt, i);
158 break;
159 case SQLTypeId::SMALLINT:
160 ((int16_t *)dataptr)[result_idx] = (int16_t)sqlite3_column_int(stmt, i);
161 break;
162 case SQLTypeId::INTEGER:
163 ((int32_t *)dataptr)[result_idx] = (int32_t)sqlite3_column_int(stmt, i);
164 break;
165 case SQLTypeId::BIGINT:
166 ((int64_t *)dataptr)[result_idx] = (int64_t)sqlite3_column_int64(stmt, i);
167 break;
168 case SQLTypeId::DECIMAL:
169 ((double *)dataptr)[result_idx] = (double)sqlite3_column_double(stmt, i);
170 break;
171 case SQLTypeId::VARCHAR: {
172 Value result((char *)sqlite3_column_text(stmt, i));
173 result_chunk.SetValue(i, result_idx, result);
174 break;
175 }
176 case SQLTypeId::DATE: {
177 auto unix_time = sqlite3_column_int64(stmt, i);
178 ((date_t *)dataptr)[result_idx] = Date::EpochToDate(unix_time);
179 break;
180 }
181 default:
182 throw NotImplementedException("Unimplemented type for SQLite -> DuckDB type conversion");
183 }
184 }
185 }
186 result_chunk.SetCardinality(result_idx + 1);
187 if (result_chunk.size() == STANDARD_VECTOR_SIZE) {
188 // chunk is filled
189 // flush the chunk to the result
190 result->collection.Append(result_chunk);
191 result_chunk.Reset();
192 }
193 }
194 if (rc != SQLITE_DONE) {
195 // failed
196 return nullptr;
197 }
198 if (result_chunk.size() > 0) {
199 // final append of any leftover data
200 result->collection.Append(result_chunk);
201 result_chunk.Reset();
202 }
203 sqlite3_finalize(stmt);
204 return move(result);
205}
206
207}; // namespace sqlite
208