1 | #include "duckdb/execution/operator/persistent/physical_copy_to_file.hpp" |
2 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
3 | |
4 | #include <algorithm> |
5 | #include <fstream> |
6 | |
7 | using namespace duckdb; |
8 | using namespace std; |
9 | |
10 | class BufferedWriter { |
11 | constexpr static idx_t BUFFER_SIZE = 4096 * 4; |
12 | |
13 | public: |
14 | BufferedWriter(string &path) : pos(0) { |
15 | to_csv.open(path); |
16 | if (to_csv.fail()) { |
17 | throw IOException("Could not open CSV file" ); |
18 | } |
19 | } |
20 | |
21 | void Flush() { |
22 | if (pos > 0) { |
23 | to_csv.write(buffer, pos); |
24 | pos = 0; |
25 | } |
26 | } |
27 | |
28 | void Close() { |
29 | Flush(); |
30 | to_csv.close(); |
31 | } |
32 | |
33 | void Write(const char *buf, idx_t len) { |
34 | if (len >= BUFFER_SIZE) { |
35 | Flush(); |
36 | to_csv.write(buf, len); |
37 | return; |
38 | } |
39 | if (pos + len > BUFFER_SIZE) { |
40 | Flush(); |
41 | } |
42 | memcpy(buffer + pos, buf, len); |
43 | pos += len; |
44 | } |
45 | |
46 | void Write(string &value) { |
47 | Write(value.c_str(), value.size()); |
48 | } |
49 | |
50 | private: |
51 | char buffer[BUFFER_SIZE]; |
52 | idx_t pos = 0; |
53 | |
54 | ofstream to_csv; |
55 | }; |
56 | |
// Returns a copy of "val" in which every occurrence of "to_be_escaped" is
// preceded by "escape". Occurrences are matched left to right and do not
// overlap. When "to_be_escaped" is empty there is nothing to look for and
// "val" is returned unchanged.
std::string AddEscapes(std::string &to_be_escaped, std::string escape, std::string val) {
	if (to_be_escaped.empty()) {
		// an empty needle matches at every position, which would never terminate
		// together with an empty escape
		return val;
	}

	std::string new_val;
	new_val.reserve(val.length());

	// everything before "copied" has already been appended to new_val
	std::string::size_type copied = 0;
	std::string::size_type found = val.find(to_be_escaped);
	while (found != std::string::npos) {
		// text between the previous match and this one, then the escape; the
		// matched text itself is copied together with the following segment
		new_val.append(val, copied, found - copied);
		new_val += escape;
		copied = found;
		// continue right after the matched text: stepping by the length of the
		// escape instead would skip adjacent matches when the escape is longer,
		// and would never advance when the escape is empty
		found = val.find(to_be_escaped, found + to_be_escaped.length());
	}
	new_val.append(val, copied, std::string::npos);
	return new_val;
}
76 | |
77 | static void WriteQuotedString(BufferedWriter &writer, string_t str_value, string &delimiter, string "e, |
78 | string &escape, string &null_str, bool write_quoted) { |
79 | // used for adding escapes |
80 | bool add_escapes = false; |
81 | auto str_data = str_value.GetData(); |
82 | string new_val(str_data, str_value.GetSize()); |
83 | |
84 | // check for \n, \r, \n\r in string |
85 | if (!write_quoted) { |
86 | for (idx_t i = 0; i < str_value.GetSize(); i++) { |
87 | if (str_data[i] == '\n' || str_data[i] == '\r') { |
88 | // newline, write a quoted string |
89 | write_quoted = true; |
90 | } |
91 | } |
92 | } |
93 | |
94 | // check if value is null string |
95 | if (!write_quoted) { |
96 | if (new_val == null_str) { |
97 | write_quoted = true; |
98 | } |
99 | } |
100 | |
101 | // check for delimiter |
102 | if (!write_quoted) { |
103 | if (new_val.find(delimiter) != string::npos) { |
104 | write_quoted = true; |
105 | } |
106 | } |
107 | |
108 | // check for quote |
109 | if (new_val.find(quote) != string::npos) { |
110 | write_quoted = true; |
111 | add_escapes = true; |
112 | } |
113 | |
114 | // check for escapes in quoted string |
115 | if (write_quoted && !add_escapes) { |
116 | if (new_val.find(escape) != string::npos) { |
117 | add_escapes = true; |
118 | } |
119 | } |
120 | |
121 | if (add_escapes) { |
122 | new_val = AddEscapes(escape, escape, new_val); |
123 | // also escape quotes |
124 | if (escape != quote) { |
125 | new_val = AddEscapes(quote, escape, new_val); |
126 | } |
127 | } |
128 | |
129 | if (!write_quoted) { |
130 | writer.Write(new_val); |
131 | } else { |
132 | writer.Write(quote); |
133 | writer.Write(new_val); |
134 | writer.Write(quote); |
135 | } |
136 | } |
137 | |
138 | void PhysicalCopyToFile::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state) { |
139 | auto &info = *this->info; |
140 | idx_t total = 0; |
141 | |
142 | string newline = "\n" ; |
143 | BufferedWriter writer(info.file_path); |
144 | if (info.header) { |
145 | // write the header line |
146 | for (idx_t i = 0; i < names.size(); i++) { |
147 | if (i != 0) { |
148 | writer.Write(info.delimiter); |
149 | } |
150 | WriteQuotedString(writer, names[i].c_str(), info.delimiter, info.quote, info.escape, info.null_str, false); |
151 | } |
152 | writer.Write(newline); |
153 | } |
154 | // create a chunk with VARCHAR columns |
155 | vector<TypeId> types; |
156 | for (idx_t col_idx = 0; col_idx < state->child_chunk.column_count(); col_idx++) { |
157 | types.push_back(TypeId::VARCHAR); |
158 | } |
159 | DataChunk cast_chunk; |
160 | cast_chunk.Initialize(types); |
161 | |
162 | while (true) { |
163 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
164 | if (state->child_chunk.size() == 0) { |
165 | break; |
166 | } |
167 | // cast the columns of the chunk to varchar |
168 | cast_chunk.SetCardinality(state->child_chunk); |
169 | for (idx_t col_idx = 0; col_idx < state->child_chunk.column_count(); col_idx++) { |
170 | if (sql_types[col_idx].id == SQLTypeId::VARCHAR || sql_types[col_idx].id == SQLTypeId::BLOB) { |
171 | // VARCHAR, just create a reference |
172 | cast_chunk.data[col_idx].Reference(state->child_chunk.data[col_idx]); |
173 | } else { |
174 | // non varchar column, perform the cast |
175 | VectorOperations::Cast(state->child_chunk.data[col_idx], cast_chunk.data[col_idx], sql_types[col_idx], |
176 | SQLType::VARCHAR, cast_chunk.size()); |
177 | } |
178 | } |
179 | cast_chunk.Normalify(); |
180 | // now loop over the vectors and output the values |
181 | for (idx_t i = 0; i < cast_chunk.size(); i++) { |
182 | // write values |
183 | for (idx_t col_idx = 0; col_idx < state->child_chunk.column_count(); col_idx++) { |
184 | if (col_idx != 0) { |
185 | writer.Write(info.delimiter); |
186 | } |
187 | if (FlatVector::IsNull(cast_chunk.data[col_idx], i)) { |
188 | // write null value |
189 | writer.Write(info.null_str); |
190 | continue; |
191 | } |
192 | |
193 | // non-null value, fetch the string value from the cast chunk |
194 | auto str_data = FlatVector::GetData<string_t>(cast_chunk.data[col_idx]); |
195 | auto str_value = str_data[i]; |
196 | WriteQuotedString(writer, str_value, info.delimiter, info.quote, info.escape, info.null_str, |
197 | info.force_quote[col_idx]); |
198 | } |
199 | writer.Write(newline); |
200 | } |
201 | total += cast_chunk.size(); |
202 | } |
203 | writer.Close(); |
204 | |
205 | chunk.SetCardinality(1); |
206 | chunk.SetValue(0, 0, Value::BIGINT(total)); |
207 | |
208 | state->finished = true; |
209 | } |
210 | |