1 | #include <Processors/Formats/RowInputFormatWithDiagnosticInfo.h> |
2 | #include <Formats/verbosePrintString.h> |
3 | #include <IO/Operators.h> |
4 | #include <IO/WriteBufferFromString.h> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | |
/// Error codes used in this file; only declared here (`extern`), not defined.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
14 | |
15 | DB::RowInputFormatWithDiagnosticInfo::RowInputFormatWithDiagnosticInfo(const Block & , ReadBuffer & in_, const Params & params_) |
16 | : IRowInputFormat(header_, in_, params_) |
17 | { |
18 | } |
19 | |
20 | void DB::RowInputFormatWithDiagnosticInfo::updateDiagnosticInfo() |
21 | { |
22 | ++row_num; |
23 | |
24 | bytes_read_at_start_of_buffer_on_prev_row = bytes_read_at_start_of_buffer_on_current_row; |
25 | bytes_read_at_start_of_buffer_on_current_row = in.count() - in.offset(); |
26 | |
27 | offset_of_prev_row = offset_of_current_row; |
28 | offset_of_current_row = in.offset(); |
29 | } |
30 | |
31 | String DB::RowInputFormatWithDiagnosticInfo::getDiagnosticInfo() |
32 | { |
33 | if (in.eof()) /// Buffer has gone, cannot extract information about what has been parsed. |
34 | return {}; |
35 | |
36 | WriteBufferFromOwnString out; |
37 | |
38 | auto & = getPort().getHeader(); |
39 | MutableColumns columns = header.cloneEmptyColumns(); |
40 | |
41 | /// It is possible to display detailed diagnostics only if the last and next to last rows are still in the read buffer. |
42 | size_t bytes_read_at_start_of_buffer = in.count() - in.offset(); |
43 | if (bytes_read_at_start_of_buffer != bytes_read_at_start_of_buffer_on_prev_row) |
44 | { |
45 | out << "Could not print diagnostic info because two last rows aren't in buffer (rare case)\n" ; |
46 | return out.str(); |
47 | } |
48 | |
49 | max_length_of_column_name = 0; |
50 | for (size_t i = 0; i < header.columns(); ++i) |
51 | if (header.safeGetByPosition(i).name.size() > max_length_of_column_name) |
52 | max_length_of_column_name = header.safeGetByPosition(i).name.size(); |
53 | |
54 | max_length_of_data_type_name = 0; |
55 | for (size_t i = 0; i < header.columns(); ++i) |
56 | if (header.safeGetByPosition(i).type->getName().size() > max_length_of_data_type_name) |
57 | max_length_of_data_type_name = header.safeGetByPosition(i).type->getName().size(); |
58 | |
59 | /// Roll back the cursor to the beginning of the previous or current row and parse all over again. But now we derive detailed information. |
60 | |
61 | if (offset_of_prev_row <= in.buffer().size()) |
62 | { |
63 | in.position() = in.buffer().begin() + offset_of_prev_row; |
64 | |
65 | out << "\nRow " << (row_num - 1) << ":\n" ; |
66 | if (!parseRowAndPrintDiagnosticInfo(columns, out)) |
67 | return out.str(); |
68 | } |
69 | else |
70 | { |
71 | if (in.buffer().size() < offset_of_current_row) |
72 | { |
73 | out << "Could not print diagnostic info because parsing of data hasn't started.\n" ; |
74 | return out.str(); |
75 | } |
76 | |
77 | in.position() = in.buffer().begin() + offset_of_current_row; |
78 | } |
79 | |
80 | out << "\nRow " << row_num << ":\n" ; |
81 | parseRowAndPrintDiagnosticInfo(columns, out); |
82 | out << "\n" ; |
83 | |
84 | return out.str(); |
85 | } |
86 | |
87 | bool RowInputFormatWithDiagnosticInfo::deserializeFieldAndPrintDiagnosticInfo(const String & col_name, |
88 | const DataTypePtr & type, |
89 | IColumn & column, |
90 | WriteBuffer & out, |
91 | size_t file_column) |
92 | { |
93 | out << "Column " << file_column << ", " << std::string((file_column < 10 ? 2 : file_column < 100 ? 1 : 0), ' ') |
94 | << "name: " << alignedName(col_name, max_length_of_column_name) |
95 | << "type: " << alignedName(type->getName(), max_length_of_data_type_name); |
96 | |
97 | auto prev_position = in.position(); |
98 | auto curr_position = in.position(); |
99 | std::exception_ptr exception; |
100 | |
101 | try |
102 | { |
103 | tryDeserializeFiled(type, column, file_column, prev_position, curr_position); |
104 | } |
105 | catch (...) |
106 | { |
107 | exception = std::current_exception(); |
108 | } |
109 | |
110 | if (curr_position < prev_position) |
111 | throw Exception("Logical error: parsing is non-deterministic." , ErrorCodes::LOGICAL_ERROR); |
112 | |
113 | if (isNativeNumber(type) || isDateOrDateTime(type)) |
114 | { |
115 | /// An empty string instead of a value. |
116 | if (curr_position == prev_position) |
117 | { |
118 | out << "ERROR: text " ; |
119 | verbosePrintString(prev_position, std::min(prev_position + 10, in.buffer().end()), out); |
120 | out << " is not like " << type->getName() << "\n" ; |
121 | return false; |
122 | } |
123 | } |
124 | |
125 | out << "parsed text: " ; |
126 | verbosePrintString(prev_position, curr_position, out); |
127 | |
128 | if (exception) |
129 | { |
130 | if (type->getName() == "DateTime" ) |
131 | out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n" ; |
132 | else if (type->getName() == "Date" ) |
133 | out << "ERROR: Date must be in YYYY-MM-DD format.\n" ; |
134 | else |
135 | out << "ERROR\n" ; |
136 | return false; |
137 | } |
138 | |
139 | out << "\n" ; |
140 | |
141 | if (type->haveMaximumSizeOfValue()) |
142 | { |
143 | if (isGarbageAfterField(file_column, curr_position)) |
144 | { |
145 | out << "ERROR: garbage after " << type->getName() << ": " ; |
146 | verbosePrintString(curr_position, std::min(curr_position + 10, in.buffer().end()), out); |
147 | out << "\n" ; |
148 | |
149 | if (type->getName() == "DateTime" ) |
150 | out << "ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss or NNNNNNNNNN (unix timestamp, exactly 10 digits) format.\n" ; |
151 | else if (type->getName() == "Date" ) |
152 | out << "ERROR: Date must be in YYYY-MM-DD format.\n" ; |
153 | |
154 | return false; |
155 | } |
156 | } |
157 | |
158 | return true; |
159 | } |
160 | |
161 | String RowInputFormatWithDiagnosticInfo::alignedName(const String & name, size_t max_length) const |
162 | { |
163 | size_t spaces_count = max_length >= name.size() ? max_length - name.size() : 0; |
164 | return name + ", " + std::string(spaces_count, ' '); |
165 | } |
166 | |
167 | void RowInputFormatWithDiagnosticInfo::resetParser() |
168 | { |
169 | IRowInputFormat::resetParser(); |
170 | row_num = 0; |
171 | bytes_read_at_start_of_buffer_on_current_row = 0; |
172 | bytes_read_at_start_of_buffer_on_prev_row = 0; |
173 | offset_of_current_row = std::numeric_limits<size_t>::max(); |
174 | offset_of_prev_row = std::numeric_limits<size_t>::max(); |
175 | max_length_of_column_name = 0; |
176 | max_length_of_data_type_name = 0; |
177 | } |
178 | |
179 | |
180 | } |
181 | |