1#include "catch.hpp"
2#include "test_helpers.hpp"
3
4#include <chrono>
5#include <thread>
6
7using namespace duckdb;
8using namespace std;
9
10TEST_CASE("Test using connection after database is gone", "[api]") {
11 auto db = make_unique<DuckDB>(nullptr);
12 auto conn = make_unique<Connection>(*db);
13 // check that the connection works
14 auto result = conn->Query("SELECT 42");
15 REQUIRE(CHECK_COLUMN(result, 0, {42}));
16 // destroy the database
17 db.reset();
18 // try to use the connection
19 REQUIRE_FAIL(conn->Query("SELECT 42"));
20
21 // now try it with an open transaction
22 db = make_unique<DuckDB>(nullptr);
23 conn = make_unique<Connection>(*db);
24
25 REQUIRE_NO_FAIL(conn->Query("BEGIN TRANSACTION"));
26 result = conn->Query("SELECT 42");
27 REQUIRE(CHECK_COLUMN(result, 0, {42}));
28
29 db.reset();
30
31 REQUIRE_FAIL(conn->Query("SELECT 42"));
32}
33
34TEST_CASE("Test destroying connections with open transactions", "[api]") {
35 auto db = make_unique<DuckDB>(nullptr);
36 {
37 Connection con(*db);
38 con.Query("BEGIN TRANSACTION");
39 con.Query("CREATE TABLE test(i INTEGER);");
40 }
41
42 auto conn = make_unique<Connection>(*db);
43 REQUIRE_NO_FAIL(conn->Query("CREATE TABLE test(i INTEGER)"));
44}
45
46static void long_running_query(Connection *conn, bool *correct) {
47 *correct = true;
48 auto result = conn->Query("SELECT i1.i FROM integers i1, integers i2, integers i3, integers i4, integers i5, "
49 "integers i6, integers i7, integers i8, integers i9, integers i10,"
50 "integers i11, integers i12, integers i13");
51 // the query should fail
52 *correct = !result->success;
53}
54
55TEST_CASE("Test closing database during long running query", "[api]") {
56 auto db = make_unique<DuckDB>(nullptr);
57 auto conn = make_unique<Connection>(*db);
58 // create the database
59 REQUIRE_NO_FAIL(conn->Query("CREATE TABLE integers(i INTEGER)"));
60 REQUIRE_NO_FAIL(conn->Query("INSERT INTO integers VALUES (1), (2), (3), (NULL)"));
61 conn->DisableProfiling();
62 // perform a long running query in the background (many cross products)
63 bool correct;
64 auto background_thread = thread(long_running_query, conn.get(), &correct);
65 // wait a little bit
66 std::this_thread::sleep_for(std::chrono::milliseconds(100));
67 // destroy the database
68 db.reset();
69 // wait for the thread
70 background_thread.join();
71 REQUIRE(correct);
72 // try to use the connection
73 REQUIRE_FAIL(conn->Query("SELECT 42"));
74}
75
76TEST_CASE("Test closing result after database is gone", "[api]") {
77 auto db = make_unique<DuckDB>(nullptr);
78 auto conn = make_unique<Connection>(*db);
79 // check that the connection works
80 auto result = conn->Query("SELECT 42");
81 REQUIRE(CHECK_COLUMN(result, 0, {42}));
82 // destroy the database
83 db.reset();
84 conn.reset();
85 result.reset();
86
87 // now the streaming result
88 db = make_unique<DuckDB>(nullptr);
89 conn = make_unique<Connection>(*db);
90 // check that the connection works
91 auto streaming_result = conn->SendQuery("SELECT 42");
92 // destroy the database
93 db.reset();
94 conn.reset();
95 streaming_result.reset();
96}
97
98TEST_CASE("Test closing database with open prepared statements", "[api]") {
99 auto db = make_unique<DuckDB>(nullptr);
100 auto conn = make_unique<Connection>(*db);
101
102 auto p1 = conn->Prepare("CREATE TABLE a (i INTEGER)");
103 p1->Execute();
104 auto p2 = conn->Prepare("INSERT INTO a VALUES (42)");
105 p2->Execute();
106
107 db.reset();
108 conn.reset();
109}
110
111static void parallel_query(Connection *conn, bool *correct, size_t threadnr) {
112 correct[threadnr] = true;
113 for (size_t i = 0; i < 100; i++) {
114 auto result = conn->Query("SELECT * FROM integers ORDER BY i");
115 if (!CHECK_COLUMN(result, 0, {Value(), 1, 2, 3})) {
116 correct[threadnr] = false;
117 }
118 }
119}
120
121TEST_CASE("Test parallel usage of single client", "[api][.]") {
122 auto db = make_unique<DuckDB>(nullptr);
123 auto conn = make_unique<Connection>(*db);
124
125 REQUIRE_NO_FAIL(conn->Query("CREATE TABLE integers(i INTEGER)"));
126 REQUIRE_NO_FAIL(conn->Query("INSERT INTO integers VALUES (1), (2), (3), (NULL)"));
127
128 bool correct[20];
129 thread threads[20];
130 for (size_t i = 0; i < 20; i++) {
131 threads[i] = thread(parallel_query, conn.get(), correct, i);
132 }
133 for (size_t i = 0; i < 20; i++) {
134 threads[i].join();
135 REQUIRE(correct[i]);
136 }
137}
138
139static void parallel_query_with_new_connection(DuckDB *db, bool *correct, size_t threadnr) {
140 correct[threadnr] = true;
141 for (size_t i = 0; i < 100; i++) {
142 auto conn = make_unique<Connection>(*db);
143 auto result = conn->Query("SELECT * FROM integers ORDER BY i");
144 if (!CHECK_COLUMN(result, 0, {Value(), 1, 2, 3})) {
145 correct[threadnr] = false;
146 }
147 }
148}
149
150TEST_CASE("Test making and dropping connections in parallel to a single database", "[api][.]") {
151 auto db = make_unique<DuckDB>(nullptr);
152 auto conn = make_unique<Connection>(*db);
153
154 REQUIRE_NO_FAIL(conn->Query("CREATE TABLE integers(i INTEGER)"));
155 REQUIRE_NO_FAIL(conn->Query("INSERT INTO integers VALUES (1), (2), (3), (NULL)"));
156
157 bool correct[20];
158 thread threads[20];
159 for (size_t i = 0; i < 20; i++) {
160 threads[i] = thread(parallel_query_with_new_connection, db.get(), correct, i);
161 }
162 for (size_t i = 0; i < 100; i++) {
163 auto result = conn->Query("SELECT * FROM integers ORDER BY i");
164 REQUIRE(CHECK_COLUMN(result, 0, {Value(), 1, 2, 3}));
165 }
166 for (size_t i = 0; i < 20; i++) {
167 threads[i].join();
168 REQUIRE(correct[i]);
169 }
170 auto result = conn->Query("SELECT * FROM integers ORDER BY i");
171 REQUIRE(CHECK_COLUMN(result, 0, {Value(), 1, 2, 3}));
172}
173
174TEST_CASE("Test multiple result sets", "[api]") {
175 unique_ptr<QueryResult> result;
176 DuckDB db(nullptr);
177 Connection con(db);
178 con.EnableQueryVerification();
179
180 result = con.Query("SELECT 42; SELECT 84");
181 REQUIRE(CHECK_COLUMN(result, 0, {42}));
182 result = move(result->next);
183 REQUIRE(CHECK_COLUMN(result, 0, {84}));
184 REQUIRE(!result->next);
185
186 // also with stream api
187 result = con.SendQuery("SELECT 42; SELECT 84");
188 REQUIRE(CHECK_COLUMN(result, 0, {42}));
189 result = move(result->next);
190 REQUIRE(CHECK_COLUMN(result, 0, {84}));
191 REQUIRE(!result->next);
192}
193
194TEST_CASE("Test fetch API", "[api]") {
195 DuckDB db(nullptr);
196 Connection con(db);
197 con.EnableQueryVerification();
198
199 unique_ptr<QueryResult> result;
200
201 result = con.SendQuery("CREATE TABLE test (a INTEGER);");
202
203 result = con.Query("select a from test where 1 <> 1");
204 REQUIRE(CHECK_COLUMN(result, 0, {}));
205
206 result = con.SendQuery("INSERT INTO test VALUES (42)");
207 result = con.SendQuery("SELECT a from test");
208 REQUIRE(CHECK_COLUMN(result, 0, {42}));
209
210 auto materialized_result = con.Query("select a from test");
211 REQUIRE(CHECK_COLUMN(materialized_result, 0, {42}));
212
213 // override fetch result
214 result = con.SendQuery("SELECT a from test");
215 result = con.SendQuery("SELECT a from test");
216 result = con.SendQuery("SELECT a from test");
217 result = con.SendQuery("SELECT a from test");
218 REQUIRE(CHECK_COLUMN(result, 0, {42}));
219}
220
221TEST_CASE("Test fetch API robustness", "[api]") {
222 auto db = make_unique<DuckDB>(nullptr);
223 auto conn = make_unique<Connection>(*db);
224
225 // remove connection with active stream result
226 auto result = conn->SendQuery("SELECT 42");
227 // close the connection
228 conn.reset();
229 // now try to fetch a chunk, this should return a nullptr
230 auto chunk = result->Fetch();
231 REQUIRE(!chunk);
232
233 // now close the entire database
234 conn = make_unique<Connection>(*db);
235 result = conn->SendQuery("SELECT 42");
236
237 db.reset();
238 // fetch should fail
239 chunk = result->Fetch();
240 REQUIRE(!chunk);
241 // new queries on the connection should fail as well
242 REQUIRE_FAIL(conn->SendQuery("SELECT 42"));
243
244 // override fetch result
245 db = make_unique<DuckDB>(nullptr);
246 conn = make_unique<Connection>(*db);
247 auto result1 = conn->SendQuery("SELECT 42");
248 auto result2 = conn->SendQuery("SELECT 84");
249 REQUIRE_NO_FAIL(*result1);
250 REQUIRE_NO_FAIL(*result2);
251
252 // result1 should be closed now
253 REQUIRE(!result1->Fetch());
254 // result2 should work
255 REQUIRE(result2->Fetch());
256
257 // test materialize
258 result1 = conn->SendQuery("SELECT 42");
259 REQUIRE(result1->type == QueryResultType::STREAM_RESULT);
260 auto materialized = ((StreamQueryResult &)*result1).Materialize();
261 result2 = conn->SendQuery("SELECT 84");
262
263 // we can read materialized still, even after opening a new result
264 REQUIRE(CHECK_COLUMN(materialized, 0, {42}));
265 REQUIRE(CHECK_COLUMN(result2, 0, {84}));
266}
267
268static void VerifyStreamResult(unique_ptr<QueryResult> result) {
269 REQUIRE(result->types[0] == TypeId::INT32);
270 size_t current_row = 0;
271 size_t current_expected_value = 0;
272 size_t expected_rows = 500 * 5;
273 while (true) {
274 auto chunk = result->Fetch();
275 if (chunk->size() == 0) {
276 break;
277 }
278 auto col1_data = FlatVector::GetData<int>(chunk->data[0]);
279 for (size_t k = 0; k < chunk->size(); k++) {
280 if (current_row % 500 == 0) {
281 current_expected_value++;
282 }
283 REQUIRE(col1_data[k] == current_expected_value);
284 current_row++;
285 }
286 }
287 REQUIRE(current_row == expected_rows);
288}
289
290TEST_CASE("Test fetch API with big results", "[api][.]") {
291 DuckDB db(nullptr);
292 Connection con(db);
293 con.EnableQueryVerification();
294
295 // create table that consists of multiple chunks
296 REQUIRE_NO_FAIL(con.Query("BEGIN TRANSACTION"));
297 REQUIRE_NO_FAIL(con.Query("CREATE TABLE test(a INTEGER)"));
298 for (size_t i = 0; i < 500; i++) {
299 REQUIRE_NO_FAIL(con.Query("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES "
300 "(3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);"));
301 }
302 REQUIRE_NO_FAIL(con.Query("COMMIT"));
303
304 // stream the results using the Fetch() API
305 auto result = con.SendQuery("SELECT CAST(a AS INTEGER) FROM test ORDER BY a");
306 VerifyStreamResult(move(result));
307 // we can also stream a materialized result
308 auto materialized = con.Query("SELECT CAST(a AS INTEGER) FROM test ORDER BY a");
309 VerifyStreamResult(move(materialized));
310 // return multiple results using the stream API
311 result = con.SendQuery("SELECT CAST(a AS INTEGER) FROM test ORDER BY a; SELECT CAST(a AS INTEGER) FROM test ORDER "
312 "BY a; SELECT CAST(a AS INTEGER) FROM test ORDER BY a;");
313 auto next = move(result->next);
314 while (next) {
315 auto nextnext = move(next->next);
316 VerifyStreamResult(move(nextnext));
317 next = move(nextnext);
318 }
319 VerifyStreamResult(move(result));
320}
321