1 | #pragma once |
---|---|
2 | |
3 | #include <math.h> |
4 | |
5 | #include <vector> |
6 | |
7 | #include <Common/ThreadPool.h> |
8 | #include <IO/WriteBuffer.h> |
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
14 | |
15 | /** Writes data asynchronously using double buffering. |
16 | */ |
17 | class AsynchronousWriteBuffer : public WriteBuffer |
18 | { |
19 | private: |
20 | WriteBuffer & out; /// The main buffer, responsible for writing data. |
21 | std::vector <char> memory; /// A piece of memory for duplicating the buffer. |
22 | ThreadPool pool; /// For asynchronous data writing. |
23 | bool started; /// Has an asynchronous data write started? |
24 | |
25 | /// Swap the main and duplicate buffers. |
26 | void swapBuffers() |
27 | { |
28 | swap(out); |
29 | } |
30 | |
31 | void nextImpl() override |
32 | { |
33 | if (!offset()) |
34 | return; |
35 | |
36 | if (started) |
37 | pool.wait(); |
38 | else |
39 | started = true; |
40 | |
41 | swapBuffers(); |
42 | |
43 | /// The data will be written in separate stream. |
44 | pool.scheduleOrThrowOnError([this] { thread(); }); |
45 | } |
46 | |
47 | public: |
48 | AsynchronousWriteBuffer(WriteBuffer & out_) : WriteBuffer(nullptr, 0), out(out_), memory(out.buffer().size()), pool(1), started(false) |
49 | { |
50 | /// Data is written to the duplicate buffer. |
51 | set(memory.data(), memory.size()); |
52 | } |
53 | |
54 | ~AsynchronousWriteBuffer() override |
55 | { |
56 | try |
57 | { |
58 | if (started) |
59 | pool.wait(); |
60 | |
61 | swapBuffers(); |
62 | out.next(); |
63 | } |
64 | catch (...) |
65 | { |
66 | tryLogCurrentException(__PRETTY_FUNCTION__); |
67 | } |
68 | } |
69 | |
70 | /// That is executed in a separate thread |
71 | void thread() |
72 | { |
73 | out.next(); |
74 | } |
75 | }; |
76 | |
77 | } |
78 |