1#include <Processors/Formats/IRowInputFormat.h>
2#include <IO/WriteHelpers.h> // toString
3#include <common/logger_useful.h>
4
5
6namespace DB
7{
8
/// Error codes referenced in this translation unit (the definitions live elsewhere).
namespace ErrorCodes
{
    /// Codes that isParseError() treats as parse errors.
    extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
    extern const int CANNOT_PARSE_QUOTED_STRING;
    extern const int CANNOT_PARSE_DATE;
    extern const int CANNOT_PARSE_DATETIME;
    extern const int CANNOT_READ_ARRAY_FROM_TEXT;
    extern const int CANNOT_PARSE_NUMBER;
    extern const int CANNOT_PARSE_UUID;
    extern const int TOO_LARGE_STRING_SIZE;

    /// Other codes.
    extern const int INCORRECT_NUMBER_OF_COLUMNS;
    extern const int TIMEOUT_EXCEEDED;
    /// Thrown by the default IRowInputFormat::syncAfterError(); must be declared here to be usable below.
    extern const int NOT_IMPLEMENTED;
}
22
23
24bool isParseError(int code)
25{
26 return code == ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED
27 || code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING
28 || code == ErrorCodes::CANNOT_PARSE_DATE
29 || code == ErrorCodes::CANNOT_PARSE_DATETIME
30 || code == ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT
31 || code == ErrorCodes::CANNOT_PARSE_NUMBER
32 || code == ErrorCodes::CANNOT_PARSE_UUID
33 || code == ErrorCodes::TOO_LARGE_STRING_SIZE;
34}
35
36
37Chunk IRowInputFormat::generate()
38{
39 if (total_rows == 0)
40 readPrefix();
41
42 const Block & header = getPort().getHeader();
43
44 size_t num_columns = header.columns();
45 MutableColumns columns = header.cloneEmptyColumns();
46
47 ///auto chunk_missing_values = std::make_unique<ChunkMissingValues>();
48 block_missing_values.clear();
49
50 try
51 {
52 for (size_t rows = 0; rows < params.max_block_size; ++rows)
53 {
54 try
55 {
56 ++total_rows;
57
58 RowReadExtension info;
59 if (!readRow(columns, info))
60 break;
61 if (params.callback)
62 params.callback();
63
64 for (size_t column_idx = 0; column_idx < info.read_columns.size(); ++column_idx)
65 {
66 if (!info.read_columns[column_idx])
67 {
68 size_t column_size = columns[column_idx]->size();
69 if (column_size == 0)
70 throw Exception("Unexpected empty column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
71 block_missing_values.setBit(column_idx, column_size - 1);
72 }
73 }
74 }
75 catch (Exception & e)
76 {
77 /// Logic for possible skipping of errors.
78
79 if (!isParseError(e.code()))
80 throw;
81
82 if (params.allow_errors_num == 0 && params.allow_errors_ratio == 0)
83 throw;
84
85 ++num_errors;
86 Float64 current_error_ratio = static_cast<Float64>(num_errors) / total_rows;
87
88 if (num_errors > params.allow_errors_num
89 && current_error_ratio > params.allow_errors_ratio)
90 {
91 e.addMessage("(Already have " + toString(num_errors) + " errors"
92 " out of " + toString(total_rows) + " rows"
93 ", which is " + toString(current_error_ratio) + " of all rows)");
94 throw;
95 }
96
97 if (!allowSyncAfterError())
98 {
99 e.addMessage("(Input format doesn't allow to skip errors)");
100 throw;
101 }
102
103 syncAfterError();
104
105 /// Truncate all columns in block to minimal size (remove values, that was appended to only part of columns).
106
107 size_t min_size = std::numeric_limits<size_t>::max();
108 for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
109 min_size = std::min(min_size, columns[column_idx]->size());
110
111 for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
112 {
113 auto & column = columns[column_idx];
114 if (column->size() > min_size)
115 column->popBack(column->size() - min_size);
116 }
117 }
118 }
119 }
120 catch (Exception & e)
121 {
122 if (!isParseError(e.code()))
123 throw;
124
125 String verbose_diagnostic;
126 try
127 {
128 verbose_diagnostic = getDiagnosticInfo();
129 }
130 catch (...)
131 {
132 /// Error while trying to obtain verbose diagnostic. Ok to ignore.
133 }
134
135 e.addMessage("(at row " + toString(total_rows) + ")\n" + verbose_diagnostic);
136 throw;
137 }
138
139 if (columns.empty() || columns[0]->empty())
140 {
141 if (params.allow_errors_num > 0 || params.allow_errors_ratio > 0)
142 {
143 Logger * log = &Logger::get("IRowInputFormat");
144 LOG_TRACE(log, "Skipped " << num_errors << " rows with errors while reading the input stream");
145 }
146
147 readSuffix();
148 return {};
149 }
150
151 auto num_rows = columns.front()->size();
152 Chunk chunk(std::move(columns), num_rows);
153 //chunk.setChunkInfo(std::move(chunk_missing_values));
154 return chunk;
155}
156
157void IRowInputFormat::syncAfterError()
158{
159 throw Exception("Method syncAfterError is not implemented for input format", ErrorCodes::NOT_IMPLEMENTED);
160}
161
162void IRowInputFormat::resetParser()
163{
164 IInputFormat::resetParser();
165 total_rows = 0;
166 num_errors = 0;
167 block_missing_values.clear();
168}
169
170
171}
172