1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// Buffered stream implementations
19
20#ifndef ARROW_IO_BUFFERED_H
21#define ARROW_IO_BUFFERED_H
22
23#include <cstdint>
24#include <memory>
25
26#include "arrow/io/interfaces.h"
27#include "arrow/util/string_view.h"
28#include "arrow/util/visibility.h"
29
30namespace arrow {
31
32class Buffer;
33class MemoryPool;
34class Status;
35
36namespace io {
37
38class ARROW_EXPORT BufferedOutputStream : public OutputStream {
39 public:
40 ~BufferedOutputStream() override;
41
42 /// \brief Create a buffered output stream wrapping the given output stream.
43 /// \param[in] buffer_size the size of the temporary write buffer
44 /// \param[in] pool a MemoryPool to use for allocations
45 /// \param[in] raw another OutputStream
46 /// \param[out] out the created BufferedOutputStream
47 /// \return Status
48 static Status Create(int64_t buffer_size, MemoryPool* pool,
49 std::shared_ptr<OutputStream> raw,
50 std::shared_ptr<BufferedOutputStream>* out);
51
52 /// \brief Resize internal buffer
53 /// \param[in] new_buffer_size the new buffer size
54 /// \return Status
55 Status SetBufferSize(int64_t new_buffer_size);
56
57 /// \brief Return the current size of the internal buffer
58 int64_t buffer_size() const;
59
60 /// \brief Flush any buffered writes and release the raw
61 /// OutputStream. Further operations on this object are invalid
62 /// \param[out] raw the underlying OutputStream
63 /// \return Status
64 Status Detach(std::shared_ptr<OutputStream>* raw);
65
66 // OutputStream interface
67
68 /// \brief Close the buffered output stream. This implicitly closes the
69 /// underlying raw output stream.
70 Status Close() override;
71 bool closed() const override;
72
73 Status Tell(int64_t* position) const override;
74 // Write bytes to the stream. Thread-safe
75 Status Write(const void* data, int64_t nbytes) override;
76
77 Status Flush() override;
78
79 /// \brief Return the underlying raw output stream.
80 std::shared_ptr<OutputStream> raw() const;
81
82 private:
83 explicit BufferedOutputStream(std::shared_ptr<OutputStream> raw, MemoryPool* pool);
84
85 class ARROW_NO_EXPORT Impl;
86 std::unique_ptr<Impl> impl_;
87};
88
89/// \class BufferedInputStream
90/// \brief An InputStream that performs buffered reads from an unbuffered
91/// InputStream, which can mitigate the overhead of many small reads in some
92/// cases
93class ARROW_EXPORT BufferedInputStream : public InputStream {
94 public:
95 ~BufferedInputStream() override;
96
97 /// \brief Create a BufferedInputStream from a raw InputStream
98 /// \param[in] buffer_size the size of the temporary read buffer
99 /// \param[in] pool a MemoryPool to use for allocations
100 /// \param[in] raw a raw InputStream
101 /// \param[out] out the created BufferedInputStream
102 static Status Create(int64_t buffer_size, MemoryPool* pool,
103 std::shared_ptr<InputStream> raw,
104 std::shared_ptr<BufferedInputStream>* out);
105
106 /// \brief Resize internal read buffer; calls to Read(...) will read at least
107 /// \param[in] new_buffer_size the new read buffer size
108 /// \return Status
109 Status SetBufferSize(int64_t new_buffer_size);
110
111 /// \brief Return the number of remaining bytes in the read buffer
112 int64_t bytes_buffered() const;
113
114 /// \brief Return the current size of the internal buffer
115 int64_t buffer_size() const;
116
117 /// \brief Release the raw InputStream. Any data buffered will be
118 /// discarded. Further operations on this object are invalid
119 /// \return raw the underlying InputStream
120 std::shared_ptr<InputStream> Detach();
121
122 /// \brief Return the unbuffered InputStream
123 std::shared_ptr<InputStream> raw() const;
124
125 // InputStream APIs
126 util::string_view Peek(int64_t nbytes) const override;
127 Status Close() override;
128 bool closed() const override;
129
130 /// \brief Returns the position of the buffered stream, though the position
131 /// of the unbuffered stream may be further advanced
132 Status Tell(int64_t* position) const override;
133
134 Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override;
135
136 /// \brief Read into buffer. If the read is already buffered, then this will
137 /// return a slice into the buffer
138 Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
139
140 private:
141 explicit BufferedInputStream(std::shared_ptr<InputStream> raw, MemoryPool* pool);
142
143 class ARROW_NO_EXPORT Impl;
144 std::unique_ptr<Impl> impl_;
145};
146
147} // namespace io
148} // namespace arrow
149
150#endif // ARROW_IO_BUFFERED_H
151