1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | #include "arrow/io/readahead.h" |
19 | |
20 | #include <condition_variable> |
21 | #include <cstring> |
22 | #include <deque> |
23 | #include <memory> |
24 | #include <mutex> |
25 | #include <thread> |
26 | #include <utility> |
27 | |
28 | #include "arrow/buffer.h" |
29 | #include "arrow/io/interfaces.h" |
30 | #include "arrow/memory_pool.h" |
31 | #include "arrow/status.h" |
32 | #include "arrow/util/logging.h" |
33 | #include "arrow/util/macros.h" |
34 | |
35 | namespace arrow { |
36 | namespace io { |
37 | namespace internal { |
38 | |
39 | // ---------------------------------------------------------------------- |
40 | // ReadaheadSpooler implementation |
41 | |
42 | class ReadaheadSpooler::Impl { |
43 | public: |
44 | Impl(MemoryPool* pool, std::shared_ptr<InputStream> raw, int64_t read_size, |
45 | int32_t readahead_queue_size, int64_t left_padding, int64_t right_padding) |
46 | : pool_(pool), |
47 | raw_(raw), |
48 | read_size_(read_size), |
49 | readahead_queue_size_(readahead_queue_size), |
50 | left_padding_(left_padding), |
51 | right_padding_(right_padding) { |
52 | DCHECK_NE(raw, nullptr); |
53 | DCHECK_GT(read_size, 0); |
54 | DCHECK_GT(readahead_queue_size, 0); |
55 | io_worker_ = std::thread([&]() { WorkerLoop(); }); |
56 | } |
57 | |
58 | ~Impl() { ARROW_UNUSED(Close()); } |
59 | |
60 | Status Close() { |
61 | std::unique_lock<std::mutex> lock(mutex_); |
62 | please_close_ = true; |
63 | io_wakeup_.notify_one(); |
64 | // Wait for IO thread to finish |
65 | if (io_worker_.joinable()) { |
66 | lock.unlock(); |
67 | io_worker_.join(); |
68 | lock.lock(); |
69 | } |
70 | return Status::OK(); |
71 | } |
72 | |
73 | Status Read(ReadaheadBuffer* out) { |
74 | std::unique_lock<std::mutex> lock(mutex_); |
75 | while (true) { |
76 | // Drain queue before querying other flags |
77 | if (buffer_queue_.size() > 0) { |
78 | *out = std::move(buffer_queue_.front()); |
79 | DCHECK_NE(out->buffer, nullptr); |
80 | buffer_queue_.pop_front(); |
81 | // Need to fill up queue again |
82 | io_wakeup_.notify_one(); |
83 | return Status::OK(); |
84 | } |
85 | if (!read_status_.ok()) { |
86 | // Got a read error, bail out |
87 | return read_status_; |
88 | } |
89 | if (eof_) { |
90 | out->buffer.reset(); |
91 | return Status::OK(); |
92 | } |
93 | // Readahead queue is empty and we're not closed yet, wait for more I/O |
94 | io_progress_.wait(lock); |
95 | } |
96 | } |
97 | |
98 | int64_t left_padding() { |
99 | std::unique_lock<std::mutex> lock(mutex_); |
100 | return left_padding_; |
101 | } |
102 | |
103 | void left_padding(int64_t size) { |
104 | std::unique_lock<std::mutex> lock(mutex_); |
105 | left_padding_ = size; |
106 | } |
107 | |
108 | int64_t right_padding() { |
109 | std::unique_lock<std::mutex> lock(mutex_); |
110 | return right_padding_; |
111 | } |
112 | |
113 | void right_padding(int64_t size) { |
114 | std::unique_lock<std::mutex> lock(mutex_); |
115 | right_padding_ = size; |
116 | } |
117 | |
118 | protected: |
119 | // The background thread's main function |
120 | void WorkerLoop() { |
121 | std::unique_lock<std::mutex> lock(mutex_); |
122 | Status st; |
123 | |
124 | while (true) { |
125 | if (please_close_) { |
126 | goto eof; |
127 | } |
128 | // Fill up readahead queue until desired size |
129 | while (buffer_queue_.size() < static_cast<size_t>(readahead_queue_size_)) { |
130 | ReadaheadBuffer buf = {nullptr, left_padding_, right_padding_}; |
131 | lock.unlock(); |
132 | Status st = ReadOneBufferUnlocked(&buf); |
133 | lock.lock(); |
134 | if (!st.ok()) { |
135 | read_status_ = st; |
136 | goto error; |
137 | } |
138 | // Close() could have been called while unlocked above |
139 | if (please_close_) { |
140 | goto eof; |
141 | } |
142 | // Got empty read? |
143 | if (buf.buffer->size() == buf.left_padding + buf.right_padding) { |
144 | goto eof; |
145 | } |
146 | buffer_queue_.push_back(std::move(buf)); |
147 | io_progress_.notify_one(); |
148 | } |
149 | // Wait for Close() or Read() call |
150 | io_wakeup_.wait(lock); |
151 | } |
152 | eof: |
153 | eof_ = true; |
154 | error: |
155 | // Make sure any pending Read() doesn't block indefinitely |
156 | io_progress_.notify_one(); |
157 | } |
158 | |
159 | Status ReadOneBufferUnlocked(ReadaheadBuffer* buf) { |
160 | // Note that left_padding_ and right_padding_ may be modified while unlocked |
161 | std::shared_ptr<ResizableBuffer> buffer; |
162 | int64_t bytes_read; |
163 | RETURN_NOT_OK(AllocateResizableBuffer( |
164 | pool_, read_size_ + buf->left_padding + buf->right_padding, &buffer)); |
165 | DCHECK_NE(buffer->mutable_data(), nullptr); |
166 | RETURN_NOT_OK( |
167 | raw_->Read(read_size_, &bytes_read, buffer->mutable_data() + buf->left_padding)); |
168 | if (bytes_read < read_size_) { |
169 | // Got a short read |
170 | RETURN_NOT_OK(buffer->Resize(bytes_read + buf->left_padding + buf->right_padding)); |
171 | DCHECK_NE(buffer->mutable_data(), nullptr); |
172 | } |
173 | // Zero padding areas |
174 | memset(buffer->mutable_data(), 0, buf->left_padding); |
175 | memset(buffer->mutable_data() + bytes_read + buf->left_padding, 0, |
176 | buf->right_padding); |
177 | buf->buffer = std::move(buffer); |
178 | return Status::OK(); |
179 | } |
180 | |
181 | MemoryPool* pool_; |
182 | std::shared_ptr<InputStream> raw_; |
183 | int64_t read_size_; |
184 | int32_t readahead_queue_size_; |
185 | int64_t left_padding_ = 0; |
186 | int64_t right_padding_ = 0; |
187 | |
188 | std::mutex mutex_; |
189 | std::condition_variable io_wakeup_; |
190 | std::condition_variable io_progress_; |
191 | std::thread io_worker_; |
192 | bool please_close_ = false; |
193 | bool eof_ = false; |
194 | std::deque<ReadaheadBuffer> buffer_queue_; |
195 | Status read_status_; |
196 | }; |
197 | |
198 | ReadaheadSpooler::ReadaheadSpooler(MemoryPool* pool, std::shared_ptr<InputStream> raw, |
199 | int64_t read_size, int32_t readahead_queue_size, |
200 | int64_t left_padding, int64_t right_padding) |
201 | : impl_(new ReadaheadSpooler::Impl(pool, raw, read_size, readahead_queue_size, |
202 | left_padding, right_padding)) {} |
203 | |
204 | ReadaheadSpooler::ReadaheadSpooler(std::shared_ptr<InputStream> raw, int64_t read_size, |
205 | int32_t readahead_queue_size, int64_t left_padding, |
206 | int64_t right_padding) |
207 | : ReadaheadSpooler(default_memory_pool(), raw, read_size, readahead_queue_size, |
208 | left_padding, right_padding) {} |
209 | |
210 | int64_t ReadaheadSpooler::GetLeftPadding() { return impl_->left_padding(); } |
211 | |
212 | void ReadaheadSpooler::SetLeftPadding(int64_t size) { impl_->left_padding(size); } |
213 | |
214 | int64_t ReadaheadSpooler::GetRightPadding() { return impl_->right_padding(); } |
215 | |
216 | void ReadaheadSpooler::SetRightPadding(int64_t size) { impl_->right_padding(size); } |
217 | |
218 | Status ReadaheadSpooler::Close() { return impl_->Close(); } |
219 | |
220 | Status ReadaheadSpooler::Read(ReadaheadBuffer* out) { return impl_->Read(out); } |
221 | |
222 | ReadaheadSpooler::~ReadaheadSpooler() {} |
223 | |
224 | } // namespace internal |
225 | } // namespace io |
226 | } // namespace arrow |
227 | |