1 | #include <IO/CascadeWriteBuffer.h> |
2 | #include <Common/Exception.h> |
3 | |
4 | namespace DB |
5 | { |
6 | |
7 | namespace ErrorCodes |
8 | { |
9 | extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER; |
10 | extern const int CANNOT_CREATE_IO_BUFFER; |
11 | } |
12 | |
13 | CascadeWriteBuffer::CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_) |
14 | : WriteBuffer(nullptr, 0), prepared_sources(std::move(prepared_sources_)), lazy_sources(std::move(lazy_sources_)) |
15 | { |
16 | first_lazy_source_num = prepared_sources.size(); |
17 | num_sources = first_lazy_source_num + lazy_sources.size(); |
18 | |
19 | /// fill lazy sources by nullptr |
20 | prepared_sources.resize(num_sources); |
21 | |
22 | curr_buffer_num = 0; |
23 | curr_buffer = setNextBuffer(); |
24 | set(curr_buffer->buffer().begin(), curr_buffer->buffer().size()); |
25 | } |
26 | |
27 | |
28 | void CascadeWriteBuffer::nextImpl() |
29 | { |
30 | try |
31 | { |
32 | curr_buffer->position() = position(); |
33 | curr_buffer->next(); |
34 | } |
35 | catch (const Exception & e) |
36 | { |
37 | if (curr_buffer_num < num_sources && e.code() == ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED) |
38 | { |
39 | /// TODO: protocol should require set(position(), 0) before Exception |
40 | |
41 | /// good situation, fetch next WriteBuffer |
42 | ++curr_buffer_num; |
43 | curr_buffer = setNextBuffer(); |
44 | } |
45 | else |
46 | throw; |
47 | } |
48 | |
49 | set(curr_buffer->position(), curr_buffer->buffer().end() - curr_buffer->position()); |
50 | // std::cerr << "CascadeWriteBuffer a count=" << count() << " bytes=" << bytes << " offset=" << offset() |
51 | // << " bytes+size=" << bytes + buffer().size() << "\n"; |
52 | } |
53 | |
54 | |
55 | void CascadeWriteBuffer::getResultBuffers(WriteBufferPtrs & res) |
56 | { |
57 | /// Sync position with underlying buffer before invalidating |
58 | curr_buffer->position() = position(); |
59 | |
60 | res = std::move(prepared_sources); |
61 | |
62 | curr_buffer = nullptr; |
63 | curr_buffer_num = num_sources = 0; |
64 | prepared_sources.clear(); |
65 | lazy_sources.clear(); |
66 | } |
67 | |
68 | |
69 | WriteBuffer * CascadeWriteBuffer::setNextBuffer() |
70 | { |
71 | if (first_lazy_source_num <= curr_buffer_num && curr_buffer_num < num_sources) |
72 | { |
73 | if (!prepared_sources[curr_buffer_num]) |
74 | { |
75 | WriteBufferPtr prev_buf = (curr_buffer_num > 0) ? prepared_sources[curr_buffer_num - 1] : nullptr; |
76 | prepared_sources[curr_buffer_num] = lazy_sources[curr_buffer_num - first_lazy_source_num](prev_buf); |
77 | } |
78 | } |
79 | else if (curr_buffer_num >= num_sources) |
80 | throw Exception("There are no WriteBuffers to write result" , ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER); |
81 | |
82 | WriteBuffer * res = prepared_sources[curr_buffer_num].get(); |
83 | if (!res) |
84 | throw Exception("Required WriteBuffer is not created" , ErrorCodes::CANNOT_CREATE_IO_BUFFER); |
85 | |
86 | /// Check that returned buffer isn't empty |
87 | if (!res->hasPendingData()) |
88 | res->next(); |
89 | |
90 | return res; |
91 | } |
92 | |
93 | |
94 | CascadeWriteBuffer::~CascadeWriteBuffer() |
95 | { |
96 | /// Sync position with underlying buffer before exit |
97 | if (curr_buffer) |
98 | curr_buffer->position() = position(); |
99 | } |
100 | |
101 | |
102 | } |
103 | |