1#include "catch.hpp"
2#include "duckdb/common/value_operations/value_operations.hpp"
3#include "test_helpers.hpp"
4
5#include <atomic>
6#include <random>
7#include <thread>
8
9using namespace duckdb;
10using namespace std;
11
12namespace test_concurrent_append {
13
// Number of connections opened by the sequential test and of worker threads
// spawned by the concurrent test.
static constexpr int THREAD_COUNT {100};
// Number of single-row INSERT statements issued per connection / per thread.
static constexpr int INSERT_ELEMENTS {1000};
16
17TEST_CASE("Sequential append", "[transactions][.]") {
18 unique_ptr<MaterializedQueryResult> result;
19 DuckDB db(nullptr);
20 Connection con(db);
21 vector<unique_ptr<Connection>> connections;
22
23 // initialize the database
24 REQUIRE_NO_FAIL(con.Query("CREATE TABLE integers(i INTEGER);"));
25
26 for (size_t i = 0; i < THREAD_COUNT; i++) {
27 connections.push_back(make_unique<Connection>(db));
28 connections[i]->Query("BEGIN TRANSACTION;");
29 }
30
31 for (size_t i = 0; i < THREAD_COUNT; i++) {
32 result = connections[i]->Query("SELECT COUNT(*) FROM integers");
33 assert(result->collection.count > 0);
34 Value count = result->collection.chunks[0]->GetValue(0, 0);
35 REQUIRE(count == 0);
36 for (size_t j = 0; j < INSERT_ELEMENTS; j++) {
37 connections[i]->Query("INSERT INTO integers VALUES (3)");
38 result = connections[i]->Query("SELECT COUNT(*) FROM integers");
39 Value new_count = result->collection.chunks[0]->GetValue(0, 0);
40 REQUIRE(new_count == j + 1);
41 count = new_count;
42 }
43 }
44
45 for (size_t i = 0; i < THREAD_COUNT; i++) {
46 connections[i]->Query("COMMIT;");
47 }
48 result = con.Query("SELECT COUNT(*) FROM integers");
49 Value count = result->collection.chunks[0]->GetValue(0, 0);
50 REQUIRE(count == THREAD_COUNT * INSERT_ELEMENTS);
51}
52
// Counts the worker threads that have finished appending. The workers spin on
// it so that each one commits only after all of them are done inserting.
// std::atomic already provides the inter-thread visibility and ordering that
// is needed here; a `volatile` qualifier adds nothing for synchronization.
static std::atomic<int> finished_threads {0};
54
55static void insert_random_elements(DuckDB *db, bool *correct, int threadnr) {
56 correct[threadnr] = true;
57 Connection con(*db);
58 // initial count
59 con.Query("BEGIN TRANSACTION;");
60 auto result = con.Query("SELECT COUNT(*) FROM integers");
61 Value count = result->collection.chunks[0]->GetValue(0, 0);
62 auto start_count = count.GetValue<int64_t>();
63 for (size_t i = 0; i < INSERT_ELEMENTS; i++) {
64 // count should increase by one for every append we do
65 con.Query("INSERT INTO integers VALUES (3)");
66 result = con.Query("SELECT COUNT(*) FROM integers");
67 Value new_count = result->collection.chunks[0]->GetValue(0, 0);
68 if (new_count != start_count + i + 1) {
69 correct[threadnr] = false;
70 }
71 count = new_count;
72 }
73 finished_threads++;
74 while (finished_threads != THREAD_COUNT)
75 ;
76 con.Query("COMMIT;");
77}
78
79TEST_CASE("Concurrent append", "[transactions][.]") {
80 unique_ptr<QueryResult> result;
81 DuckDB db(nullptr);
82 Connection con(db);
83
84 // initialize the database
85 REQUIRE_NO_FAIL(con.Query("CREATE TABLE integers(i INTEGER);"));
86
87 finished_threads = 0;
88
89 bool correct[THREAD_COUNT];
90 thread threads[THREAD_COUNT];
91 for (size_t i = 0; i < THREAD_COUNT; i++) {
92 threads[i] = thread(insert_random_elements, &db, correct, i);
93 }
94
95 for (size_t i = 0; i < THREAD_COUNT; i++) {
96 threads[i].join();
97 REQUIRE(correct[i]);
98 }
99
100 result = con.Query("SELECT COUNT(*), SUM(i) FROM integers");
101 REQUIRE(CHECK_COLUMN(result, 0, {Value::BIGINT(THREAD_COUNT * INSERT_ELEMENTS)}));
102 REQUIRE(CHECK_COLUMN(result, 1, {Value::BIGINT(3 * THREAD_COUNT * INSERT_ELEMENTS)}));
103}
104
105} // namespace test_concurrent_append
106