1#include <DataStreams/ConvertingBlockInputStream.h>
2#include <Interpreters/castColumn.h>
3#include <Columns/ColumnConst.h>
4#include <Common/assert_cast.h>
5#include <Common/quoteString.h>
6#include <Parsers/IAST.h>
7
8
9namespace DB
10{
11
12namespace ErrorCodes
13{
14 extern const int THERE_IS_NO_COLUMN;
15 extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
16 extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
17}
18
19
20static ColumnPtr castColumnWithDiagnostic(const ColumnWithTypeAndName & src_elem, const ColumnWithTypeAndName & res_elem, const Context & context)
21{
22 try
23 {
24 return castColumn(src_elem, res_elem.type, context);
25 }
26 catch (Exception & e)
27 {
28 e.addMessage("while converting source column " + backQuoteIfNeed(src_elem.name) + " to destination column " + backQuoteIfNeed(res_elem.name));
29 throw;
30 }
31}
32
33
34ConvertingBlockInputStream::ConvertingBlockInputStream(
35 const Context & context_,
36 const BlockInputStreamPtr & input,
37 const Block & result_header,
38 MatchColumnsMode mode)
39 : context(context_), header(result_header), conversion(header.columns())
40{
41 children.emplace_back(input);
42
43 Block input_header = input->getHeader();
44
45 size_t num_input_columns = input_header.columns();
46 size_t num_result_columns = result_header.columns();
47
48 if (mode == MatchColumnsMode::Position && num_input_columns != num_result_columns)
49 throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
50
51 for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num)
52 {
53 const auto & res_elem = result_header.getByPosition(result_col_num);
54
55 switch (mode)
56 {
57 case MatchColumnsMode::Position:
58 conversion[result_col_num] = result_col_num;
59 break;
60
61 case MatchColumnsMode::Name:
62 if (input_header.has(res_elem.name))
63 conversion[result_col_num] = input_header.getPositionByName(res_elem.name);
64 else
65 throw Exception("Cannot find column " + backQuote(res_elem.name) + " in source stream",
66 ErrorCodes::THERE_IS_NO_COLUMN);
67 break;
68 }
69
70 const auto & src_elem = input_header.getByPosition(conversion[result_col_num]);
71
72 /// Check constants.
73
74 if (isColumnConst(*res_elem.column))
75 {
76 if (!isColumnConst(*src_elem.column))
77 throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name)
78 + " because it is non constant in source stream but must be constant in result",
79 ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
80 else if (assert_cast<const ColumnConst &>(*src_elem.column).getField() != assert_cast<const ColumnConst &>(*res_elem.column).getField())
81 throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name)
82 + " because it is constant but values of constants are different in source and result",
83 ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
84 }
85
86 /// Check conversion by dry run CAST function.
87
88 castColumnWithDiagnostic(src_elem, res_elem, context);
89 }
90}
91
92
93Block ConvertingBlockInputStream::readImpl()
94{
95 Block src = children.back()->read();
96
97 if (!src)
98 return src;
99
100 Block res = header.cloneEmpty();
101 for (size_t res_pos = 0, size = conversion.size(); res_pos < size; ++res_pos)
102 {
103 const auto & src_elem = src.getByPosition(conversion[res_pos]);
104 auto & res_elem = res.getByPosition(res_pos);
105
106 ColumnPtr converted = castColumnWithDiagnostic(src_elem, res_elem, context);
107
108 if (isColumnConst(*src_elem.column) && !isColumnConst(*res_elem.column))
109 converted = converted->convertToFullColumnIfConst();
110
111 res_elem.column = std::move(converted);
112 }
113 return res;
114}
115
116}
117