1/*
2 * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License").
5 * You may not use this file except in compliance with the License.
6 * A copy of the License is located at
7 *
8 * http://aws.amazon.com/apache2.0
9 *
10 * or in the "license" file accompanying this file. This file is distributed
11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 * express or implied. See the License for the specific language governing
13 * permissions and limitations under the License.
14 */
15
16#include <aws/common/common.h>
17#include <aws/core/utils/event/EventHeader.h>
18#include <aws/core/utils/event/EventMessage.h>
19#include <aws/core/utils/event/EventStreamDecoder.h>
20#include <aws/core/utils/logging/LogMacros.h>
21#include <aws/core/utils/UnreferencedParam.h>
22#include <aws/core/utils/memory/AWSMemory.h>
23
24namespace Aws
25{
26 namespace Utils
27 {
28 namespace Event
29 {
30 static const char EVENT_STREAM_DECODER_CLASS_TAG[] = "Aws::Utils::Event::EventStreamDecoder";
31
32 EventStreamDecoder::EventStreamDecoder(EventStreamHandler* handler) : m_eventStreamHandler(handler)
33 {
34 aws_event_stream_streaming_decoder_init(&m_decoder,
35 get_aws_allocator(),
36 onPayloadSegment,
37 onPreludeReceived,
38 onHeaderReceived,
39 onError,
40 (void*)handler);
41 }
42
43 EventStreamDecoder::~EventStreamDecoder()
44 {
45 aws_event_stream_streaming_decoder_clean_up(&m_decoder);
46 }
47
48 void EventStreamDecoder::Pump(const ByteBuffer& data)
49 {
50 Pump(data, data.GetLength());
51 }
52
53 void EventStreamDecoder::Pump(const ByteBuffer& data, size_t length)
54 {
55 aws_byte_buf dataBuf = aws_byte_buf_from_array(static_cast<uint8_t*>(data.GetUnderlyingData()), length);
56 aws_event_stream_streaming_decoder_pump(&m_decoder, &dataBuf);
57 }
58
59 void EventStreamDecoder::Reset()
60 {
61 m_eventStreamHandler->Reset();
62 }
63
64 void EventStreamDecoder::ResetEventStreamHandler(EventStreamHandler* handler)
65 {
66 aws_event_stream_streaming_decoder_init(&m_decoder, get_aws_allocator(),
67 onPayloadSegment,
68 onPreludeReceived,
69 onHeaderReceived,
70 onError,
71 reinterpret_cast<void *>(handler));
72 }
73
74 void EventStreamDecoder::onPayloadSegment(
75 aws_event_stream_streaming_decoder* decoder,
76 aws_byte_buf* payload,
77 int8_t isFinalSegment,
78 void* context)
79 {
80 AWS_UNREFERENCED_PARAM(decoder);
81 auto handler = static_cast<EventStreamHandler*>(context);
82 assert(handler);
83 if (!handler)
84 {
85 AWS_LOGSTREAM_ERROR(EVENT_STREAM_DECODER_CLASS_TAG, "Payload received, but decoder encountered internal errors before."
86 "ErrorCode: " << EventStreamErrorsMapper::GetNameForError(handler->GetInternalError()) << ", "
87 "ErrorMessage: " << handler->GetEventPayloadAsString());
88 return;
89 }
90 handler->WriteMessageEventPayload(static_cast<unsigned char*>(payload->buffer), payload->len);
91
92 // Complete payload received
93 if (isFinalSegment == 1)
94 {
95 assert(handler->IsMessageCompleted());
96 handler->OnEvent();
97 handler->Reset();
98 }
99 }
100
101 void EventStreamDecoder::onPreludeReceived(
102 aws_event_stream_streaming_decoder* decoder,
103 aws_event_stream_message_prelude* prelude,
104 void* context)
105 {
106 AWS_UNREFERENCED_PARAM(decoder);
107 auto handler = static_cast<EventStreamHandler*>(context);
108 handler->Reset();
109
110 //Encounter internal error in prelude received.
111 //This error will be handled by OnError callback function later.
112 if (prelude->total_len < prelude->headers_len + 16)
113 {
114 return;
115 }
116 handler->SetMessageMetadata(prelude->total_len, prelude->headers_len,
117 prelude->total_len - prelude->headers_len - 4/*total byte-length*/ - 4/*headers byte-length*/ - 4/*prelude crc*/ - 4/*message crc*/);
118 AWS_LOGSTREAM_TRACE(EVENT_STREAM_DECODER_CLASS_TAG, "Message received, the expected length of the message is: " << prelude->total_len <<
119 " bytes, and the expected length of the header is: " << prelude->headers_len << " bytes");
120
121 //Handle empty message
122 //if (handler->m_message.GetHeadersLength() == 0 && handler->m_message.GetPayloadLength() == 0)
123 if (handler->IsMessageCompleted())
124 {
125 handler->OnEvent();
126 handler->Reset();
127 }
128 }
129
130 void EventStreamDecoder::onHeaderReceived(
131 aws_event_stream_streaming_decoder* decoder,
132 aws_event_stream_message_prelude* prelude,
133 aws_event_stream_header_value_pair* header,
134 void* context)
135 {
136 AWS_UNREFERENCED_PARAM(decoder);
137 AWS_UNREFERENCED_PARAM(prelude);
138 auto handler = static_cast<EventStreamHandler*>(context);
139 assert(handler);
140 if (!handler)
141 {
142 AWS_LOGSTREAM_ERROR(EVENT_STREAM_DECODER_CLASS_TAG, "Payload received, but decoder encountered internal errors before."
143 "ErrorCode: " << EventStreamErrorsMapper::GetNameForError(handler->GetInternalError()) << ", "
144 "ErrorMessage: " << handler->GetEventPayloadAsString());
145 return;
146 }
147
148 // The length of a header = 1 byte (to represent the length of header name) + length of header name + 1 byte (to represent header type)
149 // + 2 bytes (to represent length of header value) + length of header value
150 handler->InsertMessageEventHeader(Aws::String(header->header_name, header->header_name_len),
151 1 + header->header_name_len + 1 + 2 + header->header_value_len, EventHeaderValue(header));
152
153 // Handle messages only have headers, but without payload.
154 //if (handler->m_message.GetHeadersLength() == handler->m_headersBytesReceived() && handler->m_message.GetPayloadLength() == 0)
155 if (handler->IsMessageCompleted())
156 {
157 handler->OnEvent();
158 handler->Reset();
159 }
160 }
161
162 void EventStreamDecoder::onError(
163 aws_event_stream_streaming_decoder* decoder,
164 aws_event_stream_message_prelude* prelude,
165 int error_code,
166 const char* message,
167 void* context)
168 {
169 AWS_UNREFERENCED_PARAM(decoder);
170 AWS_UNREFERENCED_PARAM(prelude);
171 auto handler = static_cast<EventStreamHandler*>(context);
172 handler->SetFailure();
173 handler->SetInternalError(error_code);
174 handler->WriteMessageEventPayload(reinterpret_cast<const unsigned char*>(message), strlen(message));
175 handler->OnEvent();
176 }
177 } // namespace Event
178 } // namespace Utils
179} // namespace Aws
180
181