1#include <DataStreams/ColumnGathererStream.h>
2#include <common/logger_useful.h>
3#include <Common/typeid_cast.h>
4#include <IO/WriteHelpers.h>
5#include <iomanip>
6
7
8namespace DB
9{
10
/// Error codes referenced by this translation unit.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int INCOMPATIBLE_COLUMNS;          /// Thrown by the constructor when source column kinds differ.
    extern const int INCORRECT_NUMBER_OF_COLUMNS;   /// Thrown by the constructor when a header has more than 2 columns.
    extern const int NOT_FOUND_COLUMN_IN_BLOCK;
    extern const int EMPTY_DATA_PASSED;             /// Thrown by the constructor when no source streams are given.
    extern const int RECEIVED_EMPTY_DATA;           /// Thrown by fetchNewBlock when a fetched block has zero size.
}
20
21ColumnGathererStream::ColumnGathererStream(
22 const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
23 size_t block_preferred_size_)
24 : column_name(column_name_), sources(source_streams.size()), row_sources_buf(row_sources_buf_)
25 , block_preferred_size(block_preferred_size_), log(&Logger::get("ColumnGathererStream"))
26{
27 if (source_streams.empty())
28 throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
29
30 children.assign(source_streams.begin(), source_streams.end());
31
32 for (size_t i = 0; i < children.size(); ++i)
33 {
34 const Block & header = children[i]->getHeader();
35
36 /// Sometimes MergeTreeReader injects additional column with partitioning key
37 if (header.columns() > 2)
38 throw Exception(
39 "Block should have 1 or 2 columns, but contains " + toString(header.columns()),
40 ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
41
42 if (i == 0)
43 {
44 column.name = column_name;
45 column.type = header.getByName(column_name).type;
46 column.column = column.type->createColumn();
47 }
48 else if (header.getByName(column_name).column->getName() != column.column->getName())
49 throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS);
50 }
51}
52
53
54Block ColumnGathererStream::readImpl()
55{
56 /// Special case: single source and there are no skipped rows
57 if (children.size() == 1 && row_sources_buf.eof())
58 return children[0]->read();
59
60 if (!source_to_fully_copy && row_sources_buf.eof())
61 return Block();
62
63 MutableColumnPtr output_column = column.column->cloneEmpty();
64 output_block = Block{column.cloneEmpty()};
65 output_column->gather(*this);
66 if (!output_column->empty())
67 output_block.getByPosition(0).column = std::move(output_column);
68 return output_block;
69}
70
71
72void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
73{
74 try
75 {
76 source.block = children[source_num]->read();
77 source.update(column_name);
78 }
79 catch (Exception & e)
80 {
81 e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getName() + ", part " + toString(source_num));
82 throw;
83 }
84
85 if (0 == source.size)
86 {
87 throw Exception("Fetched block is empty. Stream " + children[source_num]->getName() + ", part " + toString(source_num),
88 ErrorCodes::RECEIVED_EMPTY_DATA);
89 }
90}
91
92
93void ColumnGathererStream::readSuffixImpl()
94{
95 const BlockStreamProfileInfo & profile_info = getProfileInfo();
96
97 /// Don't print info for small parts (< 10M rows)
98 if (profile_info.rows < 10000000)
99 return;
100
101 double seconds = profile_info.total_stopwatch.elapsedSeconds();
102
103 std::stringstream message;
104 message << std::fixed << std::setprecision(2)
105 << "Gathered column " << column_name
106 << " (" << static_cast<double>(profile_info.bytes) / profile_info.rows << " bytes/elem.)"
107 << " in " << seconds << " sec.";
108
109 if (seconds)
110 message << ", " << profile_info.rows / seconds << " rows/sec., "
111 << profile_info.bytes / 1048576.0 / seconds << " MiB/sec.";
112
113 LOG_TRACE(log, message.str());
114}
115
116}
117