1#pragma once
2
3#include <math.h>
4
5#include <vector>
6
7#include <Common/ThreadPool.h>
8#include <IO/WriteBuffer.h>
9
10
11namespace DB
12{
13
14
15/** Writes data asynchronously using double buffering.
16 */
17class AsynchronousWriteBuffer : public WriteBuffer
18{
19private:
20 WriteBuffer & out; /// The main buffer, responsible for writing data.
21 std::vector <char> memory; /// A piece of memory for duplicating the buffer.
22 ThreadPool pool; /// For asynchronous data writing.
23 bool started; /// Has an asynchronous data write started?
24
25 /// Swap the main and duplicate buffers.
26 void swapBuffers()
27 {
28 swap(out);
29 }
30
31 void nextImpl() override
32 {
33 if (!offset())
34 return;
35
36 if (started)
37 pool.wait();
38 else
39 started = true;
40
41 swapBuffers();
42
43 /// The data will be written in separate stream.
44 pool.scheduleOrThrowOnError([this] { thread(); });
45 }
46
47public:
48 AsynchronousWriteBuffer(WriteBuffer & out_) : WriteBuffer(nullptr, 0), out(out_), memory(out.buffer().size()), pool(1), started(false)
49 {
50 /// Data is written to the duplicate buffer.
51 set(memory.data(), memory.size());
52 }
53
54 ~AsynchronousWriteBuffer() override
55 {
56 try
57 {
58 if (started)
59 pool.wait();
60
61 swapBuffers();
62 out.next();
63 }
64 catch (...)
65 {
66 tryLogCurrentException(__PRETTY_FUNCTION__);
67 }
68 }
69
70 /// That is executed in a separate thread
71 void thread()
72 {
73 out.next();
74 }
75};
76
77}
78