1#include <Processors/Formats/RowInputFormatWithDiagnosticInfo.h>
2#include <Formats/verbosePrintString.h>
3#include <IO/Operators.h>
4#include <IO/WriteBufferFromString.h>
5
6
7namespace DB
8{
9
10namespace ErrorCodes
11{
12 extern const int LOGICAL_ERROR;
13}
14
15DB::RowInputFormatWithDiagnosticInfo::RowInputFormatWithDiagnosticInfo(const Block & header_, ReadBuffer & in_, const Params & params_)
16 : IRowInputFormat(header_, in_, params_)
17{
18}
19
20void DB::RowInputFormatWithDiagnosticInfo::updateDiagnosticInfo()
21{
22 ++row_num;
23
24 bytes_read_at_start_of_buffer_on_prev_row = bytes_read_at_start_of_buffer_on_current_row;
25 bytes_read_at_start_of_buffer_on_current_row = in.count() - in.offset();
26
27 offset_of_prev_row = offset_of_current_row;
28 offset_of_current_row = in.offset();
29}
30
31String DB::RowInputFormatWithDiagnosticInfo::getDiagnosticInfo()
32{
33 if (in.eof()) /// Buffer has gone, cannot extract information about what has been parsed.
34 return {};
35
36 WriteBufferFromOwnString out;
37
38 auto & header = getPort().getHeader();
39 MutableColumns columns = header.cloneEmptyColumns();
40
41 /// It is possible to display detailed diagnostics only if the last and next to last rows are still in the read buffer.
42 size_t bytes_read_at_start_of_buffer = in.count() - in.offset();
43 if (bytes_read_at_start_of_buffer != bytes_read_at_start_of_buffer_on_prev_row)
44 {
45 out << "Could not print diagnostic info because two last rows aren't in buffer (rare case)\n";
46 return out.str();
47 }
48
49 max_length_of_column_name = 0;
50 for (size_t i = 0; i < header.columns(); ++i)
51 if (header.safeGetByPosition(i).name.size() > max_length_of_column_name)
52 max_length_of_column_name = header.safeGetByPosition(i).name.size();
53
54 max_length_of_data_type_name = 0;
55 for (size_t i = 0; i < header.columns(); ++i)
56 if (header.safeGetByPosition(i).type->getName().size() > max_length_of_data_type_name)
57 max_length_of_data_type_name = header.safeGetByPosition(i).type->getName().size();
58
59 /// Roll back the cursor to the beginning of the previous or current row and parse all over again. But now we derive detailed information.
60
61 if (offset_of_prev_row <= in.buffer().size())
62 {
63 in.position() = in.buffer().begin() + offset_of_prev_row;
64
65 out << "\nRow " << (row_num - 1) << ":\n";
66 if (!parseRowAndPrintDiagnosticInfo(columns, out))
67 return out.str();
68 }
69 else
70 {
71 if (in.buffer().size() < offset_of_current_row)
72 {
73 out << "Could not print diagnostic info because parsing of data hasn't started.\n";
74 return out.str();
75 }
76
77 in.position() = in.buffer().begin() + offset_of_current_row;
78 }
79
80 out << "\nRow " << row_num << ":\n";
81 parseRowAndPrintDiagnosticInfo(columns, out);
82 out << "\n";
83
84 return out.str();
85}
86
87bool RowInputFormatWithDiagnosticInfo::deserializeFieldAndPrintDiagnosticInfo(const String & col_name,
88 const DataTypePtr & type,
89 IColumn & column,
90 WriteBuffer & out,
91 size_t file_column)
92{
93 out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ')
94 << "name: " << alignedName(col_name, max_length_of_column_name)
95 << "type: " << alignedName(type->getName(), max_length_of_data_type_name);
96
97 auto prev_position = in.position();
98 auto curr_position = in.position();
99 std::exception_ptr exception;
100
101 try
102 {
103 tryDeserializeFiled(type, column, file_column, prev_position, curr_position);
104 }
105 catch (...)
106 {
107 exception = std::current_exception();
108 }
109
110 if (curr_position < prev_position)
111 throw Exception("Logical error: parsing is non-deterministic.", ErrorCodes::LOGICAL_ERROR);
112
113 if (isNativeNumber(type) || isDateOrDateTime(type))
114 {
115 /// An empty string instead of a value.
116 if (curr_position == prev_position)
117 {
118 out << "ERROR: text ";
119 verbosePrintString(prev_position, std::min(prev_position + 10, in.buffer().end()), out);
120 out << " is not like " << type->getName() << "\n";
121 return false;
122 }
123 }
124
125 out << "parsed text: ";
126 verbosePrintString(prev_position, curr_position, out);
127
128 if (exception)
129 {
130 if (type->getName() == "DateTime")
131 out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
132 else if (type->getName() == "Date")
133 out << "ERROR: Date must be in YYYY-MM-DD format.\n";
134 else
135 out << "ERROR\n";
136 return false;
137 }
138
139 out << "\n";
140
141 if (type->haveMaximumSizeOfValue())
142 {
143 if (isGarbageAfterField(file_column, curr_position))
144 {
145 out << "ERROR: garbage after " << type->getName() << ": ";
146 verbosePrintString(curr_position, std::min(curr_position + 10, in.buffer().end()), out);
147 out << "\n";
148
149 if (type->getName() == "DateTime")
150 out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n";
151 else if (type->getName() == "Date")
152 out << "ERROR: Date must be in YYYY-MM-DD format.\n";
153
154 return false;
155 }
156 }
157
158 return true;
159}
160
161String RowInputFormatWithDiagnosticInfo::alignedName(const String & name, size_t max_length) const
162{
163 size_t spaces_count = max_length >= name.size() ? max_length - name.size() : 0;
164 return name + ", " + std::string(spaces_count, ' ');
165}
166
167void RowInputFormatWithDiagnosticInfo::resetParser()
168{
169 IRowInputFormat::resetParser();
170 row_num = 0;
171 bytes_read_at_start_of_buffer_on_current_row = 0;
172 bytes_read_at_start_of_buffer_on_prev_row = 0;
173 offset_of_current_row = std::numeric_limits<size_t>::max();
174 offset_of_prev_row = std::numeric_limits<size_t>::max();
175 max_length_of_column_name = 0;
176 max_length_of_data_type_name = 0;
177}
178
179
180}
181