1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19#include "orc/Exceptions.hh"
20#include "OutputStream.hh"
21
22#include <sstream>
23
24namespace orc {
25
26 PositionRecorder::~PositionRecorder() {
27 // PASS
28 }
29
30 BufferedOutputStream::BufferedOutputStream(
31 MemoryPool& pool,
32 OutputStream * outStream,
33 uint64_t capacity_,
34 uint64_t blockSize_)
35 : outputStream(outStream),
36 blockSize(blockSize_) {
37 dataBuffer.reset(new DataBuffer<char>(pool));
38 dataBuffer->reserve(capacity_);
39 }
40
41 BufferedOutputStream::~BufferedOutputStream() {
42 // PASS
43 }
44
45 bool BufferedOutputStream::Next(void** buffer, int* size) {
46 *size = static_cast<int>(blockSize);
47 uint64_t oldSize = dataBuffer->size();
48 uint64_t newSize = oldSize + blockSize;
49 uint64_t newCapacity = dataBuffer->capacity();
50 while (newCapacity < newSize) {
51 newCapacity += dataBuffer->capacity();
52 }
53 dataBuffer->reserve(newCapacity);
54 dataBuffer->resize(dataBuffer->size() + blockSize);
55 *buffer = dataBuffer->data() + oldSize;
56 return true;
57 }
58
59 void BufferedOutputStream::BackUp(int count) {
60 if (count >= 0) {
61 uint64_t unsignedCount = static_cast<uint64_t>(count);
62 if (unsignedCount <= dataBuffer->size()) {
63 dataBuffer->resize(dataBuffer->size() - unsignedCount);
64 } else {
65 throw std::logic_error("Can't backup that much!");
66 }
67 }
68 }
69
70 google::protobuf::int64 BufferedOutputStream::ByteCount() const {
71 return static_cast<google::protobuf::int64>(dataBuffer->size());
72 }
73
74 bool BufferedOutputStream::WriteAliasedRaw(const void *, int) {
75 throw NotImplementedYet("WriteAliasedRaw is not supported.");
76 }
77
78 bool BufferedOutputStream::AllowsAliasing() const {
79 return false;
80 }
81
82 std::string BufferedOutputStream::getName() const {
83 std::ostringstream result;
84 result << "BufferedOutputStream " << dataBuffer->size() << " of "
85 << dataBuffer->capacity();
86 return result.str();
87 }
88
89 uint64_t BufferedOutputStream::getSize() const {
90 return dataBuffer->size();
91 }
92
93 uint64_t BufferedOutputStream::flush() {
94 uint64_t dataSize = dataBuffer->size();
95 outputStream->write(dataBuffer->data(), dataBuffer->size());
96 dataBuffer->resize(0);
97 return dataSize;
98 }
99
100 void AppendOnlyBufferedStream::write(const char * data, size_t size) {
101 size_t dataOffset = 0;
102 while (size > 0) {
103 if (bufferOffset == bufferLength) {
104 if (!outStream->Next(
105 reinterpret_cast<void **>(&buffer),
106 &bufferLength)) {
107 throw std::logic_error("Failed to allocate buffer.");
108 }
109 bufferOffset = 0;
110 }
111 size_t len = std::min(
112 static_cast<size_t>(bufferLength - bufferOffset),
113 size);
114 memcpy(buffer + bufferOffset, data + dataOffset, len);
115 bufferOffset += static_cast<int>(len);
116 dataOffset += len;
117 size -= len;
118 }
119 }
120
121 uint64_t AppendOnlyBufferedStream::getSize() const {
122 return outStream->getSize();
123 }
124
125 uint64_t AppendOnlyBufferedStream::flush() {
126 outStream->BackUp(bufferLength - bufferOffset);
127 bufferOffset = bufferLength = 0;
128 buffer = nullptr;
129 return outStream->flush();
130 }
131
132 void AppendOnlyBufferedStream::recordPosition(PositionRecorder* recorder) const {
133 uint64_t flushedSize = outStream->getSize();
134 uint64_t unflushedSize = static_cast<uint64_t>(bufferOffset);
135 if (outStream->isCompressed()) {
136 // start of the compression chunk in the stream
137 recorder->add(flushedSize);
138 // number of decompressed bytes that need to be consumed
139 recorder->add(unflushedSize);
140 } else {
141 flushedSize -= static_cast<uint64_t>(bufferLength);
142 // byte offset of the start location
143 recorder->add(flushedSize + unflushedSize);
144 }
145 }
146
147}
148