#include "duckdb_miniparquet.hpp"

#include "miniparquet.h"
#include "duckdb/function/table_function.hpp"
#include "duckdb/parser/parsed_data/create_table_function_info.hpp"
#include "duckdb/main/client_context.hpp"
#include "duckdb/common/types/date.hpp"
#include "duckdb/common/types/timestamp.hpp"
#include "utf8proc_wrapper.hpp"
#include "duckdb.hpp"

using namespace duckdb;
using namespace miniparquet;

struct ParquetScanFunctionData : public TableFunctionData {
	ParquetScanFunctionData(string filename)
	    : position(0), pf(filename) {
	}
	vector<SQLType> sql_types;
	idx_t position;
	ParquetFile pf;
	ResultChunk rc;
	ScanState s;
};

struct ParquetScanFunction : public TableFunction {
	ParquetScanFunction()
	    : TableFunction("parquet_scan", {SQLType::VARCHAR}, parquet_scan_bind, parquet_scan_function, nullptr) {}

	static unique_ptr<FunctionData> parquet_scan_bind(ClientContext &context, vector<Value> inputs,
	                                                  vector<SQLType> &return_types, vector<string> &names) {

		auto file_name = inputs[0].GetValue<string>();
		auto res = make_unique<ParquetScanFunctionData>(file_name);
		res->pf.initialize_result(res->rc);

		// map each Parquet physical type onto a DuckDB SQL type
		for (auto &col : res->pf.columns) {
			names.push_back(col->name);
			SQLType type;
			switch (col->type) {
			case parquet::format::Type::BOOLEAN:
				type = SQLType::BOOLEAN;
				break;
			case parquet::format::Type::INT32:
				type = SQLType::INTEGER;
				break;
			case parquet::format::Type::INT64:
				type = SQLType::BIGINT;
				break;
			case parquet::format::Type::INT96: // the legacy Impala/Hive timestamp encoding
				type = SQLType::TIMESTAMP;
				break;
			case parquet::format::Type::FLOAT:
				type = SQLType::FLOAT;
				break;
			case parquet::format::Type::DOUBLE:
				type = SQLType::DOUBLE;
				break;
			// TODO: FIXED_LEN_BYTE_ARRAY (used for e.g. DECIMAL values) is not supported yet
			case parquet::format::Type::BYTE_ARRAY:
				type = SQLType::VARCHAR;
				break;
			default:
				throw NotImplementedException("Unsupported Parquet type");
			}
			return_types.push_back(type);
			res->sql_types.push_back(type);
		}

		return std::move(res);
	}
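
	// Illustration (not part of the original file): for a Parquet file whose
	// schema is (id INT64, name BYTE_ARRAY), the bind above produces
	// names = {"id", "name"} and return_types = {BIGINT, VARCHAR}; the same
	// SQL types are cached in sql_types for the scan function to dispatch on.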

	template <class T>
	static void scan_parquet_column(ResultColumn &parquet_col, idx_t count, idx_t offset, Vector &out) {
		auto src_ptr = (T *)parquet_col.data.ptr;
		auto tgt_ptr = FlatVector::GetData<T>(out);

		for (idx_t row = 0; row < count; row++) {
			tgt_ptr[row] = src_ptr[row + offset];
		}
	}
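
	// For instance, the INTEGER case below boils down to
	//   scan_parquet_column<int32_t>(col, this_count, data.position, output.data[col_idx]);
	// i.e. a plain element-wise copy that serves every fixed-width Parquet type.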

	// Parquet INT96 timestamps (the Impala/Hive convention) store 8 bytes of
	// nanoseconds-within-day followed by a 4-byte Julian day number.
	static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
	static constexpr int64_t kMillisecondsInADay = 86400000LL;
	static constexpr int64_t kNanosecondsInADay = kMillisecondsInADay * 1000LL * 1000LL;

	static int64_t impala_timestamp_to_nanoseconds(const Int96 &impala_timestamp) {
		int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
		int64_t nanoseconds = *(reinterpret_cast<const int64_t *>(&(impala_timestamp.value)));
		return days_since_epoch * kNanosecondsInADay + nanoseconds;
	}
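
	// Worked example: an Int96 whose value[2] holds Julian day 2440589
	// (1970-01-02) and whose first 8 bytes are zero converts to
	// 1 * kNanosecondsInADay = 86,400,000,000,000 ns since the Unix epoch.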
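	// Emits at most STANDARD_VECTOR_SIZE rows per call; DuckDB keeps calling
	// until an empty chunk comes back. When the buffered ResultChunk is
	// exhausted, scan() pulls the next batch of rows from the file.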
	static void parquet_scan_function(ClientContext &context, vector<Value> &input, DataChunk &output,
	                                  FunctionData *dataptr) {
		auto &data = *((ParquetScanFunctionData *)dataptr);

		// fetch a new chunk from the file once the current one is used up
		if (data.position >= data.rc.nrows) {
			if (!data.pf.scan(data.s, data.rc)) {
				return;
			}
			data.position = 0;
		}
		idx_t this_count = std::min((idx_t)STANDARD_VECTOR_SIZE, data.rc.nrows - data.position);
		assert(this_count > 0);
		output.SetCardinality(this_count);

		for (idx_t col_idx = 0; col_idx < output.column_count(); col_idx++) {
			auto &col = data.rc.cols[col_idx];

			// set the NULL mask from the column's definition levels
			for (idx_t row = 0; row < this_count; row++) {
				FlatVector::SetNull(output.data[col_idx], row, !((uint8_t *)col.defined.ptr)[row + data.position]);
			}

			switch (data.sql_types[col_idx].id) {
			case SQLTypeId::BOOLEAN:
				scan_parquet_column<bool>(col, this_count, data.position, output.data[col_idx]);
				break;
			case SQLTypeId::INTEGER:
				scan_parquet_column<int32_t>(col, this_count, data.position, output.data[col_idx]);
				break;
			case SQLTypeId::BIGINT:
				scan_parquet_column<int64_t>(col, this_count, data.position, output.data[col_idx]);
				break;
			case SQLTypeId::FLOAT:
				scan_parquet_column<float>(col, this_count, data.position, output.data[col_idx]);
				break;
			case SQLTypeId::DOUBLE:
				scan_parquet_column<double>(col, this_count, data.position, output.data[col_idx]);
				break;
			case SQLTypeId::TIMESTAMP: {
				auto tgt_ptr = (timestamp_t *)FlatVector::GetData(output.data[col_idx]);
				for (idx_t row = 0; row < this_count; row++) {
					auto impala_ns = impala_timestamp_to_nanoseconds(((Int96 *)col.data.ptr)[row + data.position]);

					auto ms = impala_ns / 1000000; // nanoseconds to milliseconds
					auto ms_per_day = (int64_t)60 * 60 * 24 * 1000;
					date_t date = Date::EpochToDate(ms / 1000);
					dtime_t time = (dtime_t)(ms % ms_per_day);
					tgt_ptr[row] = Timestamp::FromDatetime(date, time);
				}
				break;
			}
			case SQLTypeId::VARCHAR: {
				auto src_ptr = (char **)col.data.ptr;
				auto tgt_ptr = (string_t *)FlatVector::GetData(output.data[col_idx]);

				for (idx_t row = 0; row < this_count; row++) {
					auto val = src_ptr[row + data.position];
					if (!FlatVector::IsNull(output.data[col_idx], row)) {
						auto utf_type = Utf8Proc::Analyze(val);
						if (utf_type == UnicodeType::INVALID) {
							throw Exception("Invalid UTF-8 in Parquet file");
						} else if (utf_type == UnicodeType::ASCII) {
							tgt_ptr[row] = StringVector::AddString(output.data[col_idx], val);
						} else {
							auto val_norm = Utf8Proc::Normalize(val);
							tgt_ptr[row] = StringVector::AddString(output.data[col_idx], val_norm);
							free(val_norm);
						}
					}
				}
				break;
			}
			default:
				throw NotImplementedException("Unsupported type " + SQLTypeToString(data.sql_types[col_idx]));
			}
		}
		data.position += this_count;
	}
};

void Parquet::Init(DuckDB &db) {
	ParquetScanFunction scan_fun;
	CreateTableFunctionInfo info(scan_fun);

	// register the table function in the catalog within its own transaction
	Connection conn(db);
	conn.context->transaction.BeginTransaction();
	conn.context->catalog.CreateTableFunction(*conn.context, &info);
	conn.context->transaction.Commit();
}
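
// Usage sketch (assumes the standard DuckDB C++ client API of this vintage;
// the file name is hypothetical):
//
//   DuckDB db(nullptr); // in-memory database
//   Parquet::Init(db);  // registers the parquet_scan table function
//   Connection conn(db);
//   auto res = conn.Query("SELECT COUNT(*) FROM parquet_scan('some.parquet')");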