1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#include "arrow/io/readahead.h"
19
20#include <condition_variable>
21#include <cstring>
22#include <deque>
23#include <memory>
24#include <mutex>
25#include <thread>
26#include <utility>
27
28#include "arrow/buffer.h"
29#include "arrow/io/interfaces.h"
30#include "arrow/memory_pool.h"
31#include "arrow/status.h"
32#include "arrow/util/logging.h"
33#include "arrow/util/macros.h"
34
35namespace arrow {
36namespace io {
37namespace internal {
38
39// ----------------------------------------------------------------------
40// ReadaheadSpooler implementation
41
42class ReadaheadSpooler::Impl {
43 public:
44 Impl(MemoryPool* pool, std::shared_ptr<InputStream> raw, int64_t read_size,
45 int32_t readahead_queue_size, int64_t left_padding, int64_t right_padding)
46 : pool_(pool),
47 raw_(raw),
48 read_size_(read_size),
49 readahead_queue_size_(readahead_queue_size),
50 left_padding_(left_padding),
51 right_padding_(right_padding) {
52 DCHECK_NE(raw, nullptr);
53 DCHECK_GT(read_size, 0);
54 DCHECK_GT(readahead_queue_size, 0);
55 io_worker_ = std::thread([&]() { WorkerLoop(); });
56 }
57
58 ~Impl() { ARROW_UNUSED(Close()); }
59
60 Status Close() {
61 std::unique_lock<std::mutex> lock(mutex_);
62 please_close_ = true;
63 io_wakeup_.notify_one();
64 // Wait for IO thread to finish
65 if (io_worker_.joinable()) {
66 lock.unlock();
67 io_worker_.join();
68 lock.lock();
69 }
70 return Status::OK();
71 }
72
73 Status Read(ReadaheadBuffer* out) {
74 std::unique_lock<std::mutex> lock(mutex_);
75 while (true) {
76 // Drain queue before querying other flags
77 if (buffer_queue_.size() > 0) {
78 *out = std::move(buffer_queue_.front());
79 DCHECK_NE(out->buffer, nullptr);
80 buffer_queue_.pop_front();
81 // Need to fill up queue again
82 io_wakeup_.notify_one();
83 return Status::OK();
84 }
85 if (!read_status_.ok()) {
86 // Got a read error, bail out
87 return read_status_;
88 }
89 if (eof_) {
90 out->buffer.reset();
91 return Status::OK();
92 }
93 // Readahead queue is empty and we're not closed yet, wait for more I/O
94 io_progress_.wait(lock);
95 }
96 }
97
98 int64_t left_padding() {
99 std::unique_lock<std::mutex> lock(mutex_);
100 return left_padding_;
101 }
102
103 void left_padding(int64_t size) {
104 std::unique_lock<std::mutex> lock(mutex_);
105 left_padding_ = size;
106 }
107
108 int64_t right_padding() {
109 std::unique_lock<std::mutex> lock(mutex_);
110 return right_padding_;
111 }
112
113 void right_padding(int64_t size) {
114 std::unique_lock<std::mutex> lock(mutex_);
115 right_padding_ = size;
116 }
117
118 protected:
119 // The background thread's main function
120 void WorkerLoop() {
121 std::unique_lock<std::mutex> lock(mutex_);
122 Status st;
123
124 while (true) {
125 if (please_close_) {
126 goto eof;
127 }
128 // Fill up readahead queue until desired size
129 while (buffer_queue_.size() < static_cast<size_t>(readahead_queue_size_)) {
130 ReadaheadBuffer buf = {nullptr, left_padding_, right_padding_};
131 lock.unlock();
132 Status st = ReadOneBufferUnlocked(&buf);
133 lock.lock();
134 if (!st.ok()) {
135 read_status_ = st;
136 goto error;
137 }
138 // Close() could have been called while unlocked above
139 if (please_close_) {
140 goto eof;
141 }
142 // Got empty read?
143 if (buf.buffer->size() == buf.left_padding + buf.right_padding) {
144 goto eof;
145 }
146 buffer_queue_.push_back(std::move(buf));
147 io_progress_.notify_one();
148 }
149 // Wait for Close() or Read() call
150 io_wakeup_.wait(lock);
151 }
152 eof:
153 eof_ = true;
154 error:
155 // Make sure any pending Read() doesn't block indefinitely
156 io_progress_.notify_one();
157 }
158
159 Status ReadOneBufferUnlocked(ReadaheadBuffer* buf) {
160 // Note that left_padding_ and right_padding_ may be modified while unlocked
161 std::shared_ptr<ResizableBuffer> buffer;
162 int64_t bytes_read;
163 RETURN_NOT_OK(AllocateResizableBuffer(
164 pool_, read_size_ + buf->left_padding + buf->right_padding, &buffer));
165 DCHECK_NE(buffer->mutable_data(), nullptr);
166 RETURN_NOT_OK(
167 raw_->Read(read_size_, &bytes_read, buffer->mutable_data() + buf->left_padding));
168 if (bytes_read < read_size_) {
169 // Got a short read
170 RETURN_NOT_OK(buffer->Resize(bytes_read + buf->left_padding + buf->right_padding));
171 DCHECK_NE(buffer->mutable_data(), nullptr);
172 }
173 // Zero padding areas
174 memset(buffer->mutable_data(), 0, buf->left_padding);
175 memset(buffer->mutable_data() + bytes_read + buf->left_padding, 0,
176 buf->right_padding);
177 buf->buffer = std::move(buffer);
178 return Status::OK();
179 }
180
181 MemoryPool* pool_;
182 std::shared_ptr<InputStream> raw_;
183 int64_t read_size_;
184 int32_t readahead_queue_size_;
185 int64_t left_padding_ = 0;
186 int64_t right_padding_ = 0;
187
188 std::mutex mutex_;
189 std::condition_variable io_wakeup_;
190 std::condition_variable io_progress_;
191 std::thread io_worker_;
192 bool please_close_ = false;
193 bool eof_ = false;
194 std::deque<ReadaheadBuffer> buffer_queue_;
195 Status read_status_;
196};
197
198ReadaheadSpooler::ReadaheadSpooler(MemoryPool* pool, std::shared_ptr<InputStream> raw,
199 int64_t read_size, int32_t readahead_queue_size,
200 int64_t left_padding, int64_t right_padding)
201 : impl_(new ReadaheadSpooler::Impl(pool, raw, read_size, readahead_queue_size,
202 left_padding, right_padding)) {}
203
204ReadaheadSpooler::ReadaheadSpooler(std::shared_ptr<InputStream> raw, int64_t read_size,
205 int32_t readahead_queue_size, int64_t left_padding,
206 int64_t right_padding)
207 : ReadaheadSpooler(default_memory_pool(), raw, read_size, readahead_queue_size,
208 left_padding, right_padding) {}
209
210int64_t ReadaheadSpooler::GetLeftPadding() { return impl_->left_padding(); }
211
212void ReadaheadSpooler::SetLeftPadding(int64_t size) { impl_->left_padding(size); }
213
214int64_t ReadaheadSpooler::GetRightPadding() { return impl_->right_padding(); }
215
216void ReadaheadSpooler::SetRightPadding(int64_t size) { impl_->right_padding(size); }
217
218Status ReadaheadSpooler::Close() { return impl_->Close(); }
219
220Status ReadaheadSpooler::Read(ReadaheadBuffer* out) { return impl_->Read(out); }
221
222ReadaheadSpooler::~ReadaheadSpooler() {}
223
224} // namespace internal
225} // namespace io
226} // namespace arrow
227