1#include <Processors/Transforms/ConvertingTransform.h>
2
3#include <Interpreters/castColumn.h>
4#include <Columns/ColumnConst.h>
5#include <Parsers/IAST.h>
6#include <Common/typeid_cast.h>
7#include <Common/quoteString.h>
8
9namespace DB
10{
11
12namespace ErrorCodes
13{
14 extern const int THERE_IS_NO_COLUMN;
15 extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
16 extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
17}
18
19static ColumnPtr castColumnWithDiagnostic(
20 const ColumnWithTypeAndName & src_elem,
21 const ColumnWithTypeAndName & res_elem,
22 const Context & context)
23{
24 try
25 {
26 return castColumn(src_elem, res_elem.type, context);
27 }
28 catch (Exception & e)
29 {
30 e.addMessage("while converting source column " + backQuoteIfNeed(src_elem.name) +
31 " to destination column " + backQuoteIfNeed(res_elem.name));
32 throw;
33 }
34}
35
36ConvertingTransform::ConvertingTransform(
37 Block source_header_,
38 Block result_header_,
39 MatchColumnsMode mode_,
40 const Context & context_)
41 : ISimpleTransform(std::move(source_header_), std::move(result_header_), false)
42 , context(context_)
43 , conversion(getOutputPort().getHeader().columns())
44{
45 auto & source = getInputPort().getHeader();
46 auto & result = getOutputPort().getHeader();
47
48 size_t num_input_columns = source.columns();
49 size_t num_result_columns = result.columns();
50
51 if (mode_ == MatchColumnsMode::Position && num_input_columns != num_result_columns)
52 throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
53
54 for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num)
55 {
56 const auto & res_elem = result.getByPosition(result_col_num);
57
58 switch (mode_)
59 {
60 case MatchColumnsMode::Position:
61 conversion[result_col_num] = result_col_num;
62 break;
63
64 case MatchColumnsMode::Name:
65 if (source.has(res_elem.name))
66 conversion[result_col_num] = source.getPositionByName(res_elem.name);
67 else
68 throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream",
69 ErrorCodes::THERE_IS_NO_COLUMN);
70 break;
71 }
72
73 const auto & src_elem = source.getByPosition(conversion[result_col_num]);
74
75 /// Check constants.
76
77 if (auto * res_const = typeid_cast<const ColumnConst *>(res_elem.column.get()))
78 {
79 if (auto * src_const = typeid_cast<const ColumnConst *>(src_elem.column.get()))
80 {
81 if (res_const->getField() != src_const->getField())
82 throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + " because "
83 "it is constant but values of constants are different in source and result",
84 ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
85 }
86 else
87 throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name) + " because "
88 "it is non constant in source stream but must be constant in result",
89 ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
90 }
91
92 /// Check conversion by dry run CAST function.
93
94 castColumnWithDiagnostic(src_elem, res_elem, context);
95 }
96}
97
98void ConvertingTransform::transform(Chunk & chunk)
99{
100 auto & source = getInputPort().getHeader();
101 auto & result = getOutputPort().getHeader();
102
103 auto num_rows = chunk.getNumRows();
104 auto src_columns = chunk.detachColumns();
105
106 size_t num_res_columns = conversion.size();
107
108 Columns res_columns;
109 res_columns.reserve(num_res_columns);
110
111 for (size_t res_pos = 0; res_pos < num_res_columns; ++res_pos)
112 {
113 auto src_elem = source.getByPosition(conversion[res_pos]);
114 src_elem.column = src_columns[conversion[res_pos]];
115 auto res_elem = result.getByPosition(res_pos);
116
117 ColumnPtr converted = castColumnWithDiagnostic(src_elem, res_elem, context);
118
119 if (!isColumnConst(*res_elem.column))
120 converted = converted->convertToFullColumnIfConst();
121
122 res_columns.emplace_back(std::move(converted));
123 }
124
125 chunk.setColumns(std::move(res_columns), num_rows);
126}
127
128}
129