1 | /* |
2 | * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"). |
5 | * You may not use this file except in compliance with the License. |
6 | * A copy of the License is located at |
7 | * |
8 | * http://aws.amazon.com/apache2.0 |
9 | * |
10 | * or in the "license" file accompanying this file. This file is distributed |
11 | * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either |
12 | * express or implied. See the License for the specific language governing |
13 | * permissions and limitations under the License. |
14 | */ |
15 | #include <aws/core/utils/stream/ConcurrentStreamBuf.h> |
16 | #include <aws/core/utils/logging/LogMacros.h> |
17 | #include <cstdint> |
18 | #include <cassert> |
19 | |
20 | namespace Aws |
21 | { |
22 | namespace Utils |
23 | { |
24 | namespace Stream |
25 | { |
26 | const char TAG[] = "ConcurrentStreamBuf" ; |
27 | ConcurrentStreamBuf::ConcurrentStreamBuf(size_t bufferLength) : |
28 | m_putArea(bufferLength), // we access [0] of the put area below so we must initialize it. |
29 | m_eof(false) |
30 | { |
31 | m_getArea.reserve(bufferLength); |
32 | m_backbuf.reserve(bufferLength); |
33 | |
34 | char* pbegin = reinterpret_cast<char*>(&m_putArea[0]); |
35 | setp(pbegin, pbegin + bufferLength); |
36 | } |
37 | |
38 | void ConcurrentStreamBuf::SetEof() |
39 | { |
40 | { |
41 | std::unique_lock<std::mutex> lock(m_lock); |
42 | m_eof = true; |
43 | } |
44 | m_signal.notify_all(); |
45 | } |
46 | |
47 | void ConcurrentStreamBuf::FlushPutArea() |
48 | { |
49 | const size_t bitslen = pptr() - pbase(); |
50 | if (bitslen) |
51 | { |
52 | // scope the lock |
53 | { |
54 | std::unique_lock<std::mutex> lock(m_lock); |
55 | m_signal.wait(lock, [this, bitslen]{ return m_eof || bitslen <= (m_backbuf.capacity() - m_backbuf.size()); }); |
56 | if (m_eof) |
57 | { |
58 | return; |
59 | } |
60 | std::copy(pbase(), pptr(), std::back_inserter(m_backbuf)); |
61 | } |
62 | m_signal.notify_one(); |
63 | char* pbegin = reinterpret_cast<char*>(&m_putArea[0]); |
64 | setp(pbegin, pbegin + m_putArea.size()); |
65 | } |
66 | } |
67 | |
68 | std::streampos ConcurrentStreamBuf::seekoff(std::streamoff, std::ios_base::seekdir, std::ios_base::openmode) |
69 | { |
70 | return std::streamoff(-1); // Seeking is not supported. |
71 | } |
72 | |
73 | std::streampos ConcurrentStreamBuf::seekpos(std::streampos, std::ios_base::openmode) |
74 | { |
75 | return std::streamoff(-1); // Seeking is not supported. |
76 | } |
77 | |
78 | int ConcurrentStreamBuf::underflow() |
79 | { |
80 | { |
81 | std::unique_lock<std::mutex> lock(m_lock); |
82 | m_signal.wait(lock, [this]{ return m_backbuf.empty() == false || m_eof; }); |
83 | |
84 | if (m_eof && m_backbuf.empty()) |
85 | { |
86 | return std::char_traits<char>::eof(); |
87 | } |
88 | |
89 | m_getArea.clear(); // keep the get-area from growing unbounded. |
90 | std::copy(m_backbuf.begin(), m_backbuf.end(), std::back_inserter(m_getArea)); |
91 | m_backbuf.clear(); |
92 | } |
93 | m_signal.notify_one(); |
94 | char* gbegin = reinterpret_cast<char*>(&m_getArea[0]); |
95 | setg(gbegin, gbegin, gbegin + m_getArea.size()); |
96 | return std::char_traits<char>::to_int_type(*gptr()); |
97 | } |
98 | |
99 | std::streamsize ConcurrentStreamBuf::showmanyc() |
100 | { |
101 | std::unique_lock<std::mutex> lock(m_lock); |
102 | AWS_LOGSTREAM_TRACE(TAG, "stream how many character? " << m_backbuf.size()); |
103 | return m_backbuf.size(); |
104 | } |
105 | |
106 | int ConcurrentStreamBuf::overflow(int ch) |
107 | { |
108 | const auto eof = std::char_traits<char>::eof(); |
109 | |
110 | if (ch == eof) |
111 | { |
112 | FlushPutArea(); |
113 | return eof; |
114 | } |
115 | |
116 | FlushPutArea(); |
117 | { |
118 | std::unique_lock<std::mutex> lock(m_lock); |
119 | if (m_eof) |
120 | { |
121 | return eof; |
122 | } |
123 | *pptr() = static_cast<char>(ch); |
124 | pbump(1); |
125 | return ch; |
126 | } |
127 | } |
128 | |
129 | int ConcurrentStreamBuf::sync() |
130 | { |
131 | FlushPutArea(); |
132 | return 0; |
133 | } |
134 | } |
135 | } |
136 | } |
137 | |