| 1 | #include "duckdb/execution/operator/persistent/csv_file_handle.hpp" |
| 2 | |
| 3 | namespace duckdb { |
| 4 | |
| 5 | CSVFileHandle::CSVFileHandle(FileSystem &fs, Allocator &allocator, unique_ptr<FileHandle> file_handle_p, |
| 6 | const string &path_p, FileCompressionType compression, bool enable_reset) |
| 7 | : fs(fs), allocator(allocator), file_handle(std::move(file_handle_p)), path(path_p), compression(compression), |
| 8 | reset_enabled(enable_reset) { |
| 9 | can_seek = file_handle->CanSeek(); |
| 10 | on_disk_file = file_handle->OnDiskFile(); |
| 11 | file_size = file_handle->GetFileSize(); |
| 12 | } |
| 13 | |
| 14 | unique_ptr<FileHandle> CSVFileHandle::OpenFileHandle(FileSystem &fs, Allocator &allocator, const string &path, |
| 15 | FileCompressionType compression) { |
| 16 | auto file_handle = fs.OpenFile(path: path.c_str(), flags: FileFlags::FILE_FLAGS_READ, lock: FileLockType::NO_LOCK, compression); |
| 17 | if (file_handle->CanSeek()) { |
| 18 | file_handle->Reset(); |
| 19 | } |
| 20 | return file_handle; |
| 21 | } |
| 22 | |
| 23 | unique_ptr<CSVFileHandle> CSVFileHandle::OpenFile(FileSystem &fs, Allocator &allocator, const string &path, |
| 24 | FileCompressionType compression, bool enable_reset) { |
| 25 | auto file_handle = CSVFileHandle::OpenFileHandle(fs, allocator, path, compression); |
| 26 | return make_uniq<CSVFileHandle>(args&: fs, args&: allocator, args: std::move(file_handle), args: path, args&: compression, args&: enable_reset); |
| 27 | } |
| 28 | |
| 29 | bool CSVFileHandle::CanSeek() { |
| 30 | return can_seek; |
| 31 | } |
| 32 | |
| 33 | void CSVFileHandle::Seek(idx_t position) { |
| 34 | if (!can_seek) { |
| 35 | throw InternalException("Cannot seek in this file" ); |
| 36 | } |
| 37 | file_handle->Seek(location: position); |
| 38 | } |
| 39 | |
| 40 | idx_t CSVFileHandle::SeekPosition() { |
| 41 | if (!can_seek) { |
| 42 | throw InternalException("Cannot seek in this file" ); |
| 43 | } |
| 44 | return file_handle->SeekPosition(); |
| 45 | } |
| 46 | |
| 47 | void CSVFileHandle::Reset() { |
| 48 | requested_bytes = 0; |
| 49 | read_position = 0; |
| 50 | if (can_seek) { |
| 51 | // we can seek - reset the file handle |
| 52 | file_handle->Reset(); |
| 53 | } else if (on_disk_file) { |
| 54 | // we cannot seek but it is an on-disk file - re-open the file |
| 55 | file_handle = CSVFileHandle::OpenFileHandle(fs, allocator, path, compression); |
| 56 | } else { |
| 57 | if (!reset_enabled) { |
| 58 | throw InternalException("Reset called but reset is not enabled for this CSV Handle" ); |
| 59 | } |
| 60 | read_position = 0; |
| 61 | } |
| 62 | } |
| 63 | bool CSVFileHandle::OnDiskFile() { |
| 64 | return on_disk_file; |
| 65 | } |
| 66 | |
| 67 | idx_t CSVFileHandle::FileSize() { |
| 68 | return file_size; |
| 69 | } |
| 70 | |
| 71 | bool CSVFileHandle::FinishedReading() { |
| 72 | return requested_bytes >= file_size; |
| 73 | } |
| 74 | |
| 75 | idx_t CSVFileHandle::Read(void *buffer, idx_t nr_bytes) { |
| 76 | requested_bytes += nr_bytes; |
| 77 | if (on_disk_file || can_seek) { |
| 78 | // if this is a plain file source OR we can seek we are not caching anything |
| 79 | return file_handle->Read(buffer, nr_bytes); |
| 80 | } |
| 81 | // not a plain file source: we need to do some bookkeeping around the reset functionality |
| 82 | idx_t result_offset = 0; |
| 83 | if (read_position < buffer_size) { |
| 84 | // we need to read from our cached buffer |
| 85 | auto buffer_read_count = MinValue<idx_t>(a: nr_bytes, b: buffer_size - read_position); |
| 86 | memcpy(dest: buffer, src: cached_buffer.get() + read_position, n: buffer_read_count); |
| 87 | result_offset += buffer_read_count; |
| 88 | read_position += buffer_read_count; |
| 89 | if (result_offset == nr_bytes) { |
| 90 | return nr_bytes; |
| 91 | } |
| 92 | } else if (!reset_enabled && cached_buffer.IsSet()) { |
| 93 | // reset is disabled, but we still have cached data |
| 94 | // we can remove any cached data |
| 95 | cached_buffer.Reset(); |
| 96 | buffer_size = 0; |
| 97 | buffer_capacity = 0; |
| 98 | read_position = 0; |
| 99 | } |
| 100 | // we have data left to read from the file |
| 101 | // read directly into the buffer |
| 102 | auto bytes_read = file_handle->Read(buffer: char_ptr_cast(src: buffer) + result_offset, nr_bytes: nr_bytes - result_offset); |
| 103 | file_size = file_handle->GetFileSize(); |
| 104 | read_position += bytes_read; |
| 105 | if (reset_enabled) { |
| 106 | // if reset caching is enabled, we need to cache the bytes that we have read |
| 107 | if (buffer_size + bytes_read >= buffer_capacity) { |
| 108 | // no space; first enlarge the buffer |
| 109 | buffer_capacity = MaxValue<idx_t>(a: NextPowerOfTwo(v: buffer_size + bytes_read), b: buffer_capacity * 2); |
| 110 | |
| 111 | auto new_buffer = allocator.Allocate(size: buffer_capacity); |
| 112 | if (buffer_size > 0) { |
| 113 | memcpy(dest: new_buffer.get(), src: cached_buffer.get(), n: buffer_size); |
| 114 | } |
| 115 | cached_buffer = std::move(new_buffer); |
| 116 | } |
| 117 | memcpy(dest: cached_buffer.get() + buffer_size, src: char_ptr_cast(src: buffer) + result_offset, n: bytes_read); |
| 118 | buffer_size += bytes_read; |
| 119 | } |
| 120 | |
| 121 | return result_offset + bytes_read; |
| 122 | } |
| 123 | |
| 124 | string CSVFileHandle::ReadLine() { |
| 125 | bool carriage_return = false; |
| 126 | string result; |
| 127 | char buffer[1]; |
| 128 | while (true) { |
| 129 | idx_t bytes_read = Read(buffer, nr_bytes: 1); |
| 130 | if (bytes_read == 0) { |
| 131 | return result; |
| 132 | } |
| 133 | if (carriage_return) { |
| 134 | if (buffer[0] != '\n') { |
| 135 | if (!file_handle->CanSeek()) { |
| 136 | throw BinderException( |
| 137 | "Carriage return newlines not supported when reading CSV files in which we cannot seek" ); |
| 138 | } |
| 139 | file_handle->Seek(location: file_handle->SeekPosition() - 1); |
| 140 | return result; |
| 141 | } |
| 142 | } |
| 143 | if (buffer[0] == '\n') { |
| 144 | return result; |
| 145 | } |
| 146 | if (buffer[0] != '\r') { |
| 147 | result += buffer[0]; |
| 148 | } else { |
| 149 | carriage_return = true; |
| 150 | } |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | void CSVFileHandle::DisableReset() { |
| 155 | this->reset_enabled = false; |
| 156 | } |
| 157 | |
| 158 | } // namespace duckdb |
| 159 | |