1 | #include <DataStreams/ExpressionBlockInputStream.h> |
2 | #include <DataStreams/CheckConstraintsBlockOutputStream.h> |
3 | #include <Parsers/formatAST.h> |
4 | #include <Columns/ColumnsCommon.h> |
5 | #include <Columns/ColumnsNumber.h> |
6 | #include <Common/assert_cast.h> |
7 | #include <Common/quoteString.h> |
8 | #include <Common/FieldVisitors.h> |
9 | |
10 | |
11 | namespace DB |
12 | { |
13 | |
14 | namespace ErrorCodes |
15 | { |
16 | extern const int VIOLATED_CONSTRAINT; |
17 | } |
18 | |
19 | |
20 | CheckConstraintsBlockOutputStream::CheckConstraintsBlockOutputStream( |
21 | const String & table_, |
22 | const BlockOutputStreamPtr & output_, |
23 | const Block & , |
24 | const ConstraintsDescription & constraints_, |
25 | const Context & context_) |
26 | : table(table_), |
27 | output(output_), |
28 | header(header_), |
29 | constraints(constraints_), |
30 | expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList())) |
31 | { |
32 | } |
33 | |
34 | |
35 | void CheckConstraintsBlockOutputStream::write(const Block & block) |
36 | { |
37 | if (block.rows() > 0) |
38 | { |
39 | Block block_to_calculate = block; |
40 | for (size_t i = 0; i < expressions.size(); ++i) |
41 | { |
42 | auto constraint_expr = expressions[i]; |
43 | |
44 | constraint_expr->execute(block_to_calculate); |
45 | ColumnWithTypeAndName res_column = block_to_calculate.getByPosition(block_to_calculate.columns() - 1); |
46 | const ColumnUInt8 & res_column_uint8 = assert_cast<const ColumnUInt8 &>(*res_column.column); |
47 | |
48 | const UInt8 * data = res_column_uint8.getData().data(); |
49 | size_t size = res_column_uint8.size(); |
50 | |
51 | /// Is violated. |
52 | if (!memoryIsByte(data, size, 1)) |
53 | { |
54 | size_t row_idx = 0; |
55 | for (; row_idx < size; ++row_idx) |
56 | if (data[row_idx] != 1) |
57 | break; |
58 | |
59 | Names related_columns = constraint_expr->getRequiredColumns(); |
60 | |
61 | std::stringstream exception_message; |
62 | |
63 | exception_message << "Constraint " << backQuote(constraints.constraints[i]->name) |
64 | << " for table " << backQuote(table) |
65 | << " is violated at row " << (rows_written + row_idx + 1) |
66 | << ". Expression: (" << serializeAST(*(constraints.constraints[i]->expr), true) << ")" |
67 | << ". Column values" ; |
68 | |
69 | bool first = true; |
70 | for (const auto & name : related_columns) |
71 | { |
72 | const IColumn & column = *block.getByName(name).column; |
73 | assert(row_idx < column.size()); |
74 | |
75 | exception_message << (first ? ": " : ", " ) |
76 | << backQuoteIfNeed(name) << " = " << applyVisitor(FieldVisitorToString(), column[row_idx]); |
77 | |
78 | first = false; |
79 | } |
80 | |
81 | throw Exception{exception_message.str(), ErrorCodes::VIOLATED_CONSTRAINT}; |
82 | } |
83 | } |
84 | } |
85 | |
86 | output->write(block); |
87 | rows_written += block.rows(); |
88 | } |
89 | |
90 | void CheckConstraintsBlockOutputStream::flush() |
91 | { |
92 | output->flush(); |
93 | } |
94 | |
95 | void CheckConstraintsBlockOutputStream::writePrefix() |
96 | { |
97 | output->writePrefix(); |
98 | } |
99 | |
100 | void CheckConstraintsBlockOutputStream::writeSuffix() |
101 | { |
102 | output->writeSuffix(); |
103 | } |
104 | |
105 | } |
106 | |