1#include "duckdb/execution/operator/persistent/csv_file_handle.hpp"
2
3namespace duckdb {
4
5CSVFileHandle::CSVFileHandle(FileSystem &fs, Allocator &allocator, unique_ptr<FileHandle> file_handle_p,
6 const string &path_p, FileCompressionType compression, bool enable_reset)
7 : fs(fs), allocator(allocator), file_handle(std::move(file_handle_p)), path(path_p), compression(compression),
8 reset_enabled(enable_reset) {
9 can_seek = file_handle->CanSeek();
10 on_disk_file = file_handle->OnDiskFile();
11 file_size = file_handle->GetFileSize();
12}
13
14unique_ptr<FileHandle> CSVFileHandle::OpenFileHandle(FileSystem &fs, Allocator &allocator, const string &path,
15 FileCompressionType compression) {
16 auto file_handle = fs.OpenFile(path: path.c_str(), flags: FileFlags::FILE_FLAGS_READ, lock: FileLockType::NO_LOCK, compression);
17 if (file_handle->CanSeek()) {
18 file_handle->Reset();
19 }
20 return file_handle;
21}
22
23unique_ptr<CSVFileHandle> CSVFileHandle::OpenFile(FileSystem &fs, Allocator &allocator, const string &path,
24 FileCompressionType compression, bool enable_reset) {
25 auto file_handle = CSVFileHandle::OpenFileHandle(fs, allocator, path, compression);
26 return make_uniq<CSVFileHandle>(args&: fs, args&: allocator, args: std::move(file_handle), args: path, args&: compression, args&: enable_reset);
27}
28
29bool CSVFileHandle::CanSeek() {
30 return can_seek;
31}
32
33void CSVFileHandle::Seek(idx_t position) {
34 if (!can_seek) {
35 throw InternalException("Cannot seek in this file");
36 }
37 file_handle->Seek(location: position);
38}
39
40idx_t CSVFileHandle::SeekPosition() {
41 if (!can_seek) {
42 throw InternalException("Cannot seek in this file");
43 }
44 return file_handle->SeekPosition();
45}
46
47void CSVFileHandle::Reset() {
48 requested_bytes = 0;
49 read_position = 0;
50 if (can_seek) {
51 // we can seek - reset the file handle
52 file_handle->Reset();
53 } else if (on_disk_file) {
54 // we cannot seek but it is an on-disk file - re-open the file
55 file_handle = CSVFileHandle::OpenFileHandle(fs, allocator, path, compression);
56 } else {
57 if (!reset_enabled) {
58 throw InternalException("Reset called but reset is not enabled for this CSV Handle");
59 }
60 read_position = 0;
61 }
62}
63bool CSVFileHandle::OnDiskFile() {
64 return on_disk_file;
65}
66
67idx_t CSVFileHandle::FileSize() {
68 return file_size;
69}
70
71bool CSVFileHandle::FinishedReading() {
72 return requested_bytes >= file_size;
73}
74
75idx_t CSVFileHandle::Read(void *buffer, idx_t nr_bytes) {
76 requested_bytes += nr_bytes;
77 if (on_disk_file || can_seek) {
78 // if this is a plain file source OR we can seek we are not caching anything
79 return file_handle->Read(buffer, nr_bytes);
80 }
81 // not a plain file source: we need to do some bookkeeping around the reset functionality
82 idx_t result_offset = 0;
83 if (read_position < buffer_size) {
84 // we need to read from our cached buffer
85 auto buffer_read_count = MinValue<idx_t>(a: nr_bytes, b: buffer_size - read_position);
86 memcpy(dest: buffer, src: cached_buffer.get() + read_position, n: buffer_read_count);
87 result_offset += buffer_read_count;
88 read_position += buffer_read_count;
89 if (result_offset == nr_bytes) {
90 return nr_bytes;
91 }
92 } else if (!reset_enabled && cached_buffer.IsSet()) {
93 // reset is disabled, but we still have cached data
94 // we can remove any cached data
95 cached_buffer.Reset();
96 buffer_size = 0;
97 buffer_capacity = 0;
98 read_position = 0;
99 }
100 // we have data left to read from the file
101 // read directly into the buffer
102 auto bytes_read = file_handle->Read(buffer: char_ptr_cast(src: buffer) + result_offset, nr_bytes: nr_bytes - result_offset);
103 file_size = file_handle->GetFileSize();
104 read_position += bytes_read;
105 if (reset_enabled) {
106 // if reset caching is enabled, we need to cache the bytes that we have read
107 if (buffer_size + bytes_read >= buffer_capacity) {
108 // no space; first enlarge the buffer
109 buffer_capacity = MaxValue<idx_t>(a: NextPowerOfTwo(v: buffer_size + bytes_read), b: buffer_capacity * 2);
110
111 auto new_buffer = allocator.Allocate(size: buffer_capacity);
112 if (buffer_size > 0) {
113 memcpy(dest: new_buffer.get(), src: cached_buffer.get(), n: buffer_size);
114 }
115 cached_buffer = std::move(new_buffer);
116 }
117 memcpy(dest: cached_buffer.get() + buffer_size, src: char_ptr_cast(src: buffer) + result_offset, n: bytes_read);
118 buffer_size += bytes_read;
119 }
120
121 return result_offset + bytes_read;
122}
123
124string CSVFileHandle::ReadLine() {
125 bool carriage_return = false;
126 string result;
127 char buffer[1];
128 while (true) {
129 idx_t bytes_read = Read(buffer, nr_bytes: 1);
130 if (bytes_read == 0) {
131 return result;
132 }
133 if (carriage_return) {
134 if (buffer[0] != '\n') {
135 if (!file_handle->CanSeek()) {
136 throw BinderException(
137 "Carriage return newlines not supported when reading CSV files in which we cannot seek");
138 }
139 file_handle->Seek(location: file_handle->SeekPosition() - 1);
140 return result;
141 }
142 }
143 if (buffer[0] == '\n') {
144 return result;
145 }
146 if (buffer[0] != '\r') {
147 result += buffer[0];
148 } else {
149 carriage_return = true;
150 }
151 }
152}
153
154void CSVFileHandle::DisableReset() {
155 this->reset_enabled = false;
156}
157
158} // namespace duckdb
159