1#include "ODBCBlockInputStream.h"
2#include <vector>
3#include <Columns/ColumnNullable.h>
4#include <Columns/ColumnString.h>
5#include <Columns/ColumnsNumber.h>
6#include <Common/assert_cast.h>
7#include <IO/ReadHelpers.h>
8#include <IO/WriteHelpers.h>
9#include <common/logger_useful.h>
10#include <ext/range.h>
11
12
13namespace DB
14{
15namespace ErrorCodes
16{
17 extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
18}
19
20
21ODBCBlockInputStream::ODBCBlockInputStream(
22 Poco::Data::Session && session_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_)
23 : session{session_}
24 , statement{(this->session << query_str, Poco::Data::Keywords::now)}
25 , result{statement}
26 , iterator{result.begin()}
27 , max_block_size{max_block_size_}
28 , log(&Logger::get("ODBCBlockInputStream"))
29{
30 if (sample_block.columns() != result.columnCount())
31 throw Exception{"RecordSet contains " + toString(result.columnCount()) + " columns while " + toString(sample_block.columns())
32 + " expected",
33 ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
34
35 description.init(sample_block);
36}
37
38
39namespace
40{
41 using ValueType = ExternalResultDescription::ValueType;
42
43 static void insertValue(IColumn & column, const ValueType type, const Poco::Dynamic::Var & value)
44 {
45 switch (type)
46 {
47 case ValueType::vtUInt8:
48 assert_cast<ColumnUInt8 &>(column).insertValue(value.convert<UInt64>());
49 break;
50 case ValueType::vtUInt16:
51 assert_cast<ColumnUInt16 &>(column).insertValue(value.convert<UInt64>());
52 break;
53 case ValueType::vtUInt32:
54 assert_cast<ColumnUInt32 &>(column).insertValue(value.convert<UInt64>());
55 break;
56 case ValueType::vtUInt64:
57 assert_cast<ColumnUInt64 &>(column).insertValue(value.convert<UInt64>());
58 break;
59 case ValueType::vtInt8:
60 assert_cast<ColumnInt8 &>(column).insertValue(value.convert<Int64>());
61 break;
62 case ValueType::vtInt16:
63 assert_cast<ColumnInt16 &>(column).insertValue(value.convert<Int64>());
64 break;
65 case ValueType::vtInt32:
66 assert_cast<ColumnInt32 &>(column).insertValue(value.convert<Int64>());
67 break;
68 case ValueType::vtInt64:
69 assert_cast<ColumnInt64 &>(column).insertValue(value.convert<Int64>());
70 break;
71 case ValueType::vtFloat32:
72 assert_cast<ColumnFloat32 &>(column).insertValue(value.convert<Float64>());
73 break;
74 case ValueType::vtFloat64:
75 assert_cast<ColumnFloat64 &>(column).insertValue(value.convert<Float64>());
76 break;
77 case ValueType::vtString:
78 assert_cast<ColumnString &>(column).insert(value.convert<String>());
79 break;
80 case ValueType::vtDate:
81 assert_cast<ColumnUInt16 &>(column).insertValue(UInt16{LocalDate{value.convert<String>()}.getDayNum()});
82 break;
83 case ValueType::vtDateTime:
84 assert_cast<ColumnUInt32 &>(column).insertValue(time_t{LocalDateTime{value.convert<String>()}});
85 break;
86 case ValueType::vtUUID:
87 assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.convert<std::string>()));
88 break;
89 }
90 }
91
92 void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); }
93}
94
95
96Block ODBCBlockInputStream::readImpl()
97{
98 if (iterator == result.end())
99 return {};
100
101 MutableColumns columns(description.sample_block.columns());
102 for (const auto i : ext::range(0, columns.size()))
103 columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
104
105 size_t num_rows = 0;
106 while (iterator != result.end())
107 {
108 Poco::Data::Row & row = *iterator;
109
110 for (const auto idx : ext::range(0, row.fieldCount()))
111 {
112 const Poco::Dynamic::Var & value = row[idx];
113
114 if (!value.isEmpty())
115 {
116 if (description.types[idx].second)
117 {
118 ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
119 insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value);
120 column_nullable.getNullMapData().emplace_back(0);
121 }
122 else
123 insertValue(*columns[idx], description.types[idx].first, value);
124 }
125 else
126 insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
127 }
128
129 ++iterator;
130
131 ++num_rows;
132 if (num_rows == max_block_size)
133 break;
134 }
135
136 return description.sample_block.cloneWithColumns(std::move(columns));
137}
138
139}
140