| 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
| 18 | #include "arrow/io/readahead.h" |
| 19 | |
| 20 | #include <condition_variable> |
| 21 | #include <cstring> |
| 22 | #include <deque> |
| 23 | #include <memory> |
| 24 | #include <mutex> |
| 25 | #include <thread> |
| 26 | #include <utility> |
| 27 | |
| 28 | #include "arrow/buffer.h" |
| 29 | #include "arrow/io/interfaces.h" |
| 30 | #include "arrow/memory_pool.h" |
| 31 | #include "arrow/status.h" |
| 32 | #include "arrow/util/logging.h" |
| 33 | #include "arrow/util/macros.h" |
| 34 | |
| 35 | namespace arrow { |
| 36 | namespace io { |
| 37 | namespace internal { |
| 38 | |
| 39 | // ---------------------------------------------------------------------- |
| 40 | // ReadaheadSpooler implementation |
| 41 | |
| 42 | class ReadaheadSpooler::Impl { |
| 43 | public: |
| 44 | Impl(MemoryPool* pool, std::shared_ptr<InputStream> raw, int64_t read_size, |
| 45 | int32_t readahead_queue_size, int64_t left_padding, int64_t right_padding) |
| 46 | : pool_(pool), |
| 47 | raw_(raw), |
| 48 | read_size_(read_size), |
| 49 | readahead_queue_size_(readahead_queue_size), |
| 50 | left_padding_(left_padding), |
| 51 | right_padding_(right_padding) { |
| 52 | DCHECK_NE(raw, nullptr); |
| 53 | DCHECK_GT(read_size, 0); |
| 54 | DCHECK_GT(readahead_queue_size, 0); |
| 55 | io_worker_ = std::thread([&]() { WorkerLoop(); }); |
| 56 | } |
| 57 | |
| 58 | ~Impl() { ARROW_UNUSED(Close()); } |
| 59 | |
| 60 | Status Close() { |
| 61 | std::unique_lock<std::mutex> lock(mutex_); |
| 62 | please_close_ = true; |
| 63 | io_wakeup_.notify_one(); |
| 64 | // Wait for IO thread to finish |
| 65 | if (io_worker_.joinable()) { |
| 66 | lock.unlock(); |
| 67 | io_worker_.join(); |
| 68 | lock.lock(); |
| 69 | } |
| 70 | return Status::OK(); |
| 71 | } |
| 72 | |
| 73 | Status Read(ReadaheadBuffer* out) { |
| 74 | std::unique_lock<std::mutex> lock(mutex_); |
| 75 | while (true) { |
| 76 | // Drain queue before querying other flags |
| 77 | if (buffer_queue_.size() > 0) { |
| 78 | *out = std::move(buffer_queue_.front()); |
| 79 | DCHECK_NE(out->buffer, nullptr); |
| 80 | buffer_queue_.pop_front(); |
| 81 | // Need to fill up queue again |
| 82 | io_wakeup_.notify_one(); |
| 83 | return Status::OK(); |
| 84 | } |
| 85 | if (!read_status_.ok()) { |
| 86 | // Got a read error, bail out |
| 87 | return read_status_; |
| 88 | } |
| 89 | if (eof_) { |
| 90 | out->buffer.reset(); |
| 91 | return Status::OK(); |
| 92 | } |
| 93 | // Readahead queue is empty and we're not closed yet, wait for more I/O |
| 94 | io_progress_.wait(lock); |
| 95 | } |
| 96 | } |
| 97 | |
| 98 | int64_t left_padding() { |
| 99 | std::unique_lock<std::mutex> lock(mutex_); |
| 100 | return left_padding_; |
| 101 | } |
| 102 | |
| 103 | void left_padding(int64_t size) { |
| 104 | std::unique_lock<std::mutex> lock(mutex_); |
| 105 | left_padding_ = size; |
| 106 | } |
| 107 | |
| 108 | int64_t right_padding() { |
| 109 | std::unique_lock<std::mutex> lock(mutex_); |
| 110 | return right_padding_; |
| 111 | } |
| 112 | |
| 113 | void right_padding(int64_t size) { |
| 114 | std::unique_lock<std::mutex> lock(mutex_); |
| 115 | right_padding_ = size; |
| 116 | } |
| 117 | |
| 118 | protected: |
| 119 | // The background thread's main function |
| 120 | void WorkerLoop() { |
| 121 | std::unique_lock<std::mutex> lock(mutex_); |
| 122 | Status st; |
| 123 | |
| 124 | while (true) { |
| 125 | if (please_close_) { |
| 126 | goto eof; |
| 127 | } |
| 128 | // Fill up readahead queue until desired size |
| 129 | while (buffer_queue_.size() < static_cast<size_t>(readahead_queue_size_)) { |
| 130 | ReadaheadBuffer buf = {nullptr, left_padding_, right_padding_}; |
| 131 | lock.unlock(); |
| 132 | Status st = ReadOneBufferUnlocked(&buf); |
| 133 | lock.lock(); |
| 134 | if (!st.ok()) { |
| 135 | read_status_ = st; |
| 136 | goto error; |
| 137 | } |
| 138 | // Close() could have been called while unlocked above |
| 139 | if (please_close_) { |
| 140 | goto eof; |
| 141 | } |
| 142 | // Got empty read? |
| 143 | if (buf.buffer->size() == buf.left_padding + buf.right_padding) { |
| 144 | goto eof; |
| 145 | } |
| 146 | buffer_queue_.push_back(std::move(buf)); |
| 147 | io_progress_.notify_one(); |
| 148 | } |
| 149 | // Wait for Close() or Read() call |
| 150 | io_wakeup_.wait(lock); |
| 151 | } |
| 152 | eof: |
| 153 | eof_ = true; |
| 154 | error: |
| 155 | // Make sure any pending Read() doesn't block indefinitely |
| 156 | io_progress_.notify_one(); |
| 157 | } |
| 158 | |
| 159 | Status ReadOneBufferUnlocked(ReadaheadBuffer* buf) { |
| 160 | // Note that left_padding_ and right_padding_ may be modified while unlocked |
| 161 | std::shared_ptr<ResizableBuffer> buffer; |
| 162 | int64_t bytes_read; |
| 163 | RETURN_NOT_OK(AllocateResizableBuffer( |
| 164 | pool_, read_size_ + buf->left_padding + buf->right_padding, &buffer)); |
| 165 | DCHECK_NE(buffer->mutable_data(), nullptr); |
| 166 | RETURN_NOT_OK( |
| 167 | raw_->Read(read_size_, &bytes_read, buffer->mutable_data() + buf->left_padding)); |
| 168 | if (bytes_read < read_size_) { |
| 169 | // Got a short read |
| 170 | RETURN_NOT_OK(buffer->Resize(bytes_read + buf->left_padding + buf->right_padding)); |
| 171 | DCHECK_NE(buffer->mutable_data(), nullptr); |
| 172 | } |
| 173 | // Zero padding areas |
| 174 | memset(buffer->mutable_data(), 0, buf->left_padding); |
| 175 | memset(buffer->mutable_data() + bytes_read + buf->left_padding, 0, |
| 176 | buf->right_padding); |
| 177 | buf->buffer = std::move(buffer); |
| 178 | return Status::OK(); |
| 179 | } |
| 180 | |
| 181 | MemoryPool* pool_; |
| 182 | std::shared_ptr<InputStream> raw_; |
| 183 | int64_t read_size_; |
| 184 | int32_t readahead_queue_size_; |
| 185 | int64_t left_padding_ = 0; |
| 186 | int64_t right_padding_ = 0; |
| 187 | |
| 188 | std::mutex mutex_; |
| 189 | std::condition_variable io_wakeup_; |
| 190 | std::condition_variable io_progress_; |
| 191 | std::thread io_worker_; |
| 192 | bool please_close_ = false; |
| 193 | bool eof_ = false; |
| 194 | std::deque<ReadaheadBuffer> buffer_queue_; |
| 195 | Status read_status_; |
| 196 | }; |
| 197 | |
| 198 | ReadaheadSpooler::ReadaheadSpooler(MemoryPool* pool, std::shared_ptr<InputStream> raw, |
| 199 | int64_t read_size, int32_t readahead_queue_size, |
| 200 | int64_t left_padding, int64_t right_padding) |
| 201 | : impl_(new ReadaheadSpooler::Impl(pool, raw, read_size, readahead_queue_size, |
| 202 | left_padding, right_padding)) {} |
| 203 | |
| 204 | ReadaheadSpooler::ReadaheadSpooler(std::shared_ptr<InputStream> raw, int64_t read_size, |
| 205 | int32_t readahead_queue_size, int64_t left_padding, |
| 206 | int64_t right_padding) |
| 207 | : ReadaheadSpooler(default_memory_pool(), raw, read_size, readahead_queue_size, |
| 208 | left_padding, right_padding) {} |
| 209 | |
| 210 | int64_t ReadaheadSpooler::GetLeftPadding() { return impl_->left_padding(); } |
| 211 | |
| 212 | void ReadaheadSpooler::SetLeftPadding(int64_t size) { impl_->left_padding(size); } |
| 213 | |
| 214 | int64_t ReadaheadSpooler::GetRightPadding() { return impl_->right_padding(); } |
| 215 | |
| 216 | void ReadaheadSpooler::SetRightPadding(int64_t size) { impl_->right_padding(size); } |
| 217 | |
| 218 | Status ReadaheadSpooler::Close() { return impl_->Close(); } |
| 219 | |
| 220 | Status ReadaheadSpooler::Read(ReadaheadBuffer* out) { return impl_->Read(out); } |
| 221 | |
| 222 | ReadaheadSpooler::~ReadaheadSpooler() {} |
| 223 | |
| 224 | } // namespace internal |
| 225 | } // namespace io |
| 226 | } // namespace arrow |
| 227 | |