1#include "config_core.h"
2#if USE_MYSQL
3
4#include <vector>
5#include <Columns/ColumnNullable.h>
6#include <Columns/ColumnString.h>
7#include <Columns/ColumnsNumber.h>
8#include <Common/assert_cast.h>
9#include <IO/ReadHelpers.h>
10#include <IO/WriteHelpers.h>
11#include <ext/range.h>
12#include "MySQLBlockInputStream.h"
13
14
15namespace DB
16{
17namespace ErrorCodes
18{
19 extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
20}
21
22
23MySQLBlockInputStream::MySQLBlockInputStream(
24 const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_, const bool auto_close_)
25 : entry{entry_}, query{this->entry->query(query_str)}, result{query.use()}, max_block_size{max_block_size_}, auto_close{auto_close_}
26{
27 if (sample_block.columns() != result.getNumFields())
28 throw Exception{"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while "
29 + toString(sample_block.columns()) + " expected",
30 ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
31
32 description.init(sample_block);
33}
34
35
36namespace
37{
38 using ValueType = ExternalResultDescription::ValueType;
39
40 void insertValue(IColumn & column, const ValueType type, const mysqlxx::Value & value)
41 {
42 switch (type)
43 {
44 case ValueType::vtUInt8:
45 assert_cast<ColumnUInt8 &>(column).insertValue(value.getUInt());
46 break;
47 case ValueType::vtUInt16:
48 assert_cast<ColumnUInt16 &>(column).insertValue(value.getUInt());
49 break;
50 case ValueType::vtUInt32:
51 assert_cast<ColumnUInt32 &>(column).insertValue(value.getUInt());
52 break;
53 case ValueType::vtUInt64:
54 assert_cast<ColumnUInt64 &>(column).insertValue(value.getUInt());
55 break;
56 case ValueType::vtInt8:
57 assert_cast<ColumnInt8 &>(column).insertValue(value.getInt());
58 break;
59 case ValueType::vtInt16:
60 assert_cast<ColumnInt16 &>(column).insertValue(value.getInt());
61 break;
62 case ValueType::vtInt32:
63 assert_cast<ColumnInt32 &>(column).insertValue(value.getInt());
64 break;
65 case ValueType::vtInt64:
66 assert_cast<ColumnInt64 &>(column).insertValue(value.getInt());
67 break;
68 case ValueType::vtFloat32:
69 assert_cast<ColumnFloat32 &>(column).insertValue(value.getDouble());
70 break;
71 case ValueType::vtFloat64:
72 assert_cast<ColumnFloat64 &>(column).insertValue(value.getDouble());
73 break;
74 case ValueType::vtString:
75 assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
76 break;
77 case ValueType::vtDate:
78 assert_cast<ColumnUInt16 &>(column).insertValue(UInt16(value.getDate().getDayNum()));
79 break;
80 case ValueType::vtDateTime:
81 assert_cast<ColumnUInt32 &>(column).insertValue(UInt32(value.getDateTime()));
82 break;
83 case ValueType::vtUUID:
84 assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
85 break;
86 }
87 }
88
89 void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); }
90}
91
92
93Block MySQLBlockInputStream::readImpl()
94{
95 auto row = result.fetch();
96 if (!row)
97 {
98 if (auto_close)
99 entry.disconnect();
100 return {};
101 }
102
103 MutableColumns columns(description.sample_block.columns());
104 for (const auto i : ext::range(0, columns.size()))
105 columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty();
106
107 size_t num_rows = 0;
108 while (row)
109 {
110 for (const auto idx : ext::range(0, row.size()))
111 {
112 const auto value = row[idx];
113 if (!value.isNull())
114 {
115 if (description.types[idx].second)
116 {
117 ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
118 insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value);
119 column_nullable.getNullMapData().emplace_back(0);
120 }
121 else
122 insertValue(*columns[idx], description.types[idx].first, value);
123 }
124 else
125 insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
126 }
127
128 ++num_rows;
129 if (num_rows == max_block_size)
130 break;
131
132 row = result.fetch();
133 }
134 return description.sample_block.cloneWithColumns(std::move(columns));
135}
136
137}
138
139#endif
140