1#include "duckdb/execution/operator/persistent/physical_copy_to_file.hpp"
2#include "duckdb/common/vector_operations/vector_operations.hpp"
3
4#include <algorithm>
5#include <fstream>
6
7using namespace duckdb;
8using namespace std;
9
10class BufferedWriter {
11 constexpr static idx_t BUFFER_SIZE = 4096 * 4;
12
13public:
14 BufferedWriter(string &path) : pos(0) {
15 to_csv.open(path);
16 if (to_csv.fail()) {
17 throw IOException("Could not open CSV file");
18 }
19 }
20
21 void Flush() {
22 if (pos > 0) {
23 to_csv.write(buffer, pos);
24 pos = 0;
25 }
26 }
27
28 void Close() {
29 Flush();
30 to_csv.close();
31 }
32
33 void Write(const char *buf, idx_t len) {
34 if (len >= BUFFER_SIZE) {
35 Flush();
36 to_csv.write(buf, len);
37 return;
38 }
39 if (pos + len > BUFFER_SIZE) {
40 Flush();
41 }
42 memcpy(buffer + pos, buf, len);
43 pos += len;
44 }
45
46 void Write(string &value) {
47 Write(value.c_str(), value.size());
48 }
49
50private:
51 char buffer[BUFFER_SIZE];
52 idx_t pos = 0;
53
54 ofstream to_csv;
55};
56
//! Returns a copy of val in which every non-overlapping occurrence of to_be_escaped is preceded by escape.
//! An empty to_be_escaped matches nothing, so val is returned unchanged.
std::string AddEscapes(std::string &to_be_escaped, std::string escape, std::string val) {
	if (to_be_escaped.empty()) {
		// find("") matches at every position, so searching for it could never make progress
		return val;
	}
	std::string result;
	result.reserve(val.size());
	std::string::size_type copied = 0;
	auto found = val.find(to_be_escaped);
	while (found != std::string::npos) {
		// copy everything up to the match, then the escape; the match itself is copied with the next segment
		result.append(val, copied, found - copied);
		result += escape;
		copied = found;
		// resume after the matched text; the escape's length is unrelated to how far to skip in val
		found = val.find(to_be_escaped, found + to_be_escaped.size());
	}
	result.append(val, copied, std::string::npos);
	return result;
}
76
77static void WriteQuotedString(BufferedWriter &writer, string_t str_value, string &delimiter, string &quote,
78 string &escape, string &null_str, bool write_quoted) {
79 // used for adding escapes
80 bool add_escapes = false;
81 auto str_data = str_value.GetData();
82 string new_val(str_data, str_value.GetSize());
83
84 // check for \n, \r, \n\r in string
85 if (!write_quoted) {
86 for (idx_t i = 0; i < str_value.GetSize(); i++) {
87 if (str_data[i] == '\n' || str_data[i] == '\r') {
88 // newline, write a quoted string
89 write_quoted = true;
90 }
91 }
92 }
93
94 // check if value is null string
95 if (!write_quoted) {
96 if (new_val == null_str) {
97 write_quoted = true;
98 }
99 }
100
101 // check for delimiter
102 if (!write_quoted) {
103 if (new_val.find(delimiter) != string::npos) {
104 write_quoted = true;
105 }
106 }
107
108 // check for quote
109 if (new_val.find(quote) != string::npos) {
110 write_quoted = true;
111 add_escapes = true;
112 }
113
114 // check for escapes in quoted string
115 if (write_quoted && !add_escapes) {
116 if (new_val.find(escape) != string::npos) {
117 add_escapes = true;
118 }
119 }
120
121 if (add_escapes) {
122 new_val = AddEscapes(escape, escape, new_val);
123 // also escape quotes
124 if (escape != quote) {
125 new_val = AddEscapes(quote, escape, new_val);
126 }
127 }
128
129 if (!write_quoted) {
130 writer.Write(new_val);
131 } else {
132 writer.Write(quote);
133 writer.Write(new_val);
134 writer.Write(quote);
135 }
136}
137
138void PhysicalCopyToFile::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state) {
139 auto &info = *this->info;
140 idx_t total = 0;
141
142 string newline = "\n";
143 BufferedWriter writer(info.file_path);
144 if (info.header) {
145 // write the header line
146 for (idx_t i = 0; i < names.size(); i++) {
147 if (i != 0) {
148 writer.Write(info.delimiter);
149 }
150 WriteQuotedString(writer, names[i].c_str(), info.delimiter, info.quote, info.escape, info.null_str, false);
151 }
152 writer.Write(newline);
153 }
154 // create a chunk with VARCHAR columns
155 vector<TypeId> types;
156 for (idx_t col_idx = 0; col_idx < state->child_chunk.column_count(); col_idx++) {
157 types.push_back(TypeId::VARCHAR);
158 }
159 DataChunk cast_chunk;
160 cast_chunk.Initialize(types);
161
162 while (true) {
163 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
164 if (state->child_chunk.size() == 0) {
165 break;
166 }
167 // cast the columns of the chunk to varchar
168 cast_chunk.SetCardinality(state->child_chunk);
169 for (idx_t col_idx = 0; col_idx < state->child_chunk.column_count(); col_idx++) {
170 if (sql_types[col_idx].id == SQLTypeId::VARCHAR || sql_types[col_idx].id == SQLTypeId::BLOB) {
171 // VARCHAR, just create a reference
172 cast_chunk.data[col_idx].Reference(state->child_chunk.data[col_idx]);
173 } else {
174 // non varchar column, perform the cast
175 VectorOperations::Cast(state->child_chunk.data[col_idx], cast_chunk.data[col_idx], sql_types[col_idx],
176 SQLType::VARCHAR, cast_chunk.size());
177 }
178 }
179 cast_chunk.Normalify();
180 // now loop over the vectors and output the values
181 for (idx_t i = 0; i < cast_chunk.size(); i++) {
182 // write values
183 for (idx_t col_idx = 0; col_idx < state->child_chunk.column_count(); col_idx++) {
184 if (col_idx != 0) {
185 writer.Write(info.delimiter);
186 }
187 if (FlatVector::IsNull(cast_chunk.data[col_idx], i)) {
188 // write null value
189 writer.Write(info.null_str);
190 continue;
191 }
192
193 // non-null value, fetch the string value from the cast chunk
194 auto str_data = FlatVector::GetData<string_t>(cast_chunk.data[col_idx]);
195 auto str_value = str_data[i];
196 WriteQuotedString(writer, str_value, info.delimiter, info.quote, info.escape, info.null_str,
197 info.force_quote[col_idx]);
198 }
199 writer.Write(newline);
200 }
201 total += cast_chunk.size();
202 }
203 writer.Close();
204
205 chunk.SetCardinality(1);
206 chunk.SetValue(0, 0, Value::BIGINT(total));
207
208 state->finished = true;
209}
210