1#pragma once
2
3#include <string>
4#include <Columns/IColumn.h>
5#include <Processors/Formats/IInputFormat.h>
6#include <DataStreams/SizeLimits.h>
7#include <Poco/Timespan.h>
8#include <Common/Stopwatch.h>
9
10
11namespace DB
12{
13
/// Contains extra information about read data.
/// An instance is passed by reference to IRowInputFormat::readRow() as its `extra` argument.
struct RowReadExtension
{
 /// IRowInputFormat::readRow() output. It contains non zero for columns that were actually read from the source and zero otherwise.
 /// It's used to attach defaults for partially filled rows.
 /// NOTE(review): presumably indexed by column position in the header - confirm against the readRow() implementations.
 std::vector<UInt8> read_columns;
};
21
22/// Common parameters for generating blocks.
23struct RowInputFormatParams
24{
25 size_t max_block_size;
26
27 UInt64 allow_errors_num;
28 Float64 allow_errors_ratio;
29
30 using ReadCallback = std::function<void()>;
31 ReadCallback callback;
32
33 Poco::Timespan max_execution_time = 0;
34 OverflowMode timeout_overflow_mode = OverflowMode::THROW;
35};
36
/// Declared here, defined elsewhere.
/// NOTE(review): presumably tells whether the given error code denotes a data parsing error
/// (as opposed to e.g. an I/O failure) - confirm in the definition.
bool isParseError(int code);
/// Declared here, defined elsewhere.
/// NOTE(review): presumably compares the time measured by `stopwatch` with params.max_execution_time and reacts
/// according to params.timeout_overflow_mode; the meaning of the returned bool must be confirmed in the definition.
bool checkTimeLimit(const RowInputFormatParams & params, const Stopwatch & stopwatch);
39
40///Row oriented input format: reads data row by row.
41class IRowInputFormat : public IInputFormat
42{
43public:
44 using Params = RowInputFormatParams;
45
46 IRowInputFormat(
47 Block header,
48 ReadBuffer & in_,
49 Params params_)
50 : IInputFormat(std::move(header), in_), params(params_)
51 {
52 }
53
54 Chunk generate() override;
55
56 void resetParser() override;
57
58protected:
59 /** Read next row and append it to the columns.
60 * If no more rows - return false.
61 */
62 virtual bool readRow(MutableColumns & columns, RowReadExtension & extra) = 0;
63
64 virtual void readPrefix() {} /// delimiter before begin of result
65 virtual void readSuffix() {} /// delimiter after end of result
66
67 /// Skip data until next row.
68 /// This is intended for text streams, that allow skipping of errors.
69 /// By default - throws not implemented exception.
70 virtual bool allowSyncAfterError() const { return false; }
71 virtual void syncAfterError();
72
73 /// In case of parse error, try to roll back and parse last one or two rows very carefully
74 /// and collect as much as possible diagnostic information about error.
75 /// If not implemented, returns empty string.
76 virtual std::string getDiagnosticInfo() { return {}; }
77
78 const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
79
80private:
81 Params params;
82
83 size_t total_rows = 0;
84 size_t num_errors = 0;
85
86 BlockMissingValues block_missing_values;
87};
88
89}
90