1#include <Common/Exception.h>
2#include <IO/ReadBuffer.h>
3#include <IO/WriteBuffer.h>
4#include <IO/copyData.h>
5
6
7namespace DB
8{
9
10namespace
11{
12
13void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, const std::atomic<int> * is_cancelled)
14{
15 /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
16 while (bytes > 0 && !from.eof())
17 {
18 if (is_cancelled && *is_cancelled)
19 return;
20
21 /// buffer() - a piece of data available for reading; position() - the cursor of the place to which you have already read.
22 size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position()));
23 to.write(from.position(), count);
24 from.position() += count;
25 bytes -= count;
26 }
27
28 if (check_bytes && bytes > 0)
29 throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
30}
31
32void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::function<void()> cancellation_hook)
33{
34 /// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
35 while (bytes > 0 && !from.eof())
36 {
37 if (cancellation_hook)
38 cancellation_hook();
39
40 /// buffer() - a piece of data available for reading; position() - the cursor of the place to which you have already read.
41 size_t count = std::min(bytes, static_cast<size_t>(from.buffer().end() - from.position()));
42 to.write(from.position(), count);
43 from.position() += count;
44 bytes -= count;
45 }
46
47 if (check_bytes && bytes > 0)
48 throw Exception("Attempt to read after EOF.", ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
49}
50
51}
52
53void copyData(ReadBuffer & from, WriteBuffer & to)
54{
55 copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), nullptr);
56}
57
58void copyData(ReadBuffer & from, WriteBuffer & to, const std::atomic<int> & is_cancelled)
59{
60 copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled);
61}
62
63void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook)
64{
65 copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), cancellation_hook);
66}
67
68void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes)
69{
70 copyDataImpl(from, to, true, bytes, nullptr);
71}
72
73void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, const std::atomic<int> & is_cancelled)
74{
75 copyDataImpl(from, to, true, bytes, &is_cancelled);
76}
77
78void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook)
79{
80 copyDataImpl(from, to, true, bytes, cancellation_hook);
81}
82
83}
84