1// Protocol Buffers - Google's data interchange format
2// Copyright 2008 Google Inc. All rights reserved.
3// https://developers.google.com/protocol-buffers/
4//
5// Redistribution and use in source and binary forms, with or without
6// modification, are permitted provided that the following conditions are
7// met:
8//
9// * Redistributions of source code must retain the above copyright
10// notice, this list of conditions and the following disclaimer.
11// * Redistributions in binary form must reproduce the above
12// copyright notice, this list of conditions and the following disclaimer
13// in the documentation and/or other materials provided with the
14// distribution.
15// * Neither the name of Google Inc. nor the names of its
16// contributors may be used to endorse or promote products derived from
17// this software without specific prior written permission.
18//
19// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
31// Author: kenton@google.com (Kenton Varda)
32// Based on original Protocol Buffers design by
33// Sanjay Ghemawat, Jeff Dean, and others.
34
35#ifndef _MSC_VER
36#include <fcntl.h>
37#include <sys/stat.h>
38#include <sys/types.h>
39#include <unistd.h>
40#endif
41#include <errno.h>
42
43#include <algorithm>
44#include <iostream>
45
46#include <google/protobuf/stubs/common.h>
47#include <google/protobuf/stubs/logging.h>
48#include <google/protobuf/io/io_win32.h>
49#include <google/protobuf/io/zero_copy_stream_impl.h>
50#include <google/protobuf/stubs/stl_util.h>
51
52
53namespace google {
54namespace protobuf {
55namespace io {
56
57#ifdef _WIN32
58// Win32 lseek is broken: If invoked on a non-seekable file descriptor, its
59// return value is undefined. We re-define it to always produce an error.
60#define lseek(fd, offset, origin) ((off_t)-1)
61// DO NOT include <io.h>, instead create functions in io_win32.{h,cc} and import
62// them like we do below.
63using google::protobuf::io::win32::access;
64using google::protobuf::io::win32::close;
65using google::protobuf::io::win32::open;
66using google::protobuf::io::win32::read;
67using google::protobuf::io::win32::write;
68#endif
69
70namespace {
71
// Closes `fd`, coping with close() being interrupted by a signal (EINTR).
//
// On Linux the descriptor is always released by the time close() returns,
// even when it reports EINTR.  Calling close() again there is wrong: the
// descriptor number may already have been handed to another thread, and the
// retry would close that unrelated descriptor.  An interrupted close is
// therefore reported as success on Linux.  On other platforms the state of
// the descriptor after EINTR is unspecified, so the retry loop is kept.
//
// Returns 0 on success; otherwise the negative value returned by close(),
// with errno describing the failure.
int close_no_eintr(int fd) {
#if defined(__linux__)
  const int result = close(fd);
  if (result < 0 && errno == EINTR) {
    // The descriptor is already gone; do not retry.
    return 0;
  }
  return result;
#else
  int result;
  do {
    result = close(fd);
  } while (result < 0 && errno == EINTR);
  return result;
#endif
}
80
81} // namespace
82
83// ===================================================================
84
85FileInputStream::FileInputStream(int file_descriptor, int block_size)
86 : copying_input_(file_descriptor), impl_(&copying_input_, block_size) {}
87
88bool FileInputStream::Close() { return copying_input_.Close(); }
89
90bool FileInputStream::Next(const void** data, int* size) {
91 return impl_.Next(data, size);
92}
93
94void FileInputStream::BackUp(int count) { impl_.BackUp(count); }
95
96bool FileInputStream::Skip(int count) { return impl_.Skip(count); }
97
98int64_t FileInputStream::ByteCount() const { return impl_.ByteCount(); }
99
100FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
101 int file_descriptor)
102 : file_(file_descriptor),
103 close_on_delete_(false),
104 is_closed_(false),
105 errno_(0),
106 previous_seek_failed_(false) {
107#ifndef _WIN32
108 int flags = fcntl(fd: file_, F_GETFL);
109 flags &= ~O_NONBLOCK;
110 fcntl(fd: file_, F_SETFL, flags);
111#endif
112}
113
114FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
115 if (close_on_delete_) {
116 if (!Close()) {
117 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errnum: errno_);
118 }
119 }
120}
121
122bool FileInputStream::CopyingFileInputStream::Close() {
123 GOOGLE_CHECK(!is_closed_);
124
125 is_closed_ = true;
126 if (close_no_eintr(fd: file_) != 0) {
127 // The docs on close() do not specify whether a file descriptor is still
128 // open after close() fails with EIO. However, the glibc source code
129 // seems to indicate that it is not.
130 errno_ = errno;
131 return false;
132 }
133
134 return true;
135}
136
137int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
138 GOOGLE_CHECK(!is_closed_);
139
140 int result;
141 do {
142 result = read(fd: file_, buf: buffer, nbytes: size);
143 } while (result < 0 && errno == EINTR);
144
145 if (result < 0) {
146 // Read error (not EOF).
147 errno_ = errno;
148 }
149
150 return result;
151}
152
153int FileInputStream::CopyingFileInputStream::Skip(int count) {
154 GOOGLE_CHECK(!is_closed_);
155
156 if (!previous_seek_failed_ && lseek(fd: file_, offset: count, SEEK_CUR) != (off_t)-1) {
157 // Seek succeeded.
158 return count;
159 } else {
160 // Failed to seek.
161
162 // Note to self: Don't seek again. This file descriptor doesn't
163 // support it.
164 previous_seek_failed_ = true;
165
166 // Use the default implementation.
167 return CopyingInputStream::Skip(count);
168 }
169}
170
171// ===================================================================
172
173FileOutputStream::FileOutputStream(int file_descriptor, int /*block_size*/)
174 : CopyingOutputStreamAdaptor(&copying_output_),
175 copying_output_(file_descriptor) {}
176
177bool FileOutputStream::Close() {
178 bool flush_succeeded = Flush();
179 return copying_output_.Close() && flush_succeeded;
180}
181
182FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
183 int file_descriptor)
184 : file_(file_descriptor),
185 close_on_delete_(false),
186 is_closed_(false),
187 errno_(0) {}
188
189FileOutputStream::~FileOutputStream() { Flush(); }
190
191FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
192 if (close_on_delete_) {
193 if (!Close()) {
194 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errnum: errno_);
195 }
196 }
197}
198
199bool FileOutputStream::CopyingFileOutputStream::Close() {
200 GOOGLE_CHECK(!is_closed_);
201
202 is_closed_ = true;
203 if (close_no_eintr(fd: file_) != 0) {
204 // The docs on close() do not specify whether a file descriptor is still
205 // open after close() fails with EIO. However, the glibc source code
206 // seems to indicate that it is not.
207 errno_ = errno;
208 return false;
209 }
210
211 return true;
212}
213
214bool FileOutputStream::CopyingFileOutputStream::Write(const void* buffer,
215 int size) {
216 GOOGLE_CHECK(!is_closed_);
217 int total_written = 0;
218
219 const uint8_t* buffer_base = reinterpret_cast<const uint8_t*>(buffer);
220
221 while (total_written < size) {
222 int bytes;
223 do {
224 bytes = write(fd: file_, buf: buffer_base + total_written, n: size - total_written);
225 } while (bytes < 0 && errno == EINTR);
226
227 if (bytes <= 0) {
228 // Write error.
229
230 // FIXME(kenton): According to the man page, if write() returns zero,
231 // there was no error; write() simply did not write anything. It's
232 // unclear under what circumstances this might happen, but presumably
233 // errno won't be set in this case. I am confused as to how such an
234 // event should be handled. For now I'm treating it as an error, since
235 // retrying seems like it could lead to an infinite loop. I suspect
236 // this never actually happens anyway.
237
238 if (bytes < 0) {
239 errno_ = errno;
240 }
241 return false;
242 }
243 total_written += bytes;
244 }
245
246 return true;
247}
248
249// ===================================================================
250
251IstreamInputStream::IstreamInputStream(std::istream* input, int block_size)
252 : copying_input_(input), impl_(&copying_input_, block_size) {}
253
254bool IstreamInputStream::Next(const void** data, int* size) {
255 return impl_.Next(data, size);
256}
257
258void IstreamInputStream::BackUp(int count) { impl_.BackUp(count); }
259
260bool IstreamInputStream::Skip(int count) { return impl_.Skip(count); }
261
262int64_t IstreamInputStream::ByteCount() const { return impl_.ByteCount(); }
263
264IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
265 std::istream* input)
266 : input_(input) {}
267
268IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
269
270int IstreamInputStream::CopyingIstreamInputStream::Read(void* buffer,
271 int size) {
272 input_->read(s: reinterpret_cast<char*>(buffer), n: size);
273 int result = input_->gcount();
274 if (result == 0 && input_->fail() && !input_->eof()) {
275 return -1;
276 }
277 return result;
278}
279
280// ===================================================================
281
282OstreamOutputStream::OstreamOutputStream(std::ostream* output, int block_size)
283 : copying_output_(output), impl_(&copying_output_, block_size) {}
284
285OstreamOutputStream::~OstreamOutputStream() { impl_.Flush(); }
286
287bool OstreamOutputStream::Next(void** data, int* size) {
288 return impl_.Next(data, size);
289}
290
291void OstreamOutputStream::BackUp(int count) { impl_.BackUp(count); }
292
293int64_t OstreamOutputStream::ByteCount() const { return impl_.ByteCount(); }
294
295OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
296 std::ostream* output)
297 : output_(output) {}
298
299OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
300}
301
302bool OstreamOutputStream::CopyingOstreamOutputStream::Write(const void* buffer,
303 int size) {
304 output_->write(s: reinterpret_cast<const char*>(buffer), n: size);
305 return output_->good();
306}
307
308// ===================================================================
309
310ConcatenatingInputStream::ConcatenatingInputStream(
311 ZeroCopyInputStream* const streams[], int count)
312 : streams_(streams), stream_count_(count), bytes_retired_(0) {
313}
314
315bool ConcatenatingInputStream::Next(const void** data, int* size) {
316 while (stream_count_ > 0) {
317 if (streams_[0]->Next(data, size)) return true;
318
319 // That stream is done. Advance to the next one.
320 bytes_retired_ += streams_[0]->ByteCount();
321 ++streams_;
322 --stream_count_;
323 }
324
325 // No more streams.
326 return false;
327}
328
329void ConcatenatingInputStream::BackUp(int count) {
330 if (stream_count_ > 0) {
331 streams_[0]->BackUp(count);
332 } else {
333 GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
334 }
335}
336
337bool ConcatenatingInputStream::Skip(int count) {
338 while (stream_count_ > 0) {
339 // Assume that ByteCount() can be used to find out how much we actually
340 // skipped when Skip() fails.
341 int64_t target_byte_count = streams_[0]->ByteCount() + count;
342 if (streams_[0]->Skip(count)) return true;
343
344 // Hit the end of the stream. Figure out how many more bytes we still have
345 // to skip.
346 int64_t final_byte_count = streams_[0]->ByteCount();
347 GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
348 count = target_byte_count - final_byte_count;
349
350 // That stream is done. Advance to the next one.
351 bytes_retired_ += final_byte_count;
352 ++streams_;
353 --stream_count_;
354 }
355
356 return false;
357}
358
359int64_t ConcatenatingInputStream::ByteCount() const {
360 if (stream_count_ == 0) {
361 return bytes_retired_;
362 } else {
363 return bytes_retired_ + streams_[0]->ByteCount();
364 }
365}
366
367
368// ===================================================================
369
370} // namespace io
371} // namespace protobuf
372} // namespace google
373