1#include <DataStreams/IBlockInputStream.h>
2#include <DataStreams/IBlockOutputStream.h>
3#include <DataStreams/copyData.h>
4
5
6namespace DB
7{
8
9namespace
10{
11
12bool isAtomicSet(std::atomic<bool> * val)
13{
14 return ((val != nullptr) && val->load(std::memory_order_seq_cst));
15}
16
17}
18
19template <typename TCancelCallback, typename TProgressCallback>
20void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCallback && is_cancelled, TProgressCallback && progress)
21{
22 from.readPrefix();
23 to.writePrefix();
24
25 while (Block block = from.read())
26 {
27 if (is_cancelled())
28 break;
29
30 to.write(block);
31 progress(block);
32 }
33
34 if (is_cancelled())
35 return;
36
37 /// For outputting additional information in some formats.
38 if (from.getProfileInfo().hasAppliedLimit())
39 to.setRowsBeforeLimit(from.getProfileInfo().getRowsBeforeLimit());
40
41 to.setTotals(from.getTotals());
42 to.setExtremes(from.getExtremes());
43
44 if (is_cancelled())
45 return;
46
47 from.readSuffix();
48 to.writeSuffix();
49}
50
51
52inline void doNothing(const Block &) {}
53
54void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
55{
56 auto is_cancelled_pred = [is_cancelled] ()
57 {
58 return isAtomicSet(is_cancelled);
59 };
60
61 copyDataImpl(from, to, is_cancelled_pred, doNothing);
62}
63
64
65void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
66{
67 copyDataImpl(from, to, is_cancelled, doNothing);
68}
69
70void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,
71 const std::function<void(const Block & block)> & progress)
72{
73 copyDataImpl(from, to, is_cancelled, progress);
74}
75
76}
77