1/*
2 * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License").
5 * You may not use this file except in compliance with the License.
6 * A copy of the License is located at
7 *
8 * http://aws.amazon.com/apache2.0
9 *
10 * or in the "license" file accompanying this file. This file is distributed
11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 * express or implied. See the License for the specific language governing
13 * permissions and limitations under the License.
14 */
15
16#include <aws/event-stream/event_stream.h>
17
18#include <aws/checksums/crc.h>
19
20#include <aws/common/encoding.h>
21
22#include <inttypes.h>
23
/* max message size is 16MB.
 * Enforced on the total length in aws_event_stream_message_init() and
 * aws_event_stream_message_from_buffer(). */
#define MAX_MESSAGE_SIZE (16 * 1024 * 1024)

/* max header size is 128kb.
 * Enforced on the encoded headers section in get_headers_from_buffer() and
 * aws_event_stream_message_init(). */
#define MAX_HEADERS_SIZE (128 * 1024)
/* Library name attached to every entry of the s_errors table. */
#define LIB_NAME "libaws-c-event-stream"

/* MSVC only: save the current warning state, then silence warnings this file triggers. */
#if _MSC_VER
#    pragma warning(push)
#    pragma warning(disable : 4221) /* aggregate initializer using local variable addresses */
#    pragma warning(disable : 4204) /* non-constant aggregate initializer */
#    pragma warning(disable : 4306) /* msft doesn't trust us to do pointer arithmetic. */
#endif
37
/* Descriptions for the error codes raised by this library; each entry is tagged with LIB_NAME. */
static struct aws_error_info s_errors[] = {
    AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH, "Buffer length mismatch", LIB_NAME),
    AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_INSUFFICIENT_BUFFER_LEN, "insufficient buffer length", LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED,
        "a field for the message was too large",
        LIB_NAME),
    AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE, "prelude checksum was incorrect", LIB_NAME),
    AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE, "message checksum was incorrect", LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN,
        "message headers length was incorrect",
        LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE,
        "An unknown header type was encountered",
        LIB_NAME),
    AWS_DEFINE_ERROR_INFO(
        AWS_ERROR_EVENT_STREAM_MESSAGE_PARSER_ILLEGAL_STATE,
        "message parser encountered an illegal state",
        LIB_NAME),
};

/* The table above plus its element count, in the form passed to
 * aws_register_error_info() / aws_unregister_error_info(). */
static struct aws_error_info_list s_list = {
    .error_list = s_errors,
    .count = sizeof(s_errors) / sizeof(struct aws_error_info),
};

/* True between aws_event_stream_library_init() and aws_event_stream_library_clean_up();
 * makes repeated calls to either function a no-op. */
