1 | #include "catch.hpp" |
2 | #include "test_helpers.hpp" |
3 | |
4 | #include <chrono> |
5 | #include <thread> |
6 | |
7 | using namespace duckdb; |
8 | using namespace std; |
9 | |
10 | TEST_CASE("Test using connection after database is gone" , "[api]" ) { |
11 | auto db = make_unique<DuckDB>(nullptr); |
12 | auto conn = make_unique<Connection>(*db); |
13 | // check that the connection works |
14 | auto result = conn->Query("SELECT 42" ); |
15 | REQUIRE(CHECK_COLUMN(result, 0, {42})); |
16 | // destroy the database |
17 | db.reset(); |
18 | // try to use the connection |
19 | REQUIRE_FAIL(conn->Query("SELECT 42" )); |
20 | |
21 | // now try it with an open transaction |
22 | db = make_unique<DuckDB>(nullptr); |
23 | conn = make_unique<Connection>(*db); |
24 | |
25 | REQUIRE_NO_FAIL(conn->Query("BEGIN TRANSACTION" )); |
26 | result = conn->Query("SELECT 42" ); |
27 | REQUIRE(CHECK_COLUMN(result, 0, {42})); |
28 | |
29 | db.reset(); |
30 | |
31 | REQUIRE_FAIL(conn->Query("SELECT 42" )); |
32 | } |
33 | |
34 | TEST_CASE("Test destroying connections with open transactions" , "[api]" ) { |
35 | auto db = make_unique<DuckDB>(nullptr); |
36 | { |
37 | Connection con(*db); |
38 | con.Query("BEGIN TRANSACTION" ); |
39 | con.Query("CREATE TABLE test(i INTEGER);" ); |
40 | } |
41 | |
42 | auto conn = make_unique<Connection>(*db); |
43 | REQUIRE_NO_FAIL(conn->Query("CREATE TABLE test(i INTEGER)" )); |
44 | } |
45 | |
46 | static void long_running_query(Connection *conn, bool *correct) { |
47 | *correct = true; |
48 | auto result = conn->Query("SELECT i1.i FROM integers i1, integers i2, integers i3, integers i4, integers i5, " |
49 | "integers i6, integers i7, integers i8, integers i9, integers i10," |
50 | "integers i11, integers i12, integers i13" ); |
51 | // the query should fail |
52 | *correct = !result->success; |
53 | } |
54 | |
55 | TEST_CASE("Test closing database during long running query" , "[api]" ) { |
56 | auto db = make_unique<DuckDB>(nullptr); |
57 | auto conn = make_unique<Connection>(*db); |
58 | // create the database |
59 | REQUIRE_NO_FAIL(conn->Query("CREATE TABLE integers(i INTEGER)" )); |
60 | REQUIRE_NO_FAIL(conn->Query("INSERT INTO integers VALUES (1), (2), (3), (NULL)" )); |
61 | conn->DisableProfiling(); |
62 | // perform a long running query in the background (many cross products) |
63 | bool correct; |
64 | auto background_thread = thread(long_running_query, conn.get(), &correct); |
65 | // wait a little bit |
66 | std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
67 | // destroy the database |
68 | db.reset(); |
69 | // wait for the thread |
70 | background_thread.join(); |
71 | REQUIRE(correct); |
72 | // try to use the connection |
73 | REQUIRE_FAIL(conn->Query("SELECT 42" )); |
74 | } |
75 | |
76 | TEST_CASE("Test closing result after database is gone" , "[api]" ) { |
77 | auto db = make_unique<DuckDB>(nullptr); |
78 | auto conn = make_unique<Connection>(*db); |
79 | // check that the connection works |
80 | auto result = conn->Query("SELECT 42" ); |
81 | REQUIRE(CHECK_COLUMN(result, 0, {42})); |
82 | // destroy the database |
83 | db.reset(); |
84 | conn.reset(); |
85 | result.reset(); |
86 | |
87 | // now the streaming result |
88 | db = make_unique<DuckDB>(nullptr); |
89 | conn = make_unique<Connection>(*db); |
90 | // check that the connection works |
91 | auto streaming_result = conn->SendQuery("SELECT 42" ); |
92 | // destroy the database |
93 | db.reset(); |
94 | conn.reset(); |
95 | streaming_result.reset(); |
96 | } |
97 | |
98 | TEST_CASE("Test closing database with open prepared statements" , "[api]" ) { |
99 | auto db = make_unique<DuckDB>(nullptr); |
100 | auto conn = make_unique<Connection>(*db); |
101 | |
102 | auto p1 = conn->Prepare("CREATE TABLE a (i INTEGER)" ); |
103 | p1->Execute(); |
104 | auto p2 = conn->Prepare("INSERT INTO a VALUES (42)" ); |
105 | p2->Execute(); |
106 | |
107 | db.reset(); |
108 | conn.reset(); |
109 | } |
110 | |
111 | static void parallel_query(Connection *conn, bool *correct, size_t threadnr) { |
112 | correct[threadnr] = true; |
113 | for (size_t i = 0; i < 100; i++) { |
114 | auto result = conn->Query("SELECT * FROM integers ORDER BY i" ); |
115 | if (!CHECK_COLUMN(result, 0, {Value(), 1, 2, 3})) { |
116 | correct[threadnr] = false; |
117 | } |
118 | } |
119 | } |
120 | |
121 | TEST_CASE("Test parallel usage of single client" , "[api][.]" ) { |
122 | auto db = make_unique<DuckDB>(nullptr); |
123 | auto conn = make_unique<Connection>(*db); |
124 | |
125 | REQUIRE_NO_FAIL(conn->Query("CREATE TABLE integers(i INTEGER)" )); |
126 | REQUIRE_NO_FAIL(conn->Query("INSERT INTO integers VALUES (1), (2), (3), (NULL)" )); |
127 | |
128 | bool correct[20]; |
129 | thread threads[20]; |
130 | for (size_t i = 0; i < 20; i++) { |
131 | threads[i] = thread(parallel_query, conn.get(), correct, i); |
132 | } |
133 | for (size_t i = 0; i < 20; i++) { |
134 | threads[i].join(); |
135 | REQUIRE(correct[i]); |
136 | } |
137 | } |
138 | |
139 | static void parallel_query_with_new_connection(DuckDB *db, bool *correct, size_t threadnr) { |
140 | correct[threadnr] = true; |
141 | for (size_t i = 0; i < 100; i++) { |
142 | auto conn = make_unique<Connection>(*db); |
143 | auto result = conn->Query("SELECT * FROM integers ORDER BY i" ); |
144 | if (!CHECK_COLUMN(result, 0, {Value(), 1, 2, 3})) { |
145 | correct[threadnr] = false; |
146 | } |
147 | } |
148 | } |
149 | |
150 | TEST_CASE("Test making and dropping connections in parallel to a single database" , "[api][.]" ) { |
151 | auto db = make_unique<DuckDB>(nullptr); |
152 | auto conn = make_unique<Connection>(*db); |
153 | |
154 | REQUIRE_NO_FAIL(conn->Query("CREATE TABLE integers(i INTEGER)" )); |
155 | REQUIRE_NO_FAIL(conn->Query("INSERT INTO integers VALUES (1), (2), (3), (NULL)" )); |
156 | |
157 | bool correct[20]; |
158 | thread threads[20]; |
159 | for (size_t i = 0; i < 20; i++) { |
160 | threads[i] = thread(parallel_query_with_new_connection, db.get(), correct, i); |
161 | } |
162 | for (size_t i = 0; i < 100; i++) { |
163 | auto result = conn->Query("SELECT * FROM integers ORDER BY i" ); |
164 | REQUIRE(CHECK_COLUMN(result, 0, {Value(), 1, 2, 3})); |
165 | } |
166 | for (size_t i = 0; i < 20; i++) { |
167 | threads[i].join(); |
168 | REQUIRE(correct[i]); |
169 | } |
170 | auto result = conn->Query("SELECT * FROM integers ORDER BY i" ); |
171 | REQUIRE(CHECK_COLUMN(result, 0, {Value(), 1, 2, 3})); |
172 | } |
173 | |
174 | TEST_CASE("Test multiple result sets" , "[api]" ) { |
175 | unique_ptr<QueryResult> result; |
176 | DuckDB db(nullptr); |
177 | Connection con(db); |
178 | con.EnableQueryVerification(); |
179 | |
180 | result = con.Query("SELECT 42; SELECT 84" ); |
181 | REQUIRE(CHECK_COLUMN(result, 0, {42})); |
182 | result = move(result->next); |
183 | REQUIRE(CHECK_COLUMN(result, 0, {84})); |
184 | REQUIRE(!result->next); |
185 | |
186 | // also with stream api |
187 | result = con.SendQuery("SELECT 42; SELECT 84" ); |
188 | REQUIRE(CHECK_COLUMN(result, 0, {42})); |
189 | result = move(result->next); |
190 | REQUIRE(CHECK_COLUMN(result, 0, {84})); |
191 | REQUIRE(!result->next); |
192 | } |
193 | |
194 | TEST_CASE("Test fetch API" , "[api]" ) { |
195 | DuckDB db(nullptr); |
196 | Connection con(db); |
197 | con.EnableQueryVerification(); |
198 | |
199 | unique_ptr<QueryResult> result; |
200 | |
201 | result = con.SendQuery("CREATE TABLE test (a INTEGER);" ); |
202 | |
203 | result = con.Query("select a from test where 1 <> 1" ); |
204 | REQUIRE(CHECK_COLUMN(result, 0, {})); |
205 | |
206 | result = con.SendQuery("INSERT INTO test VALUES (42)" ); |
207 | result = con.SendQuery("SELECT a from test" ); |
208 | REQUIRE(CHECK_COLUMN(result, 0, {42})); |
209 | |
210 | auto materialized_result = con.Query("select a from test" ); |
211 | REQUIRE(CHECK_COLUMN(materialized_result, 0, {42})); |
212 | |
213 | // override fetch result |
214 | result = con.SendQuery("SELECT a from test" ); |
215 | result = con.SendQuery("SELECT a from test" ); |
216 | result = con.SendQuery("SELECT a from test" ); |
217 | result = con.SendQuery("SELECT a from test" ); |
218 | REQUIRE(CHECK_COLUMN(result, 0, {42})); |
219 | } |
220 | |
221 | TEST_CASE("Test fetch API robustness" , "[api]" ) { |
222 | auto db = make_unique<DuckDB>(nullptr); |
223 | auto conn = make_unique<Connection>(*db); |
224 | |
225 | // remove connection with active stream result |
226 | auto result = conn->SendQuery("SELECT 42" ); |
227 | // close the connection |
228 | conn.reset(); |
229 | // now try to fetch a chunk, this should return a nullptr |
230 | auto chunk = result->Fetch(); |
231 | REQUIRE(!chunk); |
232 | |
233 | // now close the entire database |
234 | conn = make_unique<Connection>(*db); |
235 | result = conn->SendQuery("SELECT 42" ); |
236 | |
237 | db.reset(); |
238 | // fetch should fail |
239 | chunk = result->Fetch(); |
240 | REQUIRE(!chunk); |
241 | // new queries on the connection should fail as well |
242 | REQUIRE_FAIL(conn->SendQuery("SELECT 42" )); |
243 | |
244 | // override fetch result |
245 | db = make_unique<DuckDB>(nullptr); |
246 | conn = make_unique<Connection>(*db); |
247 | auto result1 = conn->SendQuery("SELECT 42" ); |
248 | auto result2 = conn->SendQuery("SELECT 84" ); |
249 | REQUIRE_NO_FAIL(*result1); |
250 | REQUIRE_NO_FAIL(*result2); |
251 | |
252 | // result1 should be closed now |
253 | REQUIRE(!result1->Fetch()); |
254 | // result2 should work |
255 | REQUIRE(result2->Fetch()); |
256 | |
257 | // test materialize |
258 | result1 = conn->SendQuery("SELECT 42" ); |
259 | REQUIRE(result1->type == QueryResultType::STREAM_RESULT); |
260 | auto materialized = ((StreamQueryResult &)*result1).Materialize(); |
261 | result2 = conn->SendQuery("SELECT 84" ); |
262 | |
263 | // we can read materialized still, even after opening a new result |
264 | REQUIRE(CHECK_COLUMN(materialized, 0, {42})); |
265 | REQUIRE(CHECK_COLUMN(result2, 0, {84})); |
266 | } |
267 | |
268 | static void VerifyStreamResult(unique_ptr<QueryResult> result) { |
269 | REQUIRE(result->types[0] == TypeId::INT32); |
270 | size_t current_row = 0; |
271 | size_t current_expected_value = 0; |
272 | size_t expected_rows = 500 * 5; |
273 | while (true) { |
274 | auto chunk = result->Fetch(); |
275 | if (chunk->size() == 0) { |
276 | break; |
277 | } |
278 | auto col1_data = FlatVector::GetData<int>(chunk->data[0]); |
279 | for (size_t k = 0; k < chunk->size(); k++) { |
280 | if (current_row % 500 == 0) { |
281 | current_expected_value++; |
282 | } |
283 | REQUIRE(col1_data[k] == current_expected_value); |
284 | current_row++; |
285 | } |
286 | } |
287 | REQUIRE(current_row == expected_rows); |
288 | } |
289 | |
290 | TEST_CASE("Test fetch API with big results" , "[api][.]" ) { |
291 | DuckDB db(nullptr); |
292 | Connection con(db); |
293 | con.EnableQueryVerification(); |
294 | |
295 | // create table that consists of multiple chunks |
296 | REQUIRE_NO_FAIL(con.Query("BEGIN TRANSACTION" )); |
297 | REQUIRE_NO_FAIL(con.Query("CREATE TABLE test(a INTEGER)" )); |
298 | for (size_t i = 0; i < 500; i++) { |
299 | REQUIRE_NO_FAIL(con.Query("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES " |
300 | "(3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);" )); |
301 | } |
302 | REQUIRE_NO_FAIL(con.Query("COMMIT" )); |
303 | |
304 | // stream the results using the Fetch() API |
305 | auto result = con.SendQuery("SELECT CAST(a AS INTEGER) FROM test ORDER BY a" ); |
306 | VerifyStreamResult(move(result)); |
307 | // we can also stream a materialized result |
308 | auto materialized = con.Query("SELECT CAST(a AS INTEGER) FROM test ORDER BY a" ); |
309 | VerifyStreamResult(move(materialized)); |
310 | // return multiple results using the stream API |
311 | result = con.SendQuery("SELECT CAST(a AS INTEGER) FROM test ORDER BY a; SELECT CAST(a AS INTEGER) FROM test ORDER " |
312 | "BY a; SELECT CAST(a AS INTEGER) FROM test ORDER BY a;" ); |
313 | auto next = move(result->next); |
314 | while (next) { |
315 | auto nextnext = move(next->next); |
316 | VerifyStreamResult(move(nextnext)); |
317 | next = move(nextnext); |
318 | } |
319 | VerifyStreamResult(move(result)); |
320 | } |
321 | |