1 | #include "duckdb/execution/operator/persistent/csv_file_handle.hpp" |
2 | |
3 | namespace duckdb { |
4 | |
5 | CSVFileHandle::CSVFileHandle(FileSystem &fs, Allocator &allocator, unique_ptr<FileHandle> file_handle_p, |
6 | const string &path_p, FileCompressionType compression, bool enable_reset) |
7 | : fs(fs), allocator(allocator), file_handle(std::move(file_handle_p)), path(path_p), compression(compression), |
8 | reset_enabled(enable_reset) { |
9 | can_seek = file_handle->CanSeek(); |
10 | on_disk_file = file_handle->OnDiskFile(); |
11 | file_size = file_handle->GetFileSize(); |
12 | } |
13 | |
14 | unique_ptr<FileHandle> CSVFileHandle::OpenFileHandle(FileSystem &fs, Allocator &allocator, const string &path, |
15 | FileCompressionType compression) { |
16 | auto file_handle = fs.OpenFile(path: path.c_str(), flags: FileFlags::FILE_FLAGS_READ, lock: FileLockType::NO_LOCK, compression); |
17 | if (file_handle->CanSeek()) { |
18 | file_handle->Reset(); |
19 | } |
20 | return file_handle; |
21 | } |
22 | |
23 | unique_ptr<CSVFileHandle> CSVFileHandle::OpenFile(FileSystem &fs, Allocator &allocator, const string &path, |
24 | FileCompressionType compression, bool enable_reset) { |
25 | auto file_handle = CSVFileHandle::OpenFileHandle(fs, allocator, path, compression); |
26 | return make_uniq<CSVFileHandle>(args&: fs, args&: allocator, args: std::move(file_handle), args: path, args&: compression, args&: enable_reset); |
27 | } |
28 | |
29 | bool CSVFileHandle::CanSeek() { |
30 | return can_seek; |
31 | } |
32 | |
33 | void CSVFileHandle::Seek(idx_t position) { |
34 | if (!can_seek) { |
35 | throw InternalException("Cannot seek in this file" ); |
36 | } |
37 | file_handle->Seek(location: position); |
38 | } |
39 | |
40 | idx_t CSVFileHandle::SeekPosition() { |
41 | if (!can_seek) { |
42 | throw InternalException("Cannot seek in this file" ); |
43 | } |
44 | return file_handle->SeekPosition(); |
45 | } |
46 | |
47 | void CSVFileHandle::Reset() { |
48 | requested_bytes = 0; |
49 | read_position = 0; |
50 | if (can_seek) { |
51 | // we can seek - reset the file handle |
52 | file_handle->Reset(); |
53 | } else if (on_disk_file) { |
54 | // we cannot seek but it is an on-disk file - re-open the file |
55 | file_handle = CSVFileHandle::OpenFileHandle(fs, allocator, path, compression); |
56 | } else { |
57 | if (!reset_enabled) { |
58 | throw InternalException("Reset called but reset is not enabled for this CSV Handle" ); |
59 | } |
60 | read_position = 0; |
61 | } |
62 | } |
63 | bool CSVFileHandle::OnDiskFile() { |
64 | return on_disk_file; |
65 | } |
66 | |
67 | idx_t CSVFileHandle::FileSize() { |
68 | return file_size; |
69 | } |
70 | |
71 | bool CSVFileHandle::FinishedReading() { |
72 | return requested_bytes >= file_size; |
73 | } |
74 | |
75 | idx_t CSVFileHandle::Read(void *buffer, idx_t nr_bytes) { |
76 | requested_bytes += nr_bytes; |
77 | if (on_disk_file || can_seek) { |
78 | // if this is a plain file source OR we can seek we are not caching anything |
79 | return file_handle->Read(buffer, nr_bytes); |
80 | } |
81 | // not a plain file source: we need to do some bookkeeping around the reset functionality |
82 | idx_t result_offset = 0; |
83 | if (read_position < buffer_size) { |
84 | // we need to read from our cached buffer |
85 | auto buffer_read_count = MinValue<idx_t>(a: nr_bytes, b: buffer_size - read_position); |
86 | memcpy(dest: buffer, src: cached_buffer.get() + read_position, n: buffer_read_count); |
87 | result_offset += buffer_read_count; |
88 | read_position += buffer_read_count; |
89 | if (result_offset == nr_bytes) { |
90 | return nr_bytes; |
91 | } |
92 | } else if (!reset_enabled && cached_buffer.IsSet()) { |
93 | // reset is disabled, but we still have cached data |
94 | // we can remove any cached data |
95 | cached_buffer.Reset(); |
96 | buffer_size = 0; |
97 | buffer_capacity = 0; |
98 | read_position = 0; |
99 | } |
100 | // we have data left to read from the file |
101 | // read directly into the buffer |
102 | auto bytes_read = file_handle->Read(buffer: char_ptr_cast(src: buffer) + result_offset, nr_bytes: nr_bytes - result_offset); |
103 | file_size = file_handle->GetFileSize(); |
104 | read_position += bytes_read; |
105 | if (reset_enabled) { |
106 | // if reset caching is enabled, we need to cache the bytes that we have read |
107 | if (buffer_size + bytes_read >= buffer_capacity) { |
108 | // no space; first enlarge the buffer |
109 | buffer_capacity = MaxValue<idx_t>(a: NextPowerOfTwo(v: buffer_size + bytes_read), b: buffer_capacity * 2); |
110 | |
111 | auto new_buffer = allocator.Allocate(size: buffer_capacity); |
112 | if (buffer_size > 0) { |
113 | memcpy(dest: new_buffer.get(), src: cached_buffer.get(), n: buffer_size); |
114 | } |
115 | cached_buffer = std::move(new_buffer); |
116 | } |
117 | memcpy(dest: cached_buffer.get() + buffer_size, src: char_ptr_cast(src: buffer) + result_offset, n: bytes_read); |
118 | buffer_size += bytes_read; |
119 | } |
120 | |
121 | return result_offset + bytes_read; |
122 | } |
123 | |
124 | string CSVFileHandle::ReadLine() { |
125 | bool carriage_return = false; |
126 | string result; |
127 | char buffer[1]; |
128 | while (true) { |
129 | idx_t bytes_read = Read(buffer, nr_bytes: 1); |
130 | if (bytes_read == 0) { |
131 | return result; |
132 | } |
133 | if (carriage_return) { |
134 | if (buffer[0] != '\n') { |
135 | if (!file_handle->CanSeek()) { |
136 | throw BinderException( |
137 | "Carriage return newlines not supported when reading CSV files in which we cannot seek" ); |
138 | } |
139 | file_handle->Seek(location: file_handle->SeekPosition() - 1); |
140 | return result; |
141 | } |
142 | } |
143 | if (buffer[0] == '\n') { |
144 | return result; |
145 | } |
146 | if (buffer[0] != '\r') { |
147 | result += buffer[0]; |
148 | } else { |
149 | carriage_return = true; |
150 | } |
151 | } |
152 | } |
153 | |
154 | void CSVFileHandle::DisableReset() { |
155 | this->reset_enabled = false; |
156 | } |
157 | |
158 | } // namespace duckdb |
159 | |