static bool s_event_stream_library_initialized = false;
67
68void aws_event_stream_library_init(struct aws_allocator *allocator) {
69 if (!s_event_stream_library_initialized) {
70 s_event_stream_library_initialized = true;
71 aws_common_library_init(allocator);
72 aws_register_error_info(&s_list);
73 }
74}
75
76void aws_event_stream_library_clean_up(void) {
77 if (s_event_stream_library_initialized) {
78 s_event_stream_library_initialized = false;
79 aws_unregister_error_info(&s_list);
80 aws_common_library_clean_up();
81 }
82}
83
/* Byte offsets of the prelude fields, measured from the start of the message buffer. */
#define TOTAL_LEN_OFFSET 0 /* uint32 total message length */
#define PRELUDE_CRC_OFFSET (sizeof(uint32_t) + sizeof(uint32_t)) /* placed after the two uint32 length fields */
#define HEADER_LEN_OFFSET sizeof(uint32_t) /* uint32 headers length, placed after the total length */
87
88/* Computes the byte length necessary to store the headers represented in the headers list.
89 * returns that length. */
90uint32_t compute_headers_len(struct aws_array_list *headers) {
91 if (!headers || !aws_array_list_length(headers)) {
92 return 0;
93 }
94
95 size_t headers_count = aws_array_list_length(headers);
96 size_t headers_len = 0;
97
98 for (size_t i = 0; i < headers_count; ++i) {
99 struct aws_event_stream_header_value_pair *header = NULL;
100
101 aws_array_list_get_at_ptr(headers, (void **)&header, i);
102
103 headers_len += sizeof(header->header_name_len) + header->header_name_len + 1;
104
105 if (header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING ||
106 header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF) {
107 headers_len += sizeof(header->header_value_len);
108 }
109
110 if (header->header_value_type != AWS_EVENT_STREAM_HEADER_BOOL_FALSE &&
111 header->header_value_type != AWS_EVENT_STREAM_HEADER_BOOL_TRUE) {
112 headers_len += header->header_value_len;
113 }
114 }
115
116 return (uint32_t)headers_len;
117}
118
119/* adds the headers represented in the headers list to the buffer.
120 returns the new buffer offset for use elsewhere. Assumes buffer length is at least the length of the return value
121 from compute_headers_length() */
122size_t add_headers_to_buffer(struct aws_array_list *headers, uint8_t *buffer) {
123 if (!headers || !aws_array_list_length(headers)) {
124 return 0;
125 }
126
127 size_t headers_count = aws_array_list_length(headers);
128 uint8_t *buffer_alias = buffer;
129
130 for (size_t i = 0; i < headers_count; ++i) {
131 struct aws_event_stream_header_value_pair *header = NULL;
132
133 aws_array_list_get_at_ptr(headers, (void **)&header, i);
134 *buffer_alias = (uint8_t)header->header_name_len;
135 buffer_alias++;
136 memcpy(buffer_alias, header->header_name, (size_t)header->header_name_len);
137 buffer_alias += header->header_name_len;
138 *buffer_alias = (uint8_t)header->header_value_type;
139 buffer_alias++;
140 switch (header->header_value_type) {
141 case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
142 case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
143 break;
144 case AWS_EVENT_STREAM_HEADER_BYTE:
145 *buffer_alias = header->header_value.static_val[0];
146 buffer_alias++;
147 break;
148 /* additions of integers here assume the endianness conversion has already happened */
149 case AWS_EVENT_STREAM_HEADER_INT16:
150 memcpy(buffer_alias, header->header_value.static_val, sizeof(uint16_t));
151 buffer_alias += sizeof(uint16_t);
152 break;
153 case AWS_EVENT_STREAM_HEADER_INT32:
154 memcpy(buffer_alias, header->header_value.static_val, sizeof(uint32_t));
155 buffer_alias += sizeof(uint32_t);
156 break;
157 case AWS_EVENT_STREAM_HEADER_INT64:
158 case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
159 memcpy(buffer_alias, header->header_value.static_val, sizeof(uint64_t));
160 buffer_alias += sizeof(uint64_t);
161 break;
162 case AWS_EVENT_STREAM_HEADER_BYTE_BUF:
163 case AWS_EVENT_STREAM_HEADER_STRING:
164 aws_write_u16(header->header_value_len, buffer_alias);
165 buffer_alias += sizeof(uint16_t);
166 memcpy(buffer_alias, header->header_value.variable_len_val, header->header_value_len);
167 buffer_alias += header->header_value_len;
168 break;
169 case AWS_EVENT_STREAM_HEADER_UUID:
170 memcpy(buffer_alias, header->header_value.static_val, 16);
171 buffer_alias += header->header_value_len;
172 break;
173 }
174 }
175
176 return buffer_alias - buffer;
177}
178
179/* Get the headers from the buffer, store them in the headers list.
180 * the user's reponsibility to cleanup the list when they are finished with it.
181 * no buffer copies happen here, the lifetime of the buffer, must outlive the usage of the headers.
182 * returns error codes defined in the public interface. */
183int get_headers_from_buffer(struct aws_array_list *headers, const uint8_t *buffer, size_t headers_len) {
184
185 if (AWS_UNLIKELY(headers_len > MAX_HEADERS_SIZE)) {
186 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
187 }
188
189 /* iterate the buffer per header. */
190 const uint8_t *buffer_start = buffer;
191 while ((size_t)(buffer - buffer_start) < headers_len) {
192 struct aws_event_stream_header_value_pair header;
193 AWS_ZERO_STRUCT(header);
194
195 /* get the header info from the buffer, make sure to increment buffer offset. */
196 header.header_name_len = *buffer;
197 buffer += sizeof(header.header_name_len);
198 memcpy((void *)header.header_name, buffer, (size_t)header.header_name_len);
199 buffer += header.header_name_len;
200 header.header_value_type = (enum aws_event_stream_header_value_type) * buffer;
201 buffer++;
202
203 switch (header.header_value_type) {
204 case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
205 header.header_value_len = 0;
206 header.header_value.static_val[0] = 0;
207 break;
208 case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
209 header.header_value_len = 0;
210 header.header_value.static_val[0] = 1;
211 break;
212 case AWS_EVENT_STREAM_HEADER_BYTE:
213 header.header_value_len = sizeof(uint8_t);
214 header.header_value.static_val[0] = *buffer;
215 buffer++;
216 break;
217 case AWS_EVENT_STREAM_HEADER_INT16:
218 header.header_value_len = sizeof(uint16_t);
219 memcpy(header.header_value.static_val, buffer, sizeof(uint16_t));
220 buffer += sizeof(uint16_t);
221 break;
222 case AWS_EVENT_STREAM_HEADER_INT32:
223 header.header_value_len = sizeof(uint32_t);
224 memcpy(header.header_value.static_val, buffer, sizeof(uint32_t));
225 buffer += sizeof(uint32_t);
226 break;
227 case AWS_EVENT_STREAM_HEADER_INT64:
228 case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
229 header.header_value_len = sizeof(uint64_t);
230 memcpy(header.header_value.static_val, buffer, sizeof(uint64_t));
231 buffer += sizeof(uint64_t);
232 break;
233 case AWS_EVENT_STREAM_HEADER_BYTE_BUF:
234 case AWS_EVENT_STREAM_HEADER_STRING:
235 header.header_value_len = aws_read_u16(buffer);
236 buffer += sizeof(header.header_value_len);
237 header.header_value.variable_len_val = (uint8_t *)buffer;
238 buffer += header.header_value_len;
239 break;
240 case AWS_EVENT_STREAM_HEADER_UUID:
241 header.header_value_len = 16;
242 memcpy(header.header_value.static_val, buffer, 16);
243 buffer += header.header_value_len;
244 break;
245 }
246
247 if (aws_array_list_push_back(headers, (const void *)&header)) {
248 return AWS_OP_ERR;
249 }
250 }
251
252 return AWS_OP_SUCCESS;
253}
254
255/* initialize message with the arguments
256 * the underlying buffer will be allocated and payload will be copied.
257 * see specification, this code should simply add these fields according to that.*/
258int aws_event_stream_message_init(
259 struct aws_event_stream_message *message,
260 struct aws_allocator *alloc,
261 struct aws_array_list *headers,
262 struct aws_byte_buf *payload) {
263
264 size_t payload_len = payload ? payload->len : 0;
265
266 uint32_t headers_length = compute_headers_len(headers);
267
268 if (AWS_UNLIKELY(headers_length > MAX_HEADERS_SIZE)) {
269 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
270 }
271
272 uint32_t total_length =
273 (uint32_t)(AWS_EVENT_STREAM_PRELUDE_LENGTH + headers_length + payload_len + AWS_EVENT_STREAM_TRAILER_LENGTH);
274
275 if (AWS_UNLIKELY(total_length < headers_length || total_length < payload_len)) {
276 return aws_raise_error(AWS_ERROR_OVERFLOW_DETECTED);
277 }
278
279 if (AWS_UNLIKELY(total_length > MAX_MESSAGE_SIZE)) {
280 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
281 }
282
283 message->alloc = alloc;
284 message->message_buffer = aws_mem_acquire(message->alloc, total_length);
285
286 if (message->message_buffer) {
287 message->owns_buffer = 1;
288 aws_write_u32(total_length, message->message_buffer);
289 uint8_t *buffer_offset = message->message_buffer + sizeof(total_length);
290 aws_write_u32(headers_length, buffer_offset);
291 buffer_offset += sizeof(headers_length);
292
293 uint32_t running_crc =
294 aws_checksums_crc32(message->message_buffer, (int)(buffer_offset - message->message_buffer), 0);
295
296 const uint8_t *message_crc_boundary_start = buffer_offset;
297 aws_write_u32(running_crc, buffer_offset);
298 buffer_offset += sizeof(running_crc);
299
300 if (headers_length) {
301 buffer_offset += add_headers_to_buffer(headers, buffer_offset);
302 }
303
304 if (payload) {
305 memcpy(buffer_offset, payload->buffer, payload->len);
306 buffer_offset += payload->len;
307 }
308
309 running_crc = aws_checksums_crc32(
310 message_crc_boundary_start, (int)(buffer_offset - message_crc_boundary_start), running_crc);
311 aws_write_u32(running_crc, buffer_offset);
312
313 return AWS_OP_SUCCESS;
314 }
315
316 return aws_raise_error(AWS_ERROR_OOM);
317}
318
319/* add buffer to the message (non-owning). Verify buffer crcs and that length fields are reasonable. */
320int aws_event_stream_message_from_buffer(
321 struct aws_event_stream_message *message,
322 struct aws_allocator *alloc,
323 struct aws_byte_buf *buffer) {
324 AWS_ASSERT(buffer);
325
326 message->alloc = alloc;
327 message->owns_buffer = 0;
328
329 if (AWS_UNLIKELY(buffer->len < AWS_EVENT_STREAM_PRELUDE_LENGTH + AWS_EVENT_STREAM_TRAILER_LENGTH)) {
330 return aws_raise_error(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH);
331 }
332
333 uint32_t message_length = aws_read_u32(buffer->buffer + TOTAL_LEN_OFFSET);
334
335 if (AWS_UNLIKELY(message_length != buffer->len)) {
336 return aws_raise_error(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH);
337 }
338
339 if (AWS_UNLIKELY(message_length > MAX_MESSAGE_SIZE)) {
340 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
341 }
342
343 uint32_t running_crc = aws_checksums_crc32(buffer->buffer, (int)PRELUDE_CRC_OFFSET, 0);
344 uint32_t prelude_crc = aws_read_u32(buffer->buffer + PRELUDE_CRC_OFFSET);
345
346 if (running_crc != prelude_crc) {
347 return aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE);
348 }
349
350 running_crc = aws_checksums_crc32(
351 buffer->buffer + PRELUDE_CRC_OFFSET,
352 (int)(message_length - PRELUDE_CRC_OFFSET - AWS_EVENT_STREAM_TRAILER_LENGTH),
353 running_crc);
354 uint32_t message_crc = aws_read_u32(buffer->buffer + message_length - AWS_EVENT_STREAM_TRAILER_LENGTH);
355
356 if (running_crc != message_crc) {
357 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE);
358 }
359
360 message->message_buffer = buffer->buffer;
361
362 if (aws_event_stream_message_headers_len(message) >
363 message_length - AWS_EVENT_STREAM_PRELUDE_LENGTH - AWS_EVENT_STREAM_TRAILER_LENGTH) {
364 message->message_buffer = 0;
365 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN);
366 }
367
368 return AWS_OP_SUCCESS;
369}
370
371/* Verify buffer crcs and that length fields are reasonable. Once that is done, the buffer is copied to the message. */
372int aws_event_stream_message_from_buffer_copy(
373 struct aws_event_stream_message *message,
374 struct aws_allocator *alloc,
375 const struct aws_byte_buf *buffer) {
376 int parse_value = aws_event_stream_message_from_buffer(message, alloc, (struct aws_byte_buf *)buffer);
377
378 if (!parse_value) {
379 message->message_buffer = aws_mem_acquire(alloc, buffer->len);
380
381 if (message->message_buffer) {
382 memcpy(message->message_buffer, buffer->buffer, buffer->len);
383 message->alloc = alloc;
384 message->owns_buffer = 1;
385
386 return AWS_OP_SUCCESS;
387 }
388
389 return aws_raise_error(AWS_ERROR_OOM);
390 }
391
392 return parse_value;
393}
394
395/* if buffer is owned, release the memory. */
396void aws_event_stream_message_clean_up(struct aws_event_stream_message *message) {
397 if (message->message_buffer && message->owns_buffer) {
398 aws_mem_release(message->alloc, message->message_buffer);
399 }
400}
401
402uint32_t aws_event_stream_message_total_length(const struct aws_event_stream_message *message) {
403 return aws_read_u32(message->message_buffer + TOTAL_LEN_OFFSET);
404}
405
406uint32_t aws_event_stream_message_headers_len(const struct aws_event_stream_message *message) {
407 return aws_read_u32(message->message_buffer + HEADER_LEN_OFFSET);
408}
409
410uint32_t aws_event_stream_message_prelude_crc(const struct aws_event_stream_message *message) {
411 return aws_read_u32(message->message_buffer + PRELUDE_CRC_OFFSET);
412}
413
414int aws_event_stream_message_headers(const struct aws_event_stream_message *message, struct aws_array_list *headers) {
415 return get_headers_from_buffer(
416 headers,
417 message->message_buffer + AWS_EVENT_STREAM_PRELUDE_LENGTH,
418 aws_event_stream_message_headers_len(message));
419}
420
421const uint8_t *aws_event_stream_message_payload(const struct aws_event_stream_message *message) {
422 return message->message_buffer + AWS_EVENT_STREAM_PRELUDE_LENGTH + aws_event_stream_message_headers_len(message);
423}
424
425uint32_t aws_event_stream_message_payload_len(const struct aws_event_stream_message *message) {
426 return aws_event_stream_message_total_length(message) -
427 (AWS_EVENT_STREAM_PRELUDE_LENGTH + aws_event_stream_message_headers_len(message) +
428 AWS_EVENT_STREAM_TRAILER_LENGTH);
429}
430
431uint32_t aws_event_stream_message_message_crc(const struct aws_event_stream_message *message) {
432 return aws_read_u32(
433 message->message_buffer + (aws_event_stream_message_total_length(message) - AWS_EVENT_STREAM_TRAILER_LENGTH));
434}
435
436const uint8_t *aws_event_stream_message_buffer(const struct aws_event_stream_message *message) {
437 return message->message_buffer;
438}
439
/* Field labels (quoted key plus ": ") used by aws_event_stream_message_to_debug_str(). */
#define DEBUG_STR_PRELUDE_TOTAL_LEN "\"total_length\": "
#define DEBUG_STR_PRELUDE_HDRS_LEN "\"headers_length\": "
#define DEBUG_STR_PRELUDE_CRC "\"prelude_crc\": "
#define DEBUG_STR_MESSAGE_CRC "\"message_crc\": "
#define DEBUG_STR_HEADER_NAME "\"name\": "
#define DEBUG_STR_HEADER_VALUE "\"value\": "
#define DEBUG_STR_HEADER_TYPE "\"type\": "
447
448int aws_event_stream_message_to_debug_str(FILE *fd, const struct aws_event_stream_message *message) {
449 struct aws_array_list headers;
450 aws_event_stream_headers_list_init(&headers, message->alloc);
451 aws_event_stream_message_headers(message, &headers);
452
453 fprintf(
454 fd,
455 "{\n " DEBUG_STR_PRELUDE_TOTAL_LEN "%d,\n " DEBUG_STR_PRELUDE_HDRS_LEN "%d,\n " DEBUG_STR_PRELUDE_CRC
456 "%d,\n",
457 aws_event_stream_message_total_length(message),
458 aws_event_stream_message_headers_len(message),
459 aws_event_stream_message_prelude_crc(message));
460
461 int count = 0;
462
463 uint16_t headers_count = (uint16_t)aws_array_list_length(&headers);
464
465 fprintf(fd, " \"headers\": [");
466
467 for (uint16_t i = 0; i < headers_count; ++i) {
468 struct aws_event_stream_header_value_pair *header = NULL;
469
470 aws_array_list_get_at_ptr(&headers, (void **)&header, i);
471
472 fprintf(fd, " {\n");
473
474 fprintf(fd, " " DEBUG_STR_HEADER_NAME "\"");
475 fwrite(header->header_name, sizeof(char), (size_t)header->header_name_len, fd);
476 fprintf(fd, "\",\n");
477
478 fprintf(fd, " " DEBUG_STR_HEADER_TYPE "%d,\n", header->header_value_type);
479
480 if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_FALSE) {
481 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "false\n");
482 } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_TRUE) {
483 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "true\n");
484 } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE) {
485 int8_t int_value = (int8_t)header->header_value.static_val[0];
486 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%d\n", (int)int_value);
487 } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_INT16) {
488 int16_t int_value = aws_read_u16(header->header_value.static_val);
489 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%d\n", (int)int_value);
490 } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_INT32) {
491 int32_t int_value = (int32_t)aws_read_u32(header->header_value.static_val);
492 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%d\n", (int)int_value);
493 } else if (
494 header->header_value_type == AWS_EVENT_STREAM_HEADER_INT64 ||
495 header->header_value_type == AWS_EVENT_STREAM_HEADER_TIMESTAMP) {
496 int64_t int_value = (int64_t)aws_read_u64(header->header_value.static_val);
497 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%lld\n", (long long)int_value);
498 } else {
499 size_t buffer_len = 0;
500 aws_base64_compute_encoded_len(header->header_value_len, &buffer_len);
501 char *encoded_buffer = (char *)aws_mem_acquire(message->alloc, buffer_len);
502 if (!encoded_buffer) {
503 return aws_raise_error(AWS_ERROR_OOM);
504 }
505
506 struct aws_byte_buf encode_output = aws_byte_buf_from_array((uint8_t *)encoded_buffer, buffer_len);
507
508 if (header->header_value_type == AWS_EVENT_STREAM_HEADER_UUID) {
509 struct aws_byte_cursor to_encode =
510 aws_byte_cursor_from_array(header->header_value.static_val, header->header_value_len);
511
512 aws_base64_encode(&to_encode, &encode_output);
513 } else {
514 struct aws_byte_cursor to_encode =
515 aws_byte_cursor_from_array(header->header_value.variable_len_val, header->header_value_len);
516 aws_base64_encode(&to_encode, &encode_output);
517 }
518 fprintf(fd, " " DEBUG_STR_HEADER_VALUE "\"%s\"\n", encoded_buffer);
519 aws_mem_release(message->alloc, encoded_buffer);
520 }
521
522 fprintf(fd, " }");
523
524 if (count < headers_count - 1) {
525 fprintf(fd, ",");
526 }
527 fprintf(fd, "\n");
528
529 count++;
530 }
531 aws_event_stream_headers_list_cleanup(&headers);
532 fprintf(fd, " ],\n");
533
534 size_t payload_len = aws_event_stream_message_payload_len(message);
535 const uint8_t *payload = aws_event_stream_message_payload(message);
536 size_t encoded_len = 0;
537 aws_base64_compute_encoded_len(payload_len, &encoded_len);
538 char *encoded_payload = (char *)aws_mem_acquire(message->alloc, encoded_len);
539
540 if (!encoded_payload) {
541 return aws_raise_error(AWS_ERROR_OOM);
542 }
543
544 struct aws_byte_cursor payload_buffer = aws_byte_cursor_from_array(payload, payload_len);
545 struct aws_byte_buf encoded_payload_buffer = aws_byte_buf_from_array((uint8_t *)encoded_payload, encoded_len);
546
547 aws_base64_encode(&payload_buffer, &encoded_payload_buffer);
548 fprintf(fd, " \"payload\": \"%s\",\n", encoded_payload);
549 fprintf(fd, " " DEBUG_STR_MESSAGE_CRC "%d\n}\n", aws_event_stream_message_message_crc(message));
550
551 return AWS_OP_SUCCESS;
552}
553
554int aws_event_stream_headers_list_init(struct aws_array_list *headers, struct aws_allocator *allocator) {
555 AWS_ASSERT(headers);
556 AWS_ASSERT(allocator);
557
558 return aws_array_list_init_dynamic(headers, allocator, 4, sizeof(struct aws_event_stream_header_value_pair));
559}
560
561void aws_event_stream_headers_list_cleanup(struct aws_array_list *headers) {
562 AWS_ASSERT(headers);
563
564 for (size_t i = 0; i < aws_array_list_length(headers); ++i) {
565 struct aws_event_stream_header_value_pair *header = NULL;
566 aws_array_list_get_at_ptr(headers, (void **)&header, i);
567
568 if (header->value_owned) {
569 aws_mem_release(headers->alloc, (void *)header->header_value.variable_len_val);
570 }
571 }
572
573 aws_array_list_clean_up(headers);
574}
575
576static int s_add_variable_len_header(
577 struct aws_array_list *headers,
578 struct aws_event_stream_header_value_pair *header,
579 const char *name,
580 uint8_t name_len,
581 uint8_t *value,
582 uint16_t value_len,
583 int8_t copy) {
584
585 memcpy((void *)header->header_name, (void *)name, (size_t)name_len);
586
587 if (copy) {
588 header->header_value.variable_len_val = aws_mem_acquire(headers->alloc, value_len);
589 if (!header->header_value.variable_len_val) {
590 return aws_raise_error(AWS_ERROR_OOM);
591 }
592
593 header->value_owned = 1;
594 memcpy((void *)header->header_value.variable_len_val, (void *)value, value_len);
595 } else {
596 header->value_owned = 0;
597 header->header_value.variable_len_val = value;
598 }
599
600 if (aws_array_list_push_back(headers, (void *)header)) {
601 if (header->value_owned) {
602 aws_mem_release(headers->alloc, (void *)header->header_value.variable_len_val);
603 }
604 return AWS_OP_ERR;
605 }
606
607 return AWS_OP_SUCCESS;
608}
609
610int aws_event_stream_add_string_header(
611 struct aws_array_list *headers,
612 const char *name,
613 uint8_t name_len,
614 const char *value,
615 uint16_t value_len,
616 int8_t copy) {
617 struct aws_event_stream_header_value_pair header = {.header_name_len = name_len,
618 .header_value_len = value_len,
619 .value_owned = copy,
620 .header_value_type = AWS_EVENT_STREAM_HEADER_STRING};
621
622 return s_add_variable_len_header(headers, &header, name, name_len, (uint8_t *)value, value_len, copy);
623}
624
625int aws_event_stream_add_byte_header(struct aws_array_list *headers, const char *name, uint8_t name_len, int8_t value) {
626 struct aws_event_stream_header_value_pair header = {.header_name_len = name_len,
627 .header_value_len = 1,
628 .value_owned = 0,
629 .header_value_type = AWS_EVENT_STREAM_HEADER_BYTE,
630 .header_value.static_val[0] = (uint8_t)value};
631
632 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
633
634 return aws_array_list_push_back(headers, (void *)&header);
635}
636
637int aws_event_stream_add_bool_header(struct aws_array_list *headers, const char *name, uint8_t name_len, int8_t value) {
638 struct aws_event_stream_header_value_pair header = {
639 .header_name_len = name_len,
640 .header_value_len = 0,
641 .value_owned = 0,
642 .header_value_type = value ? AWS_EVENT_STREAM_HEADER_BOOL_TRUE : AWS_EVENT_STREAM_HEADER_BOOL_FALSE,
643 };
644
645 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
646
647 return aws_array_list_push_back(headers, (void *)&header);
648}
649
650int aws_event_stream_add_int16_header(
651 struct aws_array_list *headers,
652 const char *name,
653 uint8_t name_len,
654 int16_t value) {
655 struct aws_event_stream_header_value_pair header = {
656 .header_name_len = name_len,
657 .header_value_len = sizeof(value),
658 .value_owned = 0,
659 .header_value_type = AWS_EVENT_STREAM_HEADER_INT16,
660 };
661
662 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
663 aws_write_u16((uint16_t)value, header.header_value.static_val);
664
665 return aws_array_list_push_back(headers, (void *)&header);
666}
667
668int aws_event_stream_add_int32_header(
669 struct aws_array_list *headers,
670 const char *name,
671 uint8_t name_len,
672 int32_t value) {
673 struct aws_event_stream_header_value_pair header = {
674 .header_name_len = name_len,
675 .header_value_len = sizeof(value),
676 .value_owned = 0,
677 .header_value_type = AWS_EVENT_STREAM_HEADER_INT32,
678 };
679
680 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
681 aws_write_u32((uint32_t)value, header.header_value.static_val);
682
683 return aws_array_list_push_back(headers, (void *)&header);
684}
685
686int aws_event_stream_add_int64_header(
687 struct aws_array_list *headers,
688 const char *name,
689 uint8_t name_len,
690 int64_t value) {
691 struct aws_event_stream_header_value_pair header = {
692 .header_name_len = name_len,
693 .header_value_len = sizeof(value),
694 .value_owned = 0,
695 .header_value_type = AWS_EVENT_STREAM_HEADER_INT64,
696 };
697
698 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
699 aws_write_u64((uint64_t)value, header.header_value.static_val);
700
701 return aws_array_list_push_back(headers, (void *)&header);
702}
703
704int aws_event_stream_add_bytebuf_header(
705 struct aws_array_list *headers,
706 const char *name,
707 uint8_t name_len,
708 uint8_t *value,
709 uint16_t value_len,
710 int8_t copy) {
711 struct aws_event_stream_header_value_pair header = {.header_name_len = name_len,
712 .header_value_len = value_len,
713 .value_owned = copy,
714 .header_value_type = AWS_EVENT_STREAM_HEADER_BYTE_BUF};
715
716 return s_add_variable_len_header(headers, &header, name, name_len, value, value_len, copy);
717}
718
719int aws_event_stream_add_timestamp_header(
720 struct aws_array_list *headers,
721 const char *name,
722 uint8_t name_len,
723 int64_t value) {
724 struct aws_event_stream_header_value_pair header = {
725 .header_name_len = name_len,
726 .header_value_len = sizeof(uint64_t),
727 .value_owned = 0,
728 .header_value_type = AWS_EVENT_STREAM_HEADER_TIMESTAMP,
729 };
730
731 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
732 aws_write_u64((uint64_t)value, header.header_value.static_val);
733
734 return aws_array_list_push_back(headers, (void *)&header);
735}
736
737int aws_event_stream_add_uuid_header(
738 struct aws_array_list *headers,
739 const char *name,
740 uint8_t name_len,
741 const uint8_t *value) {
742 struct aws_event_stream_header_value_pair header = {
743 .header_name_len = name_len,
744 .header_value_len = 16,
745 .value_owned = 0,
746 .header_value_type = AWS_EVENT_STREAM_HEADER_UUID,
747 };
748
749 memcpy((void *)header.header_name, (void *)name, (size_t)name_len);
750 memcpy((void *)header.header_value.static_val, value, 16);
751
752 return aws_array_list_push_back(headers, (void *)&header);
753}
754
755struct aws_byte_buf aws_event_stream_header_name(struct aws_event_stream_header_value_pair *header) {
756 return aws_byte_buf_from_array((uint8_t *)header->header_name, header->header_name_len);
757}
758
759int8_t aws_event_stream_header_value_as_byte(struct aws_event_stream_header_value_pair *header) {
760 return (int8_t)header->header_value.static_val[0];
761}
762
763struct aws_byte_buf aws_event_stream_header_value_as_string(struct aws_event_stream_header_value_pair *header) {
764 return aws_event_stream_header_value_as_bytebuf(header);
765}
766
767int8_t aws_event_stream_header_value_as_bool(struct aws_event_stream_header_value_pair *header) {
768 return header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_TRUE ? (int8_t)1 : (int8_t)0;
769}
770
771int16_t aws_event_stream_header_value_as_int16(struct aws_event_stream_header_value_pair *header) {
772 return (int16_t)aws_read_u16(header->header_value.static_val);
773}
774
775int32_t aws_event_stream_header_value_as_int32(struct aws_event_stream_header_value_pair *header) {
776 return (int32_t)aws_read_u32(header->header_value.static_val);
777}
778
779int64_t aws_event_stream_header_value_as_int64(struct aws_event_stream_header_value_pair *header) {
780 return (int64_t)aws_read_u64(header->header_value.static_val);
781}
782
783struct aws_byte_buf aws_event_stream_header_value_as_bytebuf(struct aws_event_stream_header_value_pair *header) {
784 return aws_byte_buf_from_array(header->header_value.variable_len_val, header->header_value_len);
785}
786
787int64_t aws_event_stream_header_value_as_timestamp(struct aws_event_stream_header_value_pair *header) {
788 return aws_event_stream_header_value_as_int64(header);
789}
790
791struct aws_byte_buf aws_event_stream_header_value_as_uuid(struct aws_event_stream_header_value_pair *header) {
792 return aws_byte_buf_from_array(header->header_value.static_val, 16);
793}
794
795uint16_t aws_event_stream_header_value_length(struct aws_event_stream_header_value_pair *header) {
796 return header->header_value_len;
797}
798
/* An all-zero prelude value. */
static struct aws_event_stream_message_prelude s_empty_prelude = {.total_len = 0, .headers_len = 0, .prelude_crc = 0};
800
801static void s_reset_header_state(struct aws_event_stream_streaming_decoder *decoder, uint8_t free_header_data) {
802
803 if (free_header_data && decoder->current_header.value_owned) {
804 aws_mem_release(decoder->alloc, (void *)decoder->current_header.header_value.variable_len_val);
805 }
806
807 memset((void *)&decoder->current_header, 0, sizeof(struct aws_event_stream_header_value_pair));
808}
809
/* Forward declarations: both functions are referenced before their definitions.
 * s_headers_state is assigned to decoder->state by s_read_header_value once a header value
 * has been fully read. */
