1#include <IO/CascadeWriteBuffer.h>
2#include <Common/Exception.h>
3
4namespace DB
5{
6
/// Error codes referenced in this file. These are declarations only (`extern`, no initializer);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
    extern const int CANNOT_CREATE_IO_BUFFER;
    /// Compared against in CascadeWriteBuffer::nextImpl(). Declared here explicitly so that this
    /// file does not rely on some included header happening to declare it.
    extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED;
}
12
13CascadeWriteBuffer::CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_)
14 : WriteBuffer(nullptr, 0), prepared_sources(std::move(prepared_sources_)), lazy_sources(std::move(lazy_sources_))
15{
16 first_lazy_source_num = prepared_sources.size();
17 num_sources = first_lazy_source_num + lazy_sources.size();
18
19 /// fill lazy sources by nullptr
20 prepared_sources.resize(num_sources);
21
22 curr_buffer_num = 0;
23 curr_buffer = setNextBuffer();
24 set(curr_buffer->buffer().begin(), curr_buffer->buffer().size());
25}
26
27
28void CascadeWriteBuffer::nextImpl()
29{
30 try
31 {
32 curr_buffer->position() = position();
33 curr_buffer->next();
34 }
35 catch (const Exception & e)
36 {
37 if (curr_buffer_num < num_sources && e.code() == ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED)
38 {
39 /// TODO: protocol should require set(position(), 0) before Exception
40
41 /// good situation, fetch next WriteBuffer
42 ++curr_buffer_num;
43 curr_buffer = setNextBuffer();
44 }
45 else
46 throw;
47 }
48
49 set(curr_buffer->position(), curr_buffer->buffer().end() - curr_buffer->position());
50// std::cerr << "CascadeWriteBuffer a count=" << count() << " bytes=" << bytes << " offset=" << offset()
51// << " bytes+size=" << bytes + buffer().size() << "\n";
52}
53
54
55void CascadeWriteBuffer::getResultBuffers(WriteBufferPtrs & res)
56{
57 /// Sync position with underlying buffer before invalidating
58 curr_buffer->position() = position();
59
60 res = std::move(prepared_sources);
61
62 curr_buffer = nullptr;
63 curr_buffer_num = num_sources = 0;
64 prepared_sources.clear();
65 lazy_sources.clear();
66}
67
68
69WriteBuffer * CascadeWriteBuffer::setNextBuffer()
70{
71 if (first_lazy_source_num <= curr_buffer_num && curr_buffer_num < num_sources)
72 {
73 if (!prepared_sources[curr_buffer_num])
74 {
75 WriteBufferPtr prev_buf = (curr_buffer_num > 0) ? prepared_sources[curr_buffer_num - 1] : nullptr;
76 prepared_sources[curr_buffer_num] = lazy_sources[curr_buffer_num - first_lazy_source_num](prev_buf);
77 }
78 }
79 else if (curr_buffer_num >= num_sources)
80 throw Exception("There are no WriteBuffers to write result", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);
81
82 WriteBuffer * res = prepared_sources[curr_buffer_num].get();
83 if (!res)
84 throw Exception("Required WriteBuffer is not created", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
85
86 /// Check that returned buffer isn't empty
87 if (!res->hasPendingData())
88 res->next();
89
90 return res;
91}
92
93
94CascadeWriteBuffer::~CascadeWriteBuffer()
95{
96 /// Sync position with underlying buffer before exit
97 if (curr_buffer)
98 curr_buffer->position() = position();
99}
100
101
102}
103