1#include <DataStreams/ExpressionBlockInputStream.h>
2#include <DataStreams/CheckConstraintsBlockOutputStream.h>
3#include <Parsers/formatAST.h>
4#include <Columns/ColumnsCommon.h>
5#include <Columns/ColumnsNumber.h>
6#include <Common/assert_cast.h>
7#include <Common/quoteString.h>
8#include <Common/FieldVisitors.h>
9
10
11namespace DB
12{
13
14namespace ErrorCodes
15{
16 extern const int VIOLATED_CONSTRAINT;
17}
18
19
20CheckConstraintsBlockOutputStream::CheckConstraintsBlockOutputStream(
21 const String & table_,
22 const BlockOutputStreamPtr & output_,
23 const Block & header_,
24 const ConstraintsDescription & constraints_,
25 const Context & context_)
26 : table(table_),
27 output(output_),
28 header(header_),
29 constraints(constraints_),
30 expressions(constraints_.getExpressions(context_, header.getNamesAndTypesList()))
31{
32}
33
34
35void CheckConstraintsBlockOutputStream::write(const Block & block)
36{
37 if (block.rows() > 0)
38 {
39 Block block_to_calculate = block;
40 for (size_t i = 0; i < expressions.size(); ++i)
41 {
42 auto constraint_expr = expressions[i];
43
44 constraint_expr->execute(block_to_calculate);
45 ColumnWithTypeAndName res_column = block_to_calculate.getByPosition(block_to_calculate.columns() - 1);
46 const ColumnUInt8 & res_column_uint8 = assert_cast<const ColumnUInt8 &>(*res_column.column);
47
48 const UInt8 * data = res_column_uint8.getData().data();
49 size_t size = res_column_uint8.size();
50
51 /// Is violated.
52 if (!memoryIsByte(data, size, 1))
53 {
54 size_t row_idx = 0;
55 for (; row_idx < size; ++row_idx)
56 if (data[row_idx] != 1)
57 break;
58
59 Names related_columns = constraint_expr->getRequiredColumns();
60
61 std::stringstream exception_message;
62
63 exception_message << "Constraint " << backQuote(constraints.constraints[i]->name)
64 << " for table " << backQuote(table)
65 << " is violated at row " << (rows_written + row_idx + 1)
66 << ". Expression: (" << serializeAST(*(constraints.constraints[i]->expr), true) << ")"
67 << ". Column values";
68
69 bool first = true;
70 for (const auto & name : related_columns)
71 {
72 const IColumn & column = *block.getByName(name).column;
73 assert(row_idx < column.size());
74
75 exception_message << (first ? ": " : ", ")
76 << backQuoteIfNeed(name) << " = " << applyVisitor(FieldVisitorToString(), column[row_idx]);
77
78 first = false;
79 }
80
81 throw Exception{exception_message.str(), ErrorCodes::VIOLATED_CONSTRAINT};
82 }
83 }
84 }
85
86 output->write(block);
87 rows_written += block.rows();
88}
89
90void CheckConstraintsBlockOutputStream::flush()
91{
92 output->flush();
93}
94
95void CheckConstraintsBlockOutputStream::writePrefix()
96{
97 output->writePrefix();
98}
99
100void CheckConstraintsBlockOutputStream::writeSuffix()
101{
102 output->writeSuffix();
103}
104
105}
106