static void s_reset_state(struct aws_event_stream_streaming_decoder *decoder);

static int s_headers_state(
    struct aws_event_stream_streaming_decoder *decoder,
    const uint8_t *data,
    size_t len,
    size_t *processed);
817
818static int s_read_header_value(
819 struct aws_event_stream_streaming_decoder *decoder,
820 const uint8_t *data,
821 size_t len,
822 size_t *processed) {
823
824 size_t current_pos = decoder->message_pos;
825
826 size_t length_read = current_pos - decoder->current_header_value_offset;
827 struct aws_event_stream_header_value_pair *current_header = &decoder->current_header;
828
829 if (!length_read) {
830 /* save an allocation, this can only happen if the data we were handed is larger than the length of the header
831 * value. we don't really need to handle offsets in this case. This expects the user is living by the contract
832 * that they cannot act like they own this memory beyond the lifetime of their callback, and they should not
833 * mutate it */
834 if (len >= current_header->header_value_len) {
835 /* this part works regardless of type since the layout of the union will line up. */
836 current_header->header_value.variable_len_val = (uint8_t *)data;
837 current_header->value_owned = 0;
838 decoder->on_header(decoder, &decoder->prelude, &decoder->current_header, decoder->user_context);
839 *processed += current_header->header_value_len;
840 decoder->message_pos += current_header->header_value_len;
841 decoder->running_crc =
842 aws_checksums_crc32(data, (int)current_header->header_value_len, decoder->running_crc);
843
844 s_reset_header_state(decoder, 1);
845 decoder->state = s_headers_state;
846 return AWS_OP_SUCCESS;
847 }
848
849 /* a possible optimization later would be to only allocate this once, and then keep reusing the same buffer. for
850 * subsequent messages.*/
851 if (current_header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF ||
852 current_header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING) {
853 current_header->header_value.variable_len_val =
854 aws_mem_acquire(decoder->alloc, decoder->current_header.header_value_len);
855
856 if (!current_header->header_value.variable_len_val) {
857 return aws_raise_error(AWS_ERROR_OOM);
858 }
859
860 current_header->value_owned = 1;
861 }
862 }
863
864 size_t max_read =
865 len >= current_header->header_value_len - length_read ? current_header->header_value_len - length_read : len;
866
867 const uint8_t *header_value_alias = current_header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF ||
868 current_header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING
869 ? current_header->header_value.variable_len_val
870 : current_header->header_value.static_val;
871
872 memcpy((void *)(header_value_alias + length_read), data, max_read);
873 decoder->running_crc = aws_checksums_crc32(data, (int)max_read, decoder->running_crc);
874
875 *processed += max_read;
876 decoder->message_pos += max_read;
877 length_read += max_read;
878
879 if (length_read == current_header->header_value_len) {
880 decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context);
881 s_reset_header_state(decoder, 1);
882 decoder->state = s_headers_state;
883 }
884
885 return AWS_OP_SUCCESS;
886}
887
888static int s_read_header_value_len(
889 struct aws_event_stream_streaming_decoder *decoder,
890 const uint8_t *data,
891 size_t len,
892 size_t *processed) {
893 size_t current_pos = decoder->message_pos;
894
895 size_t length_portion_read = current_pos - decoder->current_header_value_offset;
896
897 if (length_portion_read < sizeof(uint16_t)) {
898 size_t max_to_read =
899 len > sizeof(uint16_t) - length_portion_read ? sizeof(uint16_t) - length_portion_read : len;
900 memcpy(decoder->working_buffer + length_portion_read, data, max_to_read);
901 decoder->running_crc = aws_checksums_crc32(data, (int)max_to_read, decoder->running_crc);
902
903 *processed += max_to_read;
904 decoder->message_pos += max_to_read;
905
906 length_portion_read = decoder->message_pos - decoder->current_header_value_offset;
907 }
908
909 if (length_portion_read == sizeof(uint16_t)) {
910 decoder->current_header.header_value_len = aws_read_u16(decoder->working_buffer);
911 decoder->current_header_value_offset = decoder->message_pos;
912 decoder->state = s_read_header_value;
913 }
914
915 return AWS_OP_SUCCESS;
916}
917
918static int s_read_header_type(
919 struct aws_event_stream_streaming_decoder *decoder,
920 const uint8_t *data,
921 size_t len,
922 size_t *processed) {
923 (void)len;
924 uint8_t type = *data;
925 decoder->running_crc = aws_checksums_crc32(data, 1, decoder->running_crc);
926 *processed += 1;
927 decoder->message_pos++;
928 decoder->current_header_value_offset++;
929 struct aws_event_stream_header_value_pair *current_header = &decoder->current_header;
930
931 if (type >= AWS_EVENT_STREAM_HEADER_BOOL_FALSE && type <= AWS_EVENT_STREAM_HEADER_UUID) {
932 current_header->header_value_type = (enum aws_event_stream_header_value_type)type;
933
934 switch (type) {
935 case AWS_EVENT_STREAM_HEADER_STRING:
936 case AWS_EVENT_STREAM_HEADER_BYTE_BUF:
937 decoder->state = s_read_header_value_len;
938 break;
939 case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
940 current_header->header_value_len = 0;
941 current_header->header_value.static_val[0] = 0;
942 decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context);
943 s_reset_header_state(decoder, 1);
944 break;
945 case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
946 current_header->header_value_len = 0;
947 current_header->header_value.static_val[0] = 1;
948 decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context);
949 s_reset_header_state(decoder, 1);
950 break;
951 case AWS_EVENT_STREAM_HEADER_BYTE:
952 current_header->header_value_len = 1;
953 decoder->state = s_read_header_value;
954 break;
955 case AWS_EVENT_STREAM_HEADER_INT16:
956 current_header->header_value_len = sizeof(uint16_t);
957 decoder->state = s_read_header_value;
958 break;
959 case AWS_EVENT_STREAM_HEADER_INT32:
960 current_header->header_value_len = sizeof(uint32_t);
961 decoder->state = s_read_header_value;
962 break;
963 case AWS_EVENT_STREAM_HEADER_INT64:
964 case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
965 current_header->header_value_len = sizeof(uint64_t);
966 decoder->state = s_read_header_value;
967 break;
968 case AWS_EVENT_STREAM_HEADER_UUID:
969 current_header->header_value_len = 16;
970 decoder->state = s_read_header_value;
971 break;
972 default:
973 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE);
974 }
975
976 return AWS_OP_SUCCESS;
977 }
978
979 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE);
980}
981
982static int s_read_header_name(
983 struct aws_event_stream_streaming_decoder *decoder,
984 const uint8_t *data,
985 size_t len,
986 size_t *processed) {
987 size_t current_pos = decoder->message_pos;
988
989 size_t length_read = current_pos - decoder->current_header_name_offset;
990
991 size_t max_read = len >= decoder->current_header.header_name_len - length_read
992 ? decoder->current_header.header_name_len - length_read
993 : len;
994 memcpy((void *)(decoder->current_header.header_name + length_read), data, max_read);
995 decoder->running_crc = aws_checksums_crc32(data, (int)max_read, decoder->running_crc);
996
997 *processed += max_read;
998 decoder->message_pos += max_read;
999 length_read += max_read;
1000
1001 if (length_read == decoder->current_header.header_name_len) {
1002 decoder->state = s_read_header_type;
1003 decoder->current_header_value_offset = decoder->message_pos;
1004 }
1005
1006 return AWS_OP_SUCCESS;
1007}
1008
1009static int s_read_header_name_len(
1010 struct aws_event_stream_streaming_decoder *decoder,
1011 const uint8_t *data,
1012 size_t len,
1013 size_t *processed) {
1014 (void)len;
1015 decoder->current_header.header_name_len = *data;
1016 decoder->message_pos++;
1017 decoder->current_header_name_offset++;
1018 *processed += 1;
1019 decoder->state = s_read_header_name;
1020 decoder->running_crc = aws_checksums_crc32(data, 1, decoder->running_crc);
1021
1022 return AWS_OP_SUCCESS;
1023}
1024
1025static int s_start_header(
1026 struct aws_event_stream_streaming_decoder *decoder,
1027 const uint8_t *data,
1028 size_t len,
1029 size_t *processed) /* NOLINT */ {
1030 (void)data;
1031 (void)len;
1032 (void)processed;
1033 decoder->state = s_read_header_name_len;
1034 decoder->current_header_name_offset = decoder->message_pos;
1035
1036 return AWS_OP_SUCCESS;
1037}
1038
/* Forward declaration: s_headers_state switches to this state once all headers have been read. */
static int s_payload_state(
    struct aws_event_stream_streaming_decoder *decoder,
    const uint8_t *data,
    size_t len,
    size_t *processed);
