1 | /** |
2 | * Licensed to the Apache Software Foundation (ASF) under one |
3 | * or more contributor license agreements. See the NOTICE file |
4 | * distributed with this work for additional information |
5 | * regarding copyright ownership. The ASF licenses this file |
6 | * to you under the Apache License, Version 2.0 (the |
7 | * "License"); you may not use this file except in compliance |
8 | * with the License. You may obtain a copy of the License at |
9 | * |
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
11 | * |
12 | * Unless required by applicable law or agreed to in writing, software |
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15 | * See the License for the specific language governing permissions and |
16 | * limitations under the License. |
17 | */ |
18 | |
19 | #include "orc/Exceptions.hh" |
20 | #include "OutputStream.hh" |
21 | |
22 | #include <sstream> |
23 | |
24 | namespace orc { |
25 | |
26 | PositionRecorder::~PositionRecorder() { |
27 | // PASS |
28 | } |
29 | |
30 | BufferedOutputStream::BufferedOutputStream( |
31 | MemoryPool& pool, |
32 | OutputStream * outStream, |
33 | uint64_t capacity_, |
34 | uint64_t blockSize_) |
35 | : outputStream(outStream), |
36 | blockSize(blockSize_) { |
37 | dataBuffer.reset(new DataBuffer<char>(pool)); |
38 | dataBuffer->reserve(capacity_); |
39 | } |
40 | |
41 | BufferedOutputStream::~BufferedOutputStream() { |
42 | // PASS |
43 | } |
44 | |
45 | bool BufferedOutputStream::Next(void** buffer, int* size) { |
46 | *size = static_cast<int>(blockSize); |
47 | uint64_t oldSize = dataBuffer->size(); |
48 | uint64_t newSize = oldSize + blockSize; |
49 | uint64_t newCapacity = dataBuffer->capacity(); |
50 | while (newCapacity < newSize) { |
51 | newCapacity += dataBuffer->capacity(); |
52 | } |
53 | dataBuffer->reserve(newCapacity); |
54 | dataBuffer->resize(dataBuffer->size() + blockSize); |
55 | *buffer = dataBuffer->data() + oldSize; |
56 | return true; |
57 | } |
58 | |
59 | void BufferedOutputStream::BackUp(int count) { |
60 | if (count >= 0) { |
61 | uint64_t unsignedCount = static_cast<uint64_t>(count); |
62 | if (unsignedCount <= dataBuffer->size()) { |
63 | dataBuffer->resize(dataBuffer->size() - unsignedCount); |
64 | } else { |
65 | throw std::logic_error("Can't backup that much!" ); |
66 | } |
67 | } |
68 | } |
69 | |
70 | google::protobuf::int64 BufferedOutputStream::ByteCount() const { |
71 | return static_cast<google::protobuf::int64>(dataBuffer->size()); |
72 | } |
73 | |
74 | bool BufferedOutputStream::WriteAliasedRaw(const void *, int) { |
75 | throw NotImplementedYet("WriteAliasedRaw is not supported." ); |
76 | } |
77 | |
78 | bool BufferedOutputStream::AllowsAliasing() const { |
79 | return false; |
80 | } |
81 | |
82 | std::string BufferedOutputStream::getName() const { |
83 | std::ostringstream result; |
84 | result << "BufferedOutputStream " << dataBuffer->size() << " of " |
85 | << dataBuffer->capacity(); |
86 | return result.str(); |
87 | } |
88 | |
89 | uint64_t BufferedOutputStream::getSize() const { |
90 | return dataBuffer->size(); |
91 | } |
92 | |
93 | uint64_t BufferedOutputStream::flush() { |
94 | uint64_t dataSize = dataBuffer->size(); |
95 | outputStream->write(dataBuffer->data(), dataBuffer->size()); |
96 | dataBuffer->resize(0); |
97 | return dataSize; |
98 | } |
99 | |
100 | void AppendOnlyBufferedStream::write(const char * data, size_t size) { |
101 | size_t dataOffset = 0; |
102 | while (size > 0) { |
103 | if (bufferOffset == bufferLength) { |
104 | if (!outStream->Next( |
105 | reinterpret_cast<void **>(&buffer), |
106 | &bufferLength)) { |
107 | throw std::logic_error("Failed to allocate buffer." ); |
108 | } |
109 | bufferOffset = 0; |
110 | } |
111 | size_t len = std::min( |
112 | static_cast<size_t>(bufferLength - bufferOffset), |
113 | size); |
114 | memcpy(buffer + bufferOffset, data + dataOffset, len); |
115 | bufferOffset += static_cast<int>(len); |
116 | dataOffset += len; |
117 | size -= len; |
118 | } |
119 | } |
120 | |
121 | uint64_t AppendOnlyBufferedStream::getSize() const { |
122 | return outStream->getSize(); |
123 | } |
124 | |
125 | uint64_t AppendOnlyBufferedStream::flush() { |
126 | outStream->BackUp(bufferLength - bufferOffset); |
127 | bufferOffset = bufferLength = 0; |
128 | buffer = nullptr; |
129 | return outStream->flush(); |
130 | } |
131 | |
132 | void AppendOnlyBufferedStream::recordPosition(PositionRecorder* recorder) const { |
133 | uint64_t flushedSize = outStream->getSize(); |
134 | uint64_t unflushedSize = static_cast<uint64_t>(bufferOffset); |
135 | if (outStream->isCompressed()) { |
136 | // start of the compression chunk in the stream |
137 | recorder->add(flushedSize); |
138 | // number of decompressed bytes that need to be consumed |
139 | recorder->add(unflushedSize); |
140 | } else { |
141 | flushedSize -= static_cast<uint64_t>(bufferLength); |
142 | // byte offset of the start location |
143 | recorder->add(flushedSize + unflushedSize); |
144 | } |
145 | } |
146 | |
147 | } |
148 | |