1/*
2 * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License").
5 * You may not use this file except in compliance with the License.
6 * A copy of the License is located at
7 *
8 * http://aws.amazon.com/apache2.0
9 *
10 * or in the "license" file accompanying this file. This file is distributed
11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 * express or implied. See the License for the specific language governing
13 * permissions and limitations under the License.
14 */
15#include <aws/core/utils/event/EventStreamBuf.h>
16#include <cassert>
17
18namespace Aws
19{
20 namespace Utils
21 {
22 namespace Event
23 {
// Buffer length, in bytes, named as the default size for an EventStreamBuf.
const size_t DEFAULT_BUF_SIZE{1024};
25
26 EventStreamBuf::EventStreamBuf(EventStreamDecoder& decoder, size_t bufferLength) :
27 m_byteBuffer(bufferLength),
28 m_bufferLength(bufferLength),
29 m_decoder(decoder)
30 {
31 assert(decoder);
32 char* begin = reinterpret_cast<char*>(m_byteBuffer.GetUnderlyingData());
33 char* end = begin + bufferLength - 1;
34
35 setp(begin, end);
36 setg(begin, begin, begin);
37 }
38
39 EventStreamBuf::~EventStreamBuf()
40 {
41 if (m_decoder)
42 {
43 writeToDecoder();
44 }
45 }
46
47 void EventStreamBuf::writeToDecoder()
48 {
49 if (pptr() > pbase())
50 {
51 size_t length = static_cast<size_t>(pptr() - pbase());
52 m_decoder.Pump(m_byteBuffer, length);
53
54 if (!m_decoder)
55 {
56 m_err.write(reinterpret_cast<char*>(m_byteBuffer.GetUnderlyingData()), length);
57 }
58 else
59 {
60 pbump(-static_cast<int>(length));
61 }
62 }
63 }
64
65 std::streampos EventStreamBuf::seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which)
66 {
67 if (dir == std::ios_base::beg)
68 {
69 return seekpos(off, which);
70 }
71 else if (dir == std::ios_base::end)
72 {
73 return seekpos(m_bufferLength - 1 - off, which);
74 }
75 else if (dir == std::ios_base::cur)
76 {
77 if (which == std::ios_base::in)
78 {
79 return seekpos((gptr() - (char*)m_byteBuffer.GetUnderlyingData()) + off, which);
80 }
81 if (which == std::ios_base::out)
82 {
83 return seekpos((pptr() - (char*)m_byteBuffer.GetUnderlyingData()) + off, which);
84 }
85 }
86
87 return std::streamoff(-1);
88 }
89
90 std::streampos EventStreamBuf::seekpos(std::streampos pos, std::ios_base::openmode which)
91 {
92 assert(static_cast<size_t>(pos) <= m_bufferLength);
93 if (static_cast<size_t>(pos) > m_bufferLength)
94 {
95 return std::streampos(std::streamoff(-1));
96 }
97
98 if (which == std::ios_base::in)
99 {
100 m_err.seekg(pos);
101 return m_err.tellg();
102 }
103
104 if (which == std::ios_base::out)
105 {
106 return pos;
107 }
108
109 return std::streampos(std::streamoff(-1));
110 }
111
112 int EventStreamBuf::underflow()
113 {
114 if (!m_err || m_err.eof() || m_decoder)
115 {
116 return std::char_traits<char>::eof();
117 }
118
119 m_err.flush();
120 m_err.read(reinterpret_cast<char*>(m_byteBuffer.GetUnderlyingData()), m_byteBuffer.GetLength());
121
122 char* begin = reinterpret_cast<char*>(m_byteBuffer.GetUnderlyingData());
123 setg(begin, begin, begin + m_err.gcount());
124 return std::char_traits<char>::to_int_type(*gptr());
125 }
126
127 int EventStreamBuf::overflow(int ch)
128 {
129 auto eof = std::char_traits<char>::eof();
130
131 if (m_decoder)
132 {
133 if (ch != eof)
134 {
135 *pptr() = (char)ch;
136 pbump(1);
137 }
138
139 writeToDecoder();
140 return ch;
141 }
142
143 return eof;
144 }
145
146 int EventStreamBuf::sync()
147 {
148 if (m_decoder)
149 {
150 writeToDecoder();
151 }
152
153 return 0;
154 }
155 }
156 }
157}
158