1044
1045/*Handles the initial state for header parsing.
1046 will oscillate between multiple other states as well.
1047 after all headers have been handled, payload will be set as the next state. */
1048static int s_headers_state(
1049 struct aws_event_stream_streaming_decoder *decoder,
1050 const uint8_t *data,
1051 size_t len,
1052 size_t *processed) /* NOLINT */ {
1053 (void)data;
1054 (void)len;
1055 (void)processed;
1056
1057 size_t current_pos = decoder->message_pos;
1058
1059 size_t headers_boundary = decoder->prelude.headers_len + AWS_EVENT_STREAM_PRELUDE_LENGTH;
1060
1061 if (current_pos < headers_boundary) {
1062 decoder->state = s_start_header;
1063 return AWS_OP_SUCCESS;
1064 }
1065
1066 if (current_pos == headers_boundary) {
1067 decoder->state = s_payload_state;
1068 return AWS_OP_SUCCESS;
1069 }
1070
1071 return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_PARSER_ILLEGAL_STATE);
1072}
1073
1074/* handles reading the trailer. Once it has been read, it will be compared to the running checksum. If successful,
1075 * the state will be reset. */
1076static int s_read_trailer_state(
1077 struct aws_event_stream_streaming_decoder *decoder,
1078 const uint8_t *data,
1079 size_t len,
1080 size_t *processed) {
1081
1082 size_t remaining_amount = decoder->prelude.total_len - decoder->message_pos;
1083 size_t segment_length = len > remaining_amount ? remaining_amount : len;
1084 size_t offset = sizeof(uint32_t) - remaining_amount;
1085 memcpy(decoder->working_buffer + offset, data, segment_length);
1086 decoder->message_pos += segment_length;
1087 *processed += segment_length;
1088
1089 if (decoder->message_pos == decoder->prelude.total_len) {
1090 uint32_t message_crc = aws_read_u32(decoder->working_buffer);
1091
1092 if (message_crc == decoder->running_crc) {
1093 s_reset_state(decoder);
1094 } else {
1095 char error_message[70];
1096 snprintf(
1097 error_message,
1098 sizeof(error_message),
1099 "CRC Mismatch. message_crc was 0x08%" PRIX32 ", but computed 0x08%" PRIX32,
1100 message_crc,
1101 decoder->running_crc);
1102 aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE);
1103 decoder->on_error(
1104 decoder,
1105 &decoder->prelude,
1106 AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE,
1107 error_message,
1108 decoder->user_context);
1109 return AWS_OP_ERR;
1110 }
1111 }
1112
1113 return AWS_OP_SUCCESS;
1114}
1115
1116/* handles the reading of the payload up to the final checksum. Sets read_trailer_state as the new state once
1117 * the payload has been processed. */
1118static int s_payload_state(
1119 struct aws_event_stream_streaming_decoder *decoder,
1120 const uint8_t *data,
1121 size_t len,
1122 size_t *processed) {
1123
1124 if (decoder->message_pos < decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH) {
1125 size_t remaining_amount = decoder->prelude.total_len - decoder->message_pos - AWS_EVENT_STREAM_TRAILER_LENGTH;
1126 size_t segment_length = len > remaining_amount ? remaining_amount : len;
1127 int8_t final_segment =
1128 (segment_length + decoder->message_pos) == (decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH);
1129 struct aws_byte_buf payload_buf = aws_byte_buf_from_array(data, segment_length);
1130 decoder->on_payload(decoder, &payload_buf, final_segment, decoder->user_context);
1131 decoder->message_pos += segment_length;
1132 decoder->running_crc = aws_checksums_crc32(data, (int)segment_length, decoder->running_crc);
1133 *processed += segment_length;
1134 }
1135
1136 if (decoder->message_pos == decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH) {
1137 decoder->state = s_read_trailer_state;
1138 }
1139
1140 return AWS_OP_SUCCESS;
1141}
1142
1143/* Parses the payload and verifies checksums. Sets the next state if successful. */
1144static int s_verify_prelude_state(
1145 struct aws_event_stream_streaming_decoder *decoder,
1146 const uint8_t *data,
1147 size_t len,
1148 size_t *processed) /* NOLINT */ {
1149 (void)data;
1150 (void)len;
1151 (void)processed;
1152
1153 decoder->prelude.headers_len = aws_read_u32(decoder->working_buffer + HEADER_LEN_OFFSET);
1154 decoder->prelude.prelude_crc = aws_read_u32(decoder->working_buffer + PRELUDE_CRC_OFFSET);
1155 decoder->prelude.total_len = aws_read_u32(decoder->working_buffer + TOTAL_LEN_OFFSET);
1156
1157 decoder->running_crc = aws_checksums_crc32(decoder->working_buffer, PRELUDE_CRC_OFFSET, 0);
1158
1159 if (AWS_LIKELY(decoder->running_crc == decoder->prelude.prelude_crc)) {
1160
1161 if (AWS_UNLIKELY(
1162 decoder->prelude.headers_len > MAX_HEADERS_SIZE || decoder->prelude.total_len > MAX_MESSAGE_SIZE)) {
1163 aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
1164 char error_message[] = "Maximum message field size exceeded";
1165
1166 decoder->on_error(
1167 decoder,
1168 &decoder->prelude,
1169 AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED,
1170 error_message,
1171 decoder->user_context);
1172 return AWS_OP_ERR;
1173 }
1174
1175 /* Should only call on_prelude() after passing crc check and limitation check, otherwise call on_prelude() with
1176 * incorrect prelude is error prune. */
1177 decoder->on_prelude(decoder, &decoder->prelude, decoder->user_context);
1178
1179 decoder->running_crc = aws_checksums_crc32(
1180 decoder->working_buffer + PRELUDE_CRC_OFFSET,
1181 (int)sizeof(decoder->prelude.prelude_crc),
1182 decoder->running_crc);
1183 memset(decoder->working_buffer, 0, sizeof(decoder->working_buffer));
1184 decoder->state = decoder->prelude.headers_len > 0 ? s_headers_state : s_payload_state;
1185 } else {
1186 char error_message[70];
1187 snprintf(
1188 error_message,
1189 sizeof(error_message),
1190 "CRC Mismatch. prelude_crc was 0x08%" PRIX32 ", but computed 0x08%" PRIX32,
1191 decoder->prelude.prelude_crc,
1192 decoder->running_crc);
1193
1194 aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE);
1195 decoder->on_error(
1196 decoder,
1197 &decoder->prelude,
1198 AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE,
1199 error_message,
1200 decoder->user_context);
1201 return AWS_OP_ERR;
1202 }
1203
1204 return AWS_OP_SUCCESS;
1205}
1206
1207/* initial state handles up to the reading of the prelude */
1208static int s_start_state(
1209 struct aws_event_stream_streaming_decoder *decoder,
1210 const uint8_t *data,
1211 size_t len,
1212 size_t *processed) {
1213
1214 size_t previous_position = decoder->message_pos;
1215 if (decoder->message_pos < AWS_EVENT_STREAM_PRELUDE_LENGTH) {
1216 if (len >= AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos) {
1217 memcpy(
1218 decoder->working_buffer + decoder->message_pos,
1219 data,
1220 AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos);
1221 decoder->message_pos += AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos;
1222 } else {
1223 memcpy(decoder->working_buffer + decoder->message_pos, data, len);
1224 decoder->message_pos += len;
1225 }
1226
1227 *processed += decoder->message_pos - previous_position;
1228 }
1229
1230 if (decoder->message_pos == AWS_EVENT_STREAM_PRELUDE_LENGTH) {
1231 decoder->state = s_verify_prelude_state;
1232 }
1233
1234 return AWS_OP_SUCCESS;
1235}
1236
1237static void s_reset_state(struct aws_event_stream_streaming_decoder *decoder) {
1238 decoder->message_pos = 0;
1239 decoder->prelude = s_empty_prelude;
1240 decoder->running_crc = 0;
1241 memset(decoder->working_buffer, 0, sizeof(decoder->working_buffer));
1242 decoder->state = s_start_state;
1243}
1244
1245void aws_event_stream_streaming_decoder_init(
1246 struct aws_event_stream_streaming_decoder *decoder,
1247 struct aws_allocator *alloc,
1248 aws_event_stream_process_on_payload_segment_fn *on_payload_segment,
1249 aws_event_stream_prelude_received_fn *on_prelude,
1250 aws_event_stream_header_received_fn *on_header,
1251 aws_event_stream_on_error_fn *on_error,
1252 void *user_data) {
1253
1254 s_reset_state(decoder);
1255 decoder->alloc = alloc;
1256 decoder->on_error = on_error;
1257 decoder->on_header = on_header;
1258 decoder->on_payload = on_payload_segment;
1259 decoder->on_prelude = on_prelude;
1260 decoder->user_context = user_data;
1261}
1262
1263void aws_event_stream_streaming_decoder_clean_up(struct aws_event_stream_streaming_decoder *decoder) {
1264 s_reset_state(decoder);
1265 decoder->on_error = 0;
1266 decoder->on_header = 0;
1267 decoder->on_payload = 0;
1268 decoder->on_prelude = 0;
1269 decoder->user_context = 0;
1270}
1271
1272/* Simply sends the data to the state machine until all has been processed or an error is returned. */
1273int aws_event_stream_streaming_decoder_pump(
1274 struct aws_event_stream_streaming_decoder *decoder,
1275 const struct aws_byte_buf *data) {
1276
1277 size_t processed = 0;
1278 int err_val = 0;
1279 while (!err_val && data->buffer && data->len && processed < data->len) {
1280 err_val = decoder->state(decoder, data->buffer + processed, data->len - processed, &processed);
1281 }
1282
1283 return err_val;
1284}
1285#if _MSC_VER
1286# pragma warning(pop)
1287#endif
1288