1/*
2 * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License").
5 * You may not use this file except in compliance with the License.
6 * A copy of the License is located at
7 *
8 * http://aws.amazon.com/apache2.0
9 *
10 * or in the "license" file accompanying this file. This file is distributed
11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 * express or implied. See the License for the specific language governing
13 * permissions and limitations under the License.
14 */
15#include <aws/core/utils/stream/ConcurrentStreamBuf.h>
16#include <aws/core/utils/logging/LogMacros.h>
17#include <cstdint>
18#include <cassert>
19
20namespace Aws
21{
22 namespace Utils
23 {
24 namespace Stream
25 {
26 const char TAG[] = "ConcurrentStreamBuf";
27 ConcurrentStreamBuf::ConcurrentStreamBuf(size_t bufferLength) :
28 m_putArea(bufferLength), // we access [0] of the put area below so we must initialize it.
29 m_eof(false)
30 {
31 m_getArea.reserve(bufferLength);
32 m_backbuf.reserve(bufferLength);
33
34 char* pbegin = reinterpret_cast<char*>(&m_putArea[0]);
35 setp(pbegin, pbegin + bufferLength);
36 }
37
38 void ConcurrentStreamBuf::SetEof()
39 {
40 {
41 std::unique_lock<std::mutex> lock(m_lock);
42 m_eof = true;
43 }
44 m_signal.notify_all();
45 }
46
47 void ConcurrentStreamBuf::FlushPutArea()
48 {
49 const size_t bitslen = pptr() - pbase();
50 if (bitslen)
51 {
52 // scope the lock
53 {
54 std::unique_lock<std::mutex> lock(m_lock);
55 m_signal.wait(lock, [this, bitslen]{ return m_eof || bitslen <= (m_backbuf.capacity() - m_backbuf.size()); });
56 if (m_eof)
57 {
58 return;
59 }
60 std::copy(pbase(), pptr(), std::back_inserter(m_backbuf));
61 }
62 m_signal.notify_one();
63 char* pbegin = reinterpret_cast<char*>(&m_putArea[0]);
64 setp(pbegin, pbegin + m_putArea.size());
65 }
66 }
67
68 std::streampos ConcurrentStreamBuf::seekoff(std::streamoff, std::ios_base::seekdir, std::ios_base::openmode)
69 {
70 return std::streamoff(-1); // Seeking is not supported.
71 }
72
73 std::streampos ConcurrentStreamBuf::seekpos(std::streampos, std::ios_base::openmode)
74 {
75 return std::streamoff(-1); // Seeking is not supported.
76 }
77
78 int ConcurrentStreamBuf::underflow()
79 {
80 {
81 std::unique_lock<std::mutex> lock(m_lock);
82 m_signal.wait(lock, [this]{ return m_backbuf.empty() == false || m_eof; });
83
84 if (m_eof && m_backbuf.empty())
85 {
86 return std::char_traits<char>::eof();
87 }
88
89 m_getArea.clear(); // keep the get-area from growing unbounded.
90 std::copy(m_backbuf.begin(), m_backbuf.end(), std::back_inserter(m_getArea));
91 m_backbuf.clear();
92 }
93 m_signal.notify_one();
94 char* gbegin = reinterpret_cast<char*>(&m_getArea[0]);
95 setg(gbegin, gbegin, gbegin + m_getArea.size());
96 return std::char_traits<char>::to_int_type(*gptr());
97 }
98
99 std::streamsize ConcurrentStreamBuf::showmanyc()
100 {
101 std::unique_lock<std::mutex> lock(m_lock);
102 AWS_LOGSTREAM_TRACE(TAG, "stream how many character? " << m_backbuf.size());
103 return m_backbuf.size();
104 }
105
106 int ConcurrentStreamBuf::overflow(int ch)
107 {
108 const auto eof = std::char_traits<char>::eof();
109
110 if (ch == eof)
111 {
112 FlushPutArea();
113 return eof;
114 }
115
116 FlushPutArea();
117 {
118 std::unique_lock<std::mutex> lock(m_lock);
119 if (m_eof)
120 {
121 return eof;
122 }
123 *pptr() = static_cast<char>(ch);
124 pbump(1);
125 return ch;
126 }
127 }
128
129 int ConcurrentStreamBuf::sync()
130 {
131 FlushPutArea();
132 return 0;
133 }
134 }
135 }
136}
137