#include "catch.hpp"
#include "test_helpers.hpp"

#include <chrono>
#include <thread>

using namespace duckdb;
using namespace std;

TEST_CASE("Test using connection after database is gone", "[api]") {
	auto db = make_unique<DuckDB>(nullptr);
	auto conn = make_unique<Connection>(*db);
	// check that the connection works
	auto result = conn->Query("SELECT 42");
	REQUIRE(CHECK_COLUMN(result, 0, {42}));
	// destroy the database
	db.reset();
	// try to use the connection
	REQUIRE_FAIL(conn->Query("SELECT 42"));

	// now try it with an open transaction
	db = make_unique<DuckDB>(nullptr);
	conn = make_unique<Connection>(*db);

	REQUIRE_NO_FAIL(conn->Query("BEGIN TRANSACTION"));
	result = conn->Query("SELECT 42");
	REQUIRE(CHECK_COLUMN(result, 0, {42}));

	db.reset();

	REQUIRE_FAIL(conn->Query("SELECT 42"));
}

TEST_CASE("Test destroying connections with open transactions", "[api]") {
	auto db = make_unique<DuckDB>(nullptr);
	{
		Connection con(*db);
		con.Query("BEGIN TRANSACTION");
		con.Query("CREATE TABLE test(i INTEGER);");
	}

	auto conn = make_unique<Connection>(*db);
	REQUIRE_NO_FAIL(conn->Query("CREATE TABLE test(i INTEGER)"));
}

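// helper for the test below: runs a 13-way cross product of the integers table, which
// takes long enough that the database can be shut down while the query is still running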
static void long_running_query(Connection *conn, bool *correct) {
	*correct = true;
	auto result = conn->Query("SELECT i1.i FROM integers i1, integers i2, integers i3, integers i4, integers i5, "
	                          "integers i6, integers i7, integers i8, integers i9, integers i10,"
	                          "integers i11, integers i12, integers i13");
	// the query should fail
	*correct = !result->success;
}

TEST_CASE("Test closing database during long running query", "[api]") {
	auto db = make_unique<DuckDB>(nullptr);
	auto conn = make_unique<Connection>(*db);
	// create the database
	REQUIRE_NO_FAIL(conn->Query("CREATE TABLE integers(i INTEGER)"));
	REQUIRE_NO_FAIL(conn->Query("INSERT INTO integers VALUES (1), (2), (3), (NULL)"));
	conn->DisableProfiling();
	// perform a long running query in the background (many cross products)
	bool correct;
	auto background_thread = thread(long_running_query, conn.get(), &correct);
	// wait a little bit
	std::this_thread::sleep_for(std::chrono::milliseconds(100));
	// destroy the database
	db.reset();
	// wait for the thread
	background_thread.join();
	REQUIRE(correct);
	// try to use the connection
	REQUIRE_FAIL(conn->Query("SELECT 42"));
}

TEST_CASE("Test closing result after database is gone", "[api]") {
	auto db = make_unique<DuckDB>(nullptr);
	auto conn = make_unique<Connection>(*db);
	// check that the connection works
	auto result = conn->Query("SELECT 42");
	REQUIRE(CHECK_COLUMN(result, 0, {42}));
	// destroy the database
	db.reset();
	conn.reset();
	result.reset();

	// now the streaming result
	db = make_unique<DuckDB>(nullptr);
	conn = make_unique<Connection>(*db);
	// check that the connection works
	auto streaming_result = conn->SendQuery("SELECT 42");
	// destroy the database
	db.reset();
	conn.reset();
	streaming_result.reset();
}

TEST_CASE("Test closing database with open prepared statements", "[api]") {
	auto db = make_unique<DuckDB>(nullptr);
	auto conn = make_unique<Connection>(*db);

	auto p1 = conn->Prepare("CREATE TABLE a (i INTEGER)");
	p1->Execute();
	auto p2 = conn->Prepare("INSERT INTO a VALUES (42)");
	p2->Execute();

	db.reset();
	conn.reset();
}

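// helper for the parallel test below: repeatedly queries the shared connection and records
// in correct[threadnr] whether every result matched the expected table contents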
static void parallel_query(Connection *conn, bool *correct, size_t threadnr) {
	correct[threadnr] = true;
	for (size_t i = 0; i < 100; i++) {
		auto result = conn->Query("SELECT * FROM integers ORDER BY i");
		if (!CHECK_COLUMN(result, 0, {Value(), 1, 2, 3})) {
			correct[threadnr] = false;
		}
	}
}

TEST_CASE("Test parallel usage of single client", "[api][.]") {
	auto db = make_unique<DuckDB>(nullptr);
	auto conn = make_unique<Connection>(*db);

	REQUIRE_NO_FAIL(conn->Query("CREATE TABLE integers(i INTEGER)"));
	REQUIRE_NO_FAIL(conn->Query("INSERT INTO integers VALUES (1), (2), (3), (NULL)"));

	bool correct[20];
	thread threads[20];
	for (size_t i = 0; i < 20; i++) {
		threads[i] = thread(parallel_query, conn.get(), correct, i);
	}
	for (size_t i = 0; i < 20; i++) {
		threads[i].join();
		REQUIRE(correct[i]);
	}
}

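// helper for the test below: like parallel_query, but opens a fresh connection to the
// shared database on every iteration instead of reusing a single connection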
static void parallel_query_with_new_connection(DuckDB *db, bool *correct, size_t threadnr) {
	correct[threadnr] = true;
	for (size_t i = 0; i < 100; i++) {
		auto conn = make_unique<Connection>(*db);
		auto result = conn->Query("SELECT * FROM integers ORDER BY i");
		if (!CHECK_COLUMN(result, 0, {Value(), 1, 2, 3})) {
			correct[threadnr] = false;
		}
	}
}

TEST_CASE("Test making and dropping connections in parallel to a single database", "[api][.]") {
	auto db = make_unique<DuckDB>(nullptr);
	auto conn = make_unique<Connection>(*db);

	REQUIRE_NO_FAIL(conn->Query("CREATE TABLE integers(i INTEGER)"));
	REQUIRE_NO_FAIL(conn->Query("INSERT INTO integers VALUES (1), (2), (3), (NULL)"));

	bool correct[20];
	thread threads[20];
	for (size_t i = 0; i < 20; i++) {
		threads[i] = thread(parallel_query_with_new_connection, db.get(), correct, i);
	}
	for (size_t i = 0; i < 100; i++) {
		auto result = conn->Query("SELECT * FROM integers ORDER BY i");
		REQUIRE(CHECK_COLUMN(result, 0, {Value(), 1, 2, 3}));
	}
	for (size_t i = 0; i < 20; i++) {
		threads[i].join();
		REQUIRE(correct[i]);
	}
	auto result = conn->Query("SELECT * FROM integers ORDER BY i");
	REQUIRE(CHECK_COLUMN(result, 0, {Value(), 1, 2, 3}));
}

TEST_CASE("Test multiple result sets", "[api]") {
	unique_ptr<QueryResult> result;
	DuckDB db(nullptr);
	Connection con(db);
	con.EnableQueryVerification();

	result = con.Query("SELECT 42; SELECT 84");
	REQUIRE(CHECK_COLUMN(result, 0, {42}));
	result = move(result->next);
	REQUIRE(CHECK_COLUMN(result, 0, {84}));
	REQUIRE(!result->next);

	// also with stream api
	result = con.SendQuery("SELECT 42; SELECT 84");
	REQUIRE(CHECK_COLUMN(result, 0, {42}));
	result = move(result->next);
	REQUIRE(CHECK_COLUMN(result, 0, {84}));
	REQUIRE(!result->next);
}

TEST_CASE("Test fetch API", "[api]") {
	DuckDB db(nullptr);
	Connection con(db);
	con.EnableQueryVerification();

	unique_ptr<QueryResult> result;

	result = con.SendQuery("CREATE TABLE test (a INTEGER);");

	result = con.Query("select a from test where 1 <> 1");
	REQUIRE(CHECK_COLUMN(result, 0, {}));

	result = con.SendQuery("INSERT INTO test VALUES (42)");
	result = con.SendQuery("SELECT a from test");
	REQUIRE(CHECK_COLUMN(result, 0, {42}));

	auto materialized_result = con.Query("select a from test");
	REQUIRE(CHECK_COLUMN(materialized_result, 0, {42}));

	// override fetch result
	result = con.SendQuery("SELECT a from test");
	result = con.SendQuery("SELECT a from test");
	result = con.SendQuery("SELECT a from test");
	result = con.SendQuery("SELECT a from test");
	REQUIRE(CHECK_COLUMN(result, 0, {42}));
}

TEST_CASE("Test fetch API robustness", "[api]") {
	auto db = make_unique<DuckDB>(nullptr);
	auto conn = make_unique<Connection>(*db);

	// remove connection with active stream result
	auto result = conn->SendQuery("SELECT 42");
	// close the connection
	conn.reset();
	// now try to fetch a chunk, this should return a nullptr
	auto chunk = result->Fetch();
	REQUIRE(!chunk);

	// now close the entire database
	conn = make_unique<Connection>(*db);
	result = conn->SendQuery("SELECT 42");

	db.reset();
	// fetch should fail
	chunk = result->Fetch();
	REQUIRE(!chunk);
	// new queries on the connection should fail as well
	REQUIRE_FAIL(conn->SendQuery("SELECT 42"));

	// override fetch result
	db = make_unique<DuckDB>(nullptr);
	conn = make_unique<Connection>(*db);
	auto result1 = conn->SendQuery("SELECT 42");
	auto result2 = conn->SendQuery("SELECT 84");
	REQUIRE_NO_FAIL(*result1);
	REQUIRE_NO_FAIL(*result2);

	// result1 should be closed now
	REQUIRE(!result1->Fetch());
	// result2 should work
	REQUIRE(result2->Fetch());

	// test materialize
	result1 = conn->SendQuery("SELECT 42");
	REQUIRE(result1->type == QueryResultType::STREAM_RESULT);
	auto materialized = ((StreamQueryResult &)*result1).Materialize();
	result2 = conn->SendQuery("SELECT 84");

	// the materialized result can still be read, even after opening a new streaming result
	REQUIRE(CHECK_COLUMN(materialized, 0, {42}));
	REQUIRE(CHECK_COLUMN(result2, 0, {84}));
}

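// helper for the big-result test below: the table holds 500 copies of each of the values
// 1..5, so the ordered stream should produce runs of 500 identical values, 2500 rows in total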
static void VerifyStreamResult(unique_ptr<QueryResult> result) {
	REQUIRE(result->types[0] == TypeId::INT32);
	size_t current_row = 0;
	size_t current_expected_value = 0;
	size_t expected_rows = 500 * 5;
	while (true) {
		auto chunk = result->Fetch();
		if (!chunk || chunk->size() == 0) {
			break;
		}
		auto col1_data = FlatVector::GetData<int>(chunk->data[0]);
		for (size_t k = 0; k < chunk->size(); k++) {
			if (current_row % 500 == 0) {
				current_expected_value++;
			}
			REQUIRE(col1_data[k] == (int)current_expected_value);
			current_row++;
		}
	}
	REQUIRE(current_row == expected_rows);
}

TEST_CASE("Test fetch API with big results", "[api][.]") {
	DuckDB db(nullptr);
	Connection con(db);
	con.EnableQueryVerification();

	// create table that consists of multiple chunks
	REQUIRE_NO_FAIL(con.Query("BEGIN TRANSACTION"));
	REQUIRE_NO_FAIL(con.Query("CREATE TABLE test(a INTEGER)"));
	for (size_t i = 0; i < 500; i++) {
		REQUIRE_NO_FAIL(con.Query("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES "
		                          "(3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);"));
	}
	REQUIRE_NO_FAIL(con.Query("COMMIT"));

	// stream the results using the Fetch() API
	auto result = con.SendQuery("SELECT CAST(a AS INTEGER) FROM test ORDER BY a");
	VerifyStreamResult(move(result));
	// we can also stream a materialized result
	auto materialized = con.Query("SELECT CAST(a AS INTEGER) FROM test ORDER BY a");
	VerifyStreamResult(move(materialized));
	// return multiple results using the stream API
	result = con.SendQuery("SELECT CAST(a AS INTEGER) FROM test ORDER BY a; SELECT CAST(a AS INTEGER) FROM test ORDER "
	                       "BY a; SELECT CAST(a AS INTEGER) FROM test ORDER BY a;");
	auto next = move(result->next);
	while (next) {
		// grab the link to the following result before verifying (and consuming) the current one
		auto nextnext = move(next->next);
		VerifyStreamResult(move(next));
		next = move(nextnext);
	}
	VerifyStreamResult(move(result));
}