1 | /* |
2 | * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"). |
5 | * You may not use this file except in compliance with the License. |
6 | * A copy of the License is located at |
7 | * |
8 | * http://aws.amazon.com/apache2.0 |
9 | * |
10 | * or in the "license" file accompanying this file. This file is distributed |
11 | * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either |
12 | * express or implied. See the License for the specific language governing |
13 | * permissions and limitations under the License. |
14 | */ |
15 | |
16 | #include <aws/event-stream/event_stream.h> |
17 | |
18 | #include <aws/checksums/crc.h> |
19 | |
20 | #include <aws/common/encoding.h> |
21 | |
22 | #include <inttypes.h> |
23 | |
/* Upper bound on the total encoded size of one message: 16 MiB. */
#define MAX_MESSAGE_SIZE (16 * 1024 * 1024)

/* Upper bound on the encoded size of the headers section: 128 KiB. */
#define MAX_HEADERS_SIZE (128 * 1024)

/* Library name attached to every error registered by this file. */
#define LIB_NAME "libaws-c-event-stream"
30 | |
31 | #if _MSC_VER |
32 | # pragma warning(push) |
33 | # pragma warning(disable : 4221) /* aggregate initializer using local variable addresses */ |
34 | # pragma warning(disable : 4204) /* non-constant aggregate initializer */ |
35 | # pragma warning(disable : 4306) /* msft doesn't trust us to do pointer arithmetic. */ |
36 | #endif |
37 | |
38 | static struct aws_error_info s_errors[] = { |
39 | AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH, "Buffer length mismatch" , LIB_NAME), |
40 | AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_INSUFFICIENT_BUFFER_LEN, "insufficient buffer length" , LIB_NAME), |
41 | AWS_DEFINE_ERROR_INFO( |
42 | AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED, |
43 | "a field for the message was too large" , |
44 | LIB_NAME), |
45 | AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE, "prelude checksum was incorrect" , LIB_NAME), |
46 | AWS_DEFINE_ERROR_INFO(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE, "message checksum was incorrect" , LIB_NAME), |
47 | AWS_DEFINE_ERROR_INFO( |
48 | AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN, |
49 | "message headers length was incorrect" , |
50 | LIB_NAME), |
51 | AWS_DEFINE_ERROR_INFO( |
52 | AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE, |
53 | "An unknown header type was encountered" , |
54 | LIB_NAME), |
55 | AWS_DEFINE_ERROR_INFO( |
56 | AWS_ERROR_EVENT_STREAM_MESSAGE_PARSER_ILLEGAL_STATE, |
57 | "message parser encountered an illegal state" , |
58 | LIB_NAME), |
59 | }; |
60 | |
61 | static struct aws_error_info_list s_list = { |
62 | .error_list = s_errors, |
63 | .count = sizeof(s_errors) / sizeof(struct aws_error_info), |
64 | }; |
65 | |
66 | static bool s_event_stream_library_initialized = false; |
67 | |
68 | void aws_event_stream_library_init(struct aws_allocator *allocator) { |
69 | if (!s_event_stream_library_initialized) { |
70 | s_event_stream_library_initialized = true; |
71 | aws_common_library_init(allocator); |
72 | aws_register_error_info(&s_list); |
73 | } |
74 | } |
75 | |
76 | void aws_event_stream_library_clean_up(void) { |
77 | if (s_event_stream_library_initialized) { |
78 | s_event_stream_library_initialized = false; |
79 | aws_unregister_error_info(&s_list); |
80 | aws_common_library_clean_up(); |
81 | } |
82 | } |
83 | |
/* Byte offsets of the three prelude fields within a message buffer:
 * total length, then headers length, then the prelude CRC. */
#define TOTAL_LEN_OFFSET 0
#define PRELUDE_CRC_OFFSET (sizeof(uint32_t) + sizeof(uint32_t))
#define HEADER_LEN_OFFSET sizeof(uint32_t)
87 | |
88 | /* Computes the byte length necessary to store the headers represented in the headers list. |
89 | * returns that length. */ |
90 | uint32_t (struct aws_array_list *) { |
91 | if (!headers || !aws_array_list_length(headers)) { |
92 | return 0; |
93 | } |
94 | |
95 | size_t = aws_array_list_length(headers); |
96 | size_t = 0; |
97 | |
98 | for (size_t i = 0; i < headers_count; ++i) { |
99 | struct aws_event_stream_header_value_pair * = NULL; |
100 | |
101 | aws_array_list_get_at_ptr(headers, (void **)&header, i); |
102 | |
103 | headers_len += sizeof(header->header_name_len) + header->header_name_len + 1; |
104 | |
105 | if (header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING || |
106 | header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF) { |
107 | headers_len += sizeof(header->header_value_len); |
108 | } |
109 | |
110 | if (header->header_value_type != AWS_EVENT_STREAM_HEADER_BOOL_FALSE && |
111 | header->header_value_type != AWS_EVENT_STREAM_HEADER_BOOL_TRUE) { |
112 | headers_len += header->header_value_len; |
113 | } |
114 | } |
115 | |
116 | return (uint32_t)headers_len; |
117 | } |
118 | |
119 | /* adds the headers represented in the headers list to the buffer. |
120 | returns the new buffer offset for use elsewhere. Assumes buffer length is at least the length of the return value |
121 | from compute_headers_length() */ |
122 | size_t (struct aws_array_list *, uint8_t *buffer) { |
123 | if (!headers || !aws_array_list_length(headers)) { |
124 | return 0; |
125 | } |
126 | |
127 | size_t = aws_array_list_length(headers); |
128 | uint8_t *buffer_alias = buffer; |
129 | |
130 | for (size_t i = 0; i < headers_count; ++i) { |
131 | struct aws_event_stream_header_value_pair * = NULL; |
132 | |
133 | aws_array_list_get_at_ptr(headers, (void **)&header, i); |
134 | *buffer_alias = (uint8_t)header->header_name_len; |
135 | buffer_alias++; |
136 | memcpy(buffer_alias, header->header_name, (size_t)header->header_name_len); |
137 | buffer_alias += header->header_name_len; |
138 | *buffer_alias = (uint8_t)header->header_value_type; |
139 | buffer_alias++; |
140 | switch (header->header_value_type) { |
141 | case AWS_EVENT_STREAM_HEADER_BOOL_FALSE: |
142 | case AWS_EVENT_STREAM_HEADER_BOOL_TRUE: |
143 | break; |
144 | case AWS_EVENT_STREAM_HEADER_BYTE: |
145 | *buffer_alias = header->header_value.static_val[0]; |
146 | buffer_alias++; |
147 | break; |
148 | /* additions of integers here assume the endianness conversion has already happened */ |
149 | case AWS_EVENT_STREAM_HEADER_INT16: |
150 | memcpy(buffer_alias, header->header_value.static_val, sizeof(uint16_t)); |
151 | buffer_alias += sizeof(uint16_t); |
152 | break; |
153 | case AWS_EVENT_STREAM_HEADER_INT32: |
154 | memcpy(buffer_alias, header->header_value.static_val, sizeof(uint32_t)); |
155 | buffer_alias += sizeof(uint32_t); |
156 | break; |
157 | case AWS_EVENT_STREAM_HEADER_INT64: |
158 | case AWS_EVENT_STREAM_HEADER_TIMESTAMP: |
159 | memcpy(buffer_alias, header->header_value.static_val, sizeof(uint64_t)); |
160 | buffer_alias += sizeof(uint64_t); |
161 | break; |
162 | case AWS_EVENT_STREAM_HEADER_BYTE_BUF: |
163 | case AWS_EVENT_STREAM_HEADER_STRING: |
164 | aws_write_u16(header->header_value_len, buffer_alias); |
165 | buffer_alias += sizeof(uint16_t); |
166 | memcpy(buffer_alias, header->header_value.variable_len_val, header->header_value_len); |
167 | buffer_alias += header->header_value_len; |
168 | break; |
169 | case AWS_EVENT_STREAM_HEADER_UUID: |
170 | memcpy(buffer_alias, header->header_value.static_val, 16); |
171 | buffer_alias += header->header_value_len; |
172 | break; |
173 | } |
174 | } |
175 | |
176 | return buffer_alias - buffer; |
177 | } |
178 | |
179 | /* Get the headers from the buffer, store them in the headers list. |
180 | * the user's reponsibility to cleanup the list when they are finished with it. |
181 | * no buffer copies happen here, the lifetime of the buffer, must outlive the usage of the headers. |
182 | * returns error codes defined in the public interface. */ |
183 | int (struct aws_array_list *, const uint8_t *buffer, size_t ) { |
184 | |
185 | if (AWS_UNLIKELY(headers_len > MAX_HEADERS_SIZE)) { |
186 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED); |
187 | } |
188 | |
189 | /* iterate the buffer per header. */ |
190 | const uint8_t *buffer_start = buffer; |
191 | while ((size_t)(buffer - buffer_start) < headers_len) { |
192 | struct aws_event_stream_header_value_pair ; |
193 | AWS_ZERO_STRUCT(header); |
194 | |
195 | /* get the header info from the buffer, make sure to increment buffer offset. */ |
196 | header.header_name_len = *buffer; |
197 | buffer += sizeof(header.header_name_len); |
198 | memcpy((void *)header.header_name, buffer, (size_t)header.header_name_len); |
199 | buffer += header.header_name_len; |
200 | header.header_value_type = (enum aws_event_stream_header_value_type) * buffer; |
201 | buffer++; |
202 | |
203 | switch (header.header_value_type) { |
204 | case AWS_EVENT_STREAM_HEADER_BOOL_FALSE: |
205 | header.header_value_len = 0; |
206 | header.header_value.static_val[0] = 0; |
207 | break; |
208 | case AWS_EVENT_STREAM_HEADER_BOOL_TRUE: |
209 | header.header_value_len = 0; |
210 | header.header_value.static_val[0] = 1; |
211 | break; |
212 | case AWS_EVENT_STREAM_HEADER_BYTE: |
213 | header.header_value_len = sizeof(uint8_t); |
214 | header.header_value.static_val[0] = *buffer; |
215 | buffer++; |
216 | break; |
217 | case AWS_EVENT_STREAM_HEADER_INT16: |
218 | header.header_value_len = sizeof(uint16_t); |
219 | memcpy(header.header_value.static_val, buffer, sizeof(uint16_t)); |
220 | buffer += sizeof(uint16_t); |
221 | break; |
222 | case AWS_EVENT_STREAM_HEADER_INT32: |
223 | header.header_value_len = sizeof(uint32_t); |
224 | memcpy(header.header_value.static_val, buffer, sizeof(uint32_t)); |
225 | buffer += sizeof(uint32_t); |
226 | break; |
227 | case AWS_EVENT_STREAM_HEADER_INT64: |
228 | case AWS_EVENT_STREAM_HEADER_TIMESTAMP: |
229 | header.header_value_len = sizeof(uint64_t); |
230 | memcpy(header.header_value.static_val, buffer, sizeof(uint64_t)); |
231 | buffer += sizeof(uint64_t); |
232 | break; |
233 | case AWS_EVENT_STREAM_HEADER_BYTE_BUF: |
234 | case AWS_EVENT_STREAM_HEADER_STRING: |
235 | header.header_value_len = aws_read_u16(buffer); |
236 | buffer += sizeof(header.header_value_len); |
237 | header.header_value.variable_len_val = (uint8_t *)buffer; |
238 | buffer += header.header_value_len; |
239 | break; |
240 | case AWS_EVENT_STREAM_HEADER_UUID: |
241 | header.header_value_len = 16; |
242 | memcpy(header.header_value.static_val, buffer, 16); |
243 | buffer += header.header_value_len; |
244 | break; |
245 | } |
246 | |
247 | if (aws_array_list_push_back(headers, (const void *)&header)) { |
248 | return AWS_OP_ERR; |
249 | } |
250 | } |
251 | |
252 | return AWS_OP_SUCCESS; |
253 | } |
254 | |
255 | /* initialize message with the arguments |
256 | * the underlying buffer will be allocated and payload will be copied. |
257 | * see specification, this code should simply add these fields according to that.*/ |
258 | int aws_event_stream_message_init( |
259 | struct aws_event_stream_message *message, |
260 | struct aws_allocator *alloc, |
261 | struct aws_array_list *, |
262 | struct aws_byte_buf *payload) { |
263 | |
264 | size_t payload_len = payload ? payload->len : 0; |
265 | |
266 | uint32_t = compute_headers_len(headers); |
267 | |
268 | if (AWS_UNLIKELY(headers_length > MAX_HEADERS_SIZE)) { |
269 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED); |
270 | } |
271 | |
272 | uint32_t total_length = |
273 | (uint32_t)(AWS_EVENT_STREAM_PRELUDE_LENGTH + headers_length + payload_len + AWS_EVENT_STREAM_TRAILER_LENGTH); |
274 | |
275 | if (AWS_UNLIKELY(total_length < headers_length || total_length < payload_len)) { |
276 | return aws_raise_error(AWS_ERROR_OVERFLOW_DETECTED); |
277 | } |
278 | |
279 | if (AWS_UNLIKELY(total_length > MAX_MESSAGE_SIZE)) { |
280 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED); |
281 | } |
282 | |
283 | message->alloc = alloc; |
284 | message->message_buffer = aws_mem_acquire(message->alloc, total_length); |
285 | |
286 | if (message->message_buffer) { |
287 | message->owns_buffer = 1; |
288 | aws_write_u32(total_length, message->message_buffer); |
289 | uint8_t *buffer_offset = message->message_buffer + sizeof(total_length); |
290 | aws_write_u32(headers_length, buffer_offset); |
291 | buffer_offset += sizeof(headers_length); |
292 | |
293 | uint32_t running_crc = |
294 | aws_checksums_crc32(message->message_buffer, (int)(buffer_offset - message->message_buffer), 0); |
295 | |
296 | const uint8_t *message_crc_boundary_start = buffer_offset; |
297 | aws_write_u32(running_crc, buffer_offset); |
298 | buffer_offset += sizeof(running_crc); |
299 | |
300 | if (headers_length) { |
301 | buffer_offset += add_headers_to_buffer(headers, buffer_offset); |
302 | } |
303 | |
304 | if (payload) { |
305 | memcpy(buffer_offset, payload->buffer, payload->len); |
306 | buffer_offset += payload->len; |
307 | } |
308 | |
309 | running_crc = aws_checksums_crc32( |
310 | message_crc_boundary_start, (int)(buffer_offset - message_crc_boundary_start), running_crc); |
311 | aws_write_u32(running_crc, buffer_offset); |
312 | |
313 | return AWS_OP_SUCCESS; |
314 | } |
315 | |
316 | return aws_raise_error(AWS_ERROR_OOM); |
317 | } |
318 | |
319 | /* add buffer to the message (non-owning). Verify buffer crcs and that length fields are reasonable. */ |
320 | int aws_event_stream_message_from_buffer( |
321 | struct aws_event_stream_message *message, |
322 | struct aws_allocator *alloc, |
323 | struct aws_byte_buf *buffer) { |
324 | AWS_ASSERT(buffer); |
325 | |
326 | message->alloc = alloc; |
327 | message->owns_buffer = 0; |
328 | |
329 | if (AWS_UNLIKELY(buffer->len < AWS_EVENT_STREAM_PRELUDE_LENGTH + AWS_EVENT_STREAM_TRAILER_LENGTH)) { |
330 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH); |
331 | } |
332 | |
333 | uint32_t message_length = aws_read_u32(buffer->buffer + TOTAL_LEN_OFFSET); |
334 | |
335 | if (AWS_UNLIKELY(message_length != buffer->len)) { |
336 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_BUFFER_LENGTH_MISMATCH); |
337 | } |
338 | |
339 | if (AWS_UNLIKELY(message_length > MAX_MESSAGE_SIZE)) { |
340 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED); |
341 | } |
342 | |
343 | uint32_t running_crc = aws_checksums_crc32(buffer->buffer, (int)PRELUDE_CRC_OFFSET, 0); |
344 | uint32_t prelude_crc = aws_read_u32(buffer->buffer + PRELUDE_CRC_OFFSET); |
345 | |
346 | if (running_crc != prelude_crc) { |
347 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE); |
348 | } |
349 | |
350 | running_crc = aws_checksums_crc32( |
351 | buffer->buffer + PRELUDE_CRC_OFFSET, |
352 | (int)(message_length - PRELUDE_CRC_OFFSET - AWS_EVENT_STREAM_TRAILER_LENGTH), |
353 | running_crc); |
354 | uint32_t message_crc = aws_read_u32(buffer->buffer + message_length - AWS_EVENT_STREAM_TRAILER_LENGTH); |
355 | |
356 | if (running_crc != message_crc) { |
357 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE); |
358 | } |
359 | |
360 | message->message_buffer = buffer->buffer; |
361 | |
362 | if (aws_event_stream_message_headers_len(message) > |
363 | message_length - AWS_EVENT_STREAM_PRELUDE_LENGTH - AWS_EVENT_STREAM_TRAILER_LENGTH) { |
364 | message->message_buffer = 0; |
365 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_INVALID_HEADERS_LEN); |
366 | } |
367 | |
368 | return AWS_OP_SUCCESS; |
369 | } |
370 | |
371 | /* Verify buffer crcs and that length fields are reasonable. Once that is done, the buffer is copied to the message. */ |
372 | int aws_event_stream_message_from_buffer_copy( |
373 | struct aws_event_stream_message *message, |
374 | struct aws_allocator *alloc, |
375 | const struct aws_byte_buf *buffer) { |
376 | int parse_value = aws_event_stream_message_from_buffer(message, alloc, (struct aws_byte_buf *)buffer); |
377 | |
378 | if (!parse_value) { |
379 | message->message_buffer = aws_mem_acquire(alloc, buffer->len); |
380 | |
381 | if (message->message_buffer) { |
382 | memcpy(message->message_buffer, buffer->buffer, buffer->len); |
383 | message->alloc = alloc; |
384 | message->owns_buffer = 1; |
385 | |
386 | return AWS_OP_SUCCESS; |
387 | } |
388 | |
389 | return aws_raise_error(AWS_ERROR_OOM); |
390 | } |
391 | |
392 | return parse_value; |
393 | } |
394 | |
395 | /* if buffer is owned, release the memory. */ |
396 | void aws_event_stream_message_clean_up(struct aws_event_stream_message *message) { |
397 | if (message->message_buffer && message->owns_buffer) { |
398 | aws_mem_release(message->alloc, message->message_buffer); |
399 | } |
400 | } |
401 | |
402 | uint32_t aws_event_stream_message_total_length(const struct aws_event_stream_message *message) { |
403 | return aws_read_u32(message->message_buffer + TOTAL_LEN_OFFSET); |
404 | } |
405 | |
406 | uint32_t (const struct aws_event_stream_message *message) { |
407 | return aws_read_u32(message->message_buffer + HEADER_LEN_OFFSET); |
408 | } |
409 | |
410 | uint32_t aws_event_stream_message_prelude_crc(const struct aws_event_stream_message *message) { |
411 | return aws_read_u32(message->message_buffer + PRELUDE_CRC_OFFSET); |
412 | } |
413 | |
414 | int (const struct aws_event_stream_message *message, struct aws_array_list *) { |
415 | return get_headers_from_buffer( |
416 | headers, |
417 | message->message_buffer + AWS_EVENT_STREAM_PRELUDE_LENGTH, |
418 | aws_event_stream_message_headers_len(message)); |
419 | } |
420 | |
421 | const uint8_t *aws_event_stream_message_payload(const struct aws_event_stream_message *message) { |
422 | return message->message_buffer + AWS_EVENT_STREAM_PRELUDE_LENGTH + aws_event_stream_message_headers_len(message); |
423 | } |
424 | |
425 | uint32_t aws_event_stream_message_payload_len(const struct aws_event_stream_message *message) { |
426 | return aws_event_stream_message_total_length(message) - |
427 | (AWS_EVENT_STREAM_PRELUDE_LENGTH + aws_event_stream_message_headers_len(message) + |
428 | AWS_EVENT_STREAM_TRAILER_LENGTH); |
429 | } |
430 | |
431 | uint32_t aws_event_stream_message_message_crc(const struct aws_event_stream_message *message) { |
432 | return aws_read_u32( |
433 | message->message_buffer + (aws_event_stream_message_total_length(message) - AWS_EVENT_STREAM_TRAILER_LENGTH)); |
434 | } |
435 | |
436 | const uint8_t *aws_event_stream_message_buffer(const struct aws_event_stream_message *message) { |
437 | return message->message_buffer; |
438 | } |
439 | |
/* JSON key fragments used by aws_event_stream_message_to_debug_str(). */
#define DEBUG_STR_PRELUDE_TOTAL_LEN "\"total_length\": "
#define DEBUG_STR_PRELUDE_HDRS_LEN "\"headers_length\": "
#define DEBUG_STR_PRELUDE_CRC "\"prelude_crc\": "
#define DEBUG_STR_MESSAGE_CRC "\"message_crc\": "
#define DEBUG_STR_HEADER_NAME "\"name\": "
#define DEBUG_STR_HEADER_VALUE "\"value\": "
#define DEBUG_STR_HEADER_TYPE "\"type\": "
447 | |
448 | int aws_event_stream_message_to_debug_str(FILE *fd, const struct aws_event_stream_message *message) { |
449 | struct aws_array_list ; |
450 | aws_event_stream_headers_list_init(&headers, message->alloc); |
451 | aws_event_stream_message_headers(message, &headers); |
452 | |
453 | fprintf( |
454 | fd, |
455 | "{\n " DEBUG_STR_PRELUDE_TOTAL_LEN "%d,\n " DEBUG_STR_PRELUDE_HDRS_LEN "%d,\n " DEBUG_STR_PRELUDE_CRC |
456 | "%d,\n" , |
457 | aws_event_stream_message_total_length(message), |
458 | aws_event_stream_message_headers_len(message), |
459 | aws_event_stream_message_prelude_crc(message)); |
460 | |
461 | int count = 0; |
462 | |
463 | uint16_t = (uint16_t)aws_array_list_length(&headers); |
464 | |
465 | fprintf(fd, " \"headers\": [" ); |
466 | |
467 | for (uint16_t i = 0; i < headers_count; ++i) { |
468 | struct aws_event_stream_header_value_pair * = NULL; |
469 | |
470 | aws_array_list_get_at_ptr(&headers, (void **)&header, i); |
471 | |
472 | fprintf(fd, " {\n" ); |
473 | |
474 | fprintf(fd, " " DEBUG_STR_HEADER_NAME "\"" ); |
475 | fwrite(header->header_name, sizeof(char), (size_t)header->header_name_len, fd); |
476 | fprintf(fd, "\",\n" ); |
477 | |
478 | fprintf(fd, " " DEBUG_STR_HEADER_TYPE "%d,\n" , header->header_value_type); |
479 | |
480 | if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_FALSE) { |
481 | fprintf(fd, " " DEBUG_STR_HEADER_VALUE "false\n" ); |
482 | } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_TRUE) { |
483 | fprintf(fd, " " DEBUG_STR_HEADER_VALUE "true\n" ); |
484 | } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE) { |
485 | int8_t int_value = (int8_t)header->header_value.static_val[0]; |
486 | fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%d\n" , (int)int_value); |
487 | } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_INT16) { |
488 | int16_t int_value = aws_read_u16(header->header_value.static_val); |
489 | fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%d\n" , (int)int_value); |
490 | } else if (header->header_value_type == AWS_EVENT_STREAM_HEADER_INT32) { |
491 | int32_t int_value = (int32_t)aws_read_u32(header->header_value.static_val); |
492 | fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%d\n" , (int)int_value); |
493 | } else if ( |
494 | header->header_value_type == AWS_EVENT_STREAM_HEADER_INT64 || |
495 | header->header_value_type == AWS_EVENT_STREAM_HEADER_TIMESTAMP) { |
496 | int64_t int_value = (int64_t)aws_read_u64(header->header_value.static_val); |
497 | fprintf(fd, " " DEBUG_STR_HEADER_VALUE "%lld\n" , (long long)int_value); |
498 | } else { |
499 | size_t buffer_len = 0; |
500 | aws_base64_compute_encoded_len(header->header_value_len, &buffer_len); |
501 | char *encoded_buffer = (char *)aws_mem_acquire(message->alloc, buffer_len); |
502 | if (!encoded_buffer) { |
503 | return aws_raise_error(AWS_ERROR_OOM); |
504 | } |
505 | |
506 | struct aws_byte_buf encode_output = aws_byte_buf_from_array((uint8_t *)encoded_buffer, buffer_len); |
507 | |
508 | if (header->header_value_type == AWS_EVENT_STREAM_HEADER_UUID) { |
509 | struct aws_byte_cursor to_encode = |
510 | aws_byte_cursor_from_array(header->header_value.static_val, header->header_value_len); |
511 | |
512 | aws_base64_encode(&to_encode, &encode_output); |
513 | } else { |
514 | struct aws_byte_cursor to_encode = |
515 | aws_byte_cursor_from_array(header->header_value.variable_len_val, header->header_value_len); |
516 | aws_base64_encode(&to_encode, &encode_output); |
517 | } |
518 | fprintf(fd, " " DEBUG_STR_HEADER_VALUE "\"%s\"\n" , encoded_buffer); |
519 | aws_mem_release(message->alloc, encoded_buffer); |
520 | } |
521 | |
522 | fprintf(fd, " }" ); |
523 | |
524 | if (count < headers_count - 1) { |
525 | fprintf(fd, "," ); |
526 | } |
527 | fprintf(fd, "\n" ); |
528 | |
529 | count++; |
530 | } |
531 | aws_event_stream_headers_list_cleanup(&headers); |
532 | fprintf(fd, " ],\n" ); |
533 | |
534 | size_t payload_len = aws_event_stream_message_payload_len(message); |
535 | const uint8_t *payload = aws_event_stream_message_payload(message); |
536 | size_t encoded_len = 0; |
537 | aws_base64_compute_encoded_len(payload_len, &encoded_len); |
538 | char *encoded_payload = (char *)aws_mem_acquire(message->alloc, encoded_len); |
539 | |
540 | if (!encoded_payload) { |
541 | return aws_raise_error(AWS_ERROR_OOM); |
542 | } |
543 | |
544 | struct aws_byte_cursor payload_buffer = aws_byte_cursor_from_array(payload, payload_len); |
545 | struct aws_byte_buf encoded_payload_buffer = aws_byte_buf_from_array((uint8_t *)encoded_payload, encoded_len); |
546 | |
547 | aws_base64_encode(&payload_buffer, &encoded_payload_buffer); |
548 | fprintf(fd, " \"payload\": \"%s\",\n" , encoded_payload); |
549 | fprintf(fd, " " DEBUG_STR_MESSAGE_CRC "%d\n}\n" , aws_event_stream_message_message_crc(message)); |
550 | |
551 | return AWS_OP_SUCCESS; |
552 | } |
553 | |
554 | int (struct aws_array_list *, struct aws_allocator *allocator) { |
555 | AWS_ASSERT(headers); |
556 | AWS_ASSERT(allocator); |
557 | |
558 | return aws_array_list_init_dynamic(headers, allocator, 4, sizeof(struct aws_event_stream_header_value_pair)); |
559 | } |
560 | |
561 | void (struct aws_array_list *) { |
562 | AWS_ASSERT(headers); |
563 | |
564 | for (size_t i = 0; i < aws_array_list_length(headers); ++i) { |
565 | struct aws_event_stream_header_value_pair * = NULL; |
566 | aws_array_list_get_at_ptr(headers, (void **)&header, i); |
567 | |
568 | if (header->value_owned) { |
569 | aws_mem_release(headers->alloc, (void *)header->header_value.variable_len_val); |
570 | } |
571 | } |
572 | |
573 | aws_array_list_clean_up(headers); |
574 | } |
575 | |
576 | static int ( |
577 | struct aws_array_list *, |
578 | struct aws_event_stream_header_value_pair *, |
579 | const char *name, |
580 | uint8_t name_len, |
581 | uint8_t *value, |
582 | uint16_t value_len, |
583 | int8_t copy) { |
584 | |
585 | memcpy((void *)header->header_name, (void *)name, (size_t)name_len); |
586 | |
587 | if (copy) { |
588 | header->header_value.variable_len_val = aws_mem_acquire(headers->alloc, value_len); |
589 | if (!header->header_value.variable_len_val) { |
590 | return aws_raise_error(AWS_ERROR_OOM); |
591 | } |
592 | |
593 | header->value_owned = 1; |
594 | memcpy((void *)header->header_value.variable_len_val, (void *)value, value_len); |
595 | } else { |
596 | header->value_owned = 0; |
597 | header->header_value.variable_len_val = value; |
598 | } |
599 | |
600 | if (aws_array_list_push_back(headers, (void *)header)) { |
601 | if (header->value_owned) { |
602 | aws_mem_release(headers->alloc, (void *)header->header_value.variable_len_val); |
603 | } |
604 | return AWS_OP_ERR; |
605 | } |
606 | |
607 | return AWS_OP_SUCCESS; |
608 | } |
609 | |
610 | int ( |
611 | struct aws_array_list *, |
612 | const char *name, |
613 | uint8_t name_len, |
614 | const char *value, |
615 | uint16_t value_len, |
616 | int8_t copy) { |
617 | struct aws_event_stream_header_value_pair = {.header_name_len = name_len, |
618 | .header_value_len = value_len, |
619 | .value_owned = copy, |
620 | .header_value_type = AWS_EVENT_STREAM_HEADER_STRING}; |
621 | |
622 | return s_add_variable_len_header(headers, &header, name, name_len, (uint8_t *)value, value_len, copy); |
623 | } |
624 | |
625 | int (struct aws_array_list *, const char *name, uint8_t name_len, int8_t value) { |
626 | struct aws_event_stream_header_value_pair = {.header_name_len = name_len, |
627 | .header_value_len = 1, |
628 | .value_owned = 0, |
629 | .header_value_type = AWS_EVENT_STREAM_HEADER_BYTE, |
630 | .header_value.static_val[0] = (uint8_t)value}; |
631 | |
632 | memcpy((void *)header.header_name, (void *)name, (size_t)name_len); |
633 | |
634 | return aws_array_list_push_back(headers, (void *)&header); |
635 | } |
636 | |
637 | int (struct aws_array_list *, const char *name, uint8_t name_len, int8_t value) { |
638 | struct aws_event_stream_header_value_pair = { |
639 | .header_name_len = name_len, |
640 | .header_value_len = 0, |
641 | .value_owned = 0, |
642 | .header_value_type = value ? AWS_EVENT_STREAM_HEADER_BOOL_TRUE : AWS_EVENT_STREAM_HEADER_BOOL_FALSE, |
643 | }; |
644 | |
645 | memcpy((void *)header.header_name, (void *)name, (size_t)name_len); |
646 | |
647 | return aws_array_list_push_back(headers, (void *)&header); |
648 | } |
649 | |
650 | int ( |
651 | struct aws_array_list *, |
652 | const char *name, |
653 | uint8_t name_len, |
654 | int16_t value) { |
655 | struct aws_event_stream_header_value_pair = { |
656 | .header_name_len = name_len, |
657 | .header_value_len = sizeof(value), |
658 | .value_owned = 0, |
659 | .header_value_type = AWS_EVENT_STREAM_HEADER_INT16, |
660 | }; |
661 | |
662 | memcpy((void *)header.header_name, (void *)name, (size_t)name_len); |
663 | aws_write_u16((uint16_t)value, header.header_value.static_val); |
664 | |
665 | return aws_array_list_push_back(headers, (void *)&header); |
666 | } |
667 | |
668 | int ( |
669 | struct aws_array_list *, |
670 | const char *name, |
671 | uint8_t name_len, |
672 | int32_t value) { |
673 | struct aws_event_stream_header_value_pair = { |
674 | .header_name_len = name_len, |
675 | .header_value_len = sizeof(value), |
676 | .value_owned = 0, |
677 | .header_value_type = AWS_EVENT_STREAM_HEADER_INT32, |
678 | }; |
679 | |
680 | memcpy((void *)header.header_name, (void *)name, (size_t)name_len); |
681 | aws_write_u32((uint32_t)value, header.header_value.static_val); |
682 | |
683 | return aws_array_list_push_back(headers, (void *)&header); |
684 | } |
685 | |
686 | int ( |
687 | struct aws_array_list *, |
688 | const char *name, |
689 | uint8_t name_len, |
690 | int64_t value) { |
691 | struct aws_event_stream_header_value_pair = { |
692 | .header_name_len = name_len, |
693 | .header_value_len = sizeof(value), |
694 | .value_owned = 0, |
695 | .header_value_type = AWS_EVENT_STREAM_HEADER_INT64, |
696 | }; |
697 | |
698 | memcpy((void *)header.header_name, (void *)name, (size_t)name_len); |
699 | aws_write_u64((uint64_t)value, header.header_value.static_val); |
700 | |
701 | return aws_array_list_push_back(headers, (void *)&header); |
702 | } |
703 | |
704 | int ( |
705 | struct aws_array_list *, |
706 | const char *name, |
707 | uint8_t name_len, |
708 | uint8_t *value, |
709 | uint16_t value_len, |
710 | int8_t copy) { |
711 | struct aws_event_stream_header_value_pair = {.header_name_len = name_len, |
712 | .header_value_len = value_len, |
713 | .value_owned = copy, |
714 | .header_value_type = AWS_EVENT_STREAM_HEADER_BYTE_BUF}; |
715 | |
716 | return s_add_variable_len_header(headers, &header, name, name_len, value, value_len, copy); |
717 | } |
718 | |
719 | int ( |
720 | struct aws_array_list *, |
721 | const char *name, |
722 | uint8_t name_len, |
723 | int64_t value) { |
724 | struct aws_event_stream_header_value_pair = { |
725 | .header_name_len = name_len, |
726 | .header_value_len = sizeof(uint64_t), |
727 | .value_owned = 0, |
728 | .header_value_type = AWS_EVENT_STREAM_HEADER_TIMESTAMP, |
729 | }; |
730 | |
731 | memcpy((void *)header.header_name, (void *)name, (size_t)name_len); |
732 | aws_write_u64((uint64_t)value, header.header_value.static_val); |
733 | |
734 | return aws_array_list_push_back(headers, (void *)&header); |
735 | } |
736 | |
737 | int ( |
738 | struct aws_array_list *, |
739 | const char *name, |
740 | uint8_t name_len, |
741 | const uint8_t *value) { |
742 | struct aws_event_stream_header_value_pair = { |
743 | .header_name_len = name_len, |
744 | .header_value_len = 16, |
745 | .value_owned = 0, |
746 | .header_value_type = AWS_EVENT_STREAM_HEADER_UUID, |
747 | }; |
748 | |
749 | memcpy((void *)header.header_name, (void *)name, (size_t)name_len); |
750 | memcpy((void *)header.header_value.static_val, value, 16); |
751 | |
752 | return aws_array_list_push_back(headers, (void *)&header); |
753 | } |
754 | |
755 | struct aws_byte_buf (struct aws_event_stream_header_value_pair *) { |
756 | return aws_byte_buf_from_array((uint8_t *)header->header_name, header->header_name_len); |
757 | } |
758 | |
759 | int8_t (struct aws_event_stream_header_value_pair *) { |
760 | return (int8_t)header->header_value.static_val[0]; |
761 | } |
762 | |
763 | struct aws_byte_buf (struct aws_event_stream_header_value_pair *) { |
764 | return aws_event_stream_header_value_as_bytebuf(header); |
765 | } |
766 | |
767 | int8_t (struct aws_event_stream_header_value_pair *) { |
768 | return header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_TRUE ? (int8_t)1 : (int8_t)0; |
769 | } |
770 | |
771 | int16_t (struct aws_event_stream_header_value_pair *) { |
772 | return (int16_t)aws_read_u16(header->header_value.static_val); |
773 | } |
774 | |
775 | int32_t (struct aws_event_stream_header_value_pair *) { |
776 | return (int32_t)aws_read_u32(header->header_value.static_val); |
777 | } |
778 | |
779 | int64_t (struct aws_event_stream_header_value_pair *) { |
780 | return (int64_t)aws_read_u64(header->header_value.static_val); |
781 | } |
782 | |
783 | struct aws_byte_buf (struct aws_event_stream_header_value_pair *) { |
784 | return aws_byte_buf_from_array(header->header_value.variable_len_val, header->header_value_len); |
785 | } |
786 | |
/* Returns the header's value as a 64-bit timestamp; identical to
 * aws_event_stream_header_value_as_int64(). */
int64_t aws_event_stream_header_value_as_timestamp(struct aws_event_stream_header_value_pair *header) {
    return aws_event_stream_header_value_as_int64(header);
}
790 | |
791 | struct aws_byte_buf (struct aws_event_stream_header_value_pair *) { |
792 | return aws_byte_buf_from_array(header->header_value.static_val, 16); |
793 | } |
794 | |
795 | uint16_t (struct aws_event_stream_header_value_pair *) { |
796 | return header->header_value_len; |
797 | } |
798 | |
/* All-zero prelude; s_reset_state assigns it to decoder->prelude to discard the previous message's prelude. */
static struct aws_event_stream_message_prelude s_empty_prelude = {.total_len = 0, .headers_len = 0, .prelude_crc = 0};
800 | |
801 | static void (struct aws_event_stream_streaming_decoder *decoder, uint8_t ) { |
802 | |
803 | if (free_header_data && decoder->current_header.value_owned) { |
804 | aws_mem_release(decoder->alloc, (void *)decoder->current_header.header_value.variable_len_val); |
805 | } |
806 | |
807 | memset((void *)&decoder->current_header, 0, sizeof(struct aws_event_stream_header_value_pair)); |
808 | } |
809 | |
/* Forward declaration: s_read_trailer_state calls this before its definition appears further down. */
static void s_reset_state(struct aws_event_stream_streaming_decoder *decoder);

/* Forward declaration: the header value/type states below switch decoder->state back to this handler once a
 * header has been fully read. */
static int s_headers_state(
    struct aws_event_stream_streaming_decoder *decoder,
    const uint8_t *data,
    size_t len,
    size_t *processed);
817 | |
818 | static int ( |
819 | struct aws_event_stream_streaming_decoder *decoder, |
820 | const uint8_t *data, |
821 | size_t len, |
822 | size_t *processed) { |
823 | |
824 | size_t current_pos = decoder->message_pos; |
825 | |
826 | size_t length_read = current_pos - decoder->current_header_value_offset; |
827 | struct aws_event_stream_header_value_pair * = &decoder->current_header; |
828 | |
829 | if (!length_read) { |
830 | /* save an allocation, this can only happen if the data we were handed is larger than the length of the header |
831 | * value. we don't really need to handle offsets in this case. This expects the user is living by the contract |
832 | * that they cannot act like they own this memory beyond the lifetime of their callback, and they should not |
833 | * mutate it */ |
834 | if (len >= current_header->header_value_len) { |
835 | /* this part works regardless of type since the layout of the union will line up. */ |
836 | current_header->header_value.variable_len_val = (uint8_t *)data; |
837 | current_header->value_owned = 0; |
838 | decoder->on_header(decoder, &decoder->prelude, &decoder->current_header, decoder->user_context); |
839 | *processed += current_header->header_value_len; |
840 | decoder->message_pos += current_header->header_value_len; |
841 | decoder->running_crc = |
842 | aws_checksums_crc32(data, (int)current_header->header_value_len, decoder->running_crc); |
843 | |
844 | s_reset_header_state(decoder, 1); |
845 | decoder->state = s_headers_state; |
846 | return AWS_OP_SUCCESS; |
847 | } |
848 | |
849 | /* a possible optimization later would be to only allocate this once, and then keep reusing the same buffer. for |
850 | * subsequent messages.*/ |
851 | if (current_header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF || |
852 | current_header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING) { |
853 | current_header->header_value.variable_len_val = |
854 | aws_mem_acquire(decoder->alloc, decoder->current_header.header_value_len); |
855 | |
856 | if (!current_header->header_value.variable_len_val) { |
857 | return aws_raise_error(AWS_ERROR_OOM); |
858 | } |
859 | |
860 | current_header->value_owned = 1; |
861 | } |
862 | } |
863 | |
864 | size_t max_read = |
865 | len >= current_header->header_value_len - length_read ? current_header->header_value_len - length_read : len; |
866 | |
867 | const uint8_t * = current_header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF || |
868 | current_header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING |
869 | ? current_header->header_value.variable_len_val |
870 | : current_header->header_value.static_val; |
871 | |
872 | memcpy((void *)(header_value_alias + length_read), data, max_read); |
873 | decoder->running_crc = aws_checksums_crc32(data, (int)max_read, decoder->running_crc); |
874 | |
875 | *processed += max_read; |
876 | decoder->message_pos += max_read; |
877 | length_read += max_read; |
878 | |
879 | if (length_read == current_header->header_value_len) { |
880 | decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context); |
881 | s_reset_header_state(decoder, 1); |
882 | decoder->state = s_headers_state; |
883 | } |
884 | |
885 | return AWS_OP_SUCCESS; |
886 | } |
887 | |
888 | static int ( |
889 | struct aws_event_stream_streaming_decoder *decoder, |
890 | const uint8_t *data, |
891 | size_t len, |
892 | size_t *processed) { |
893 | size_t current_pos = decoder->message_pos; |
894 | |
895 | size_t length_portion_read = current_pos - decoder->current_header_value_offset; |
896 | |
897 | if (length_portion_read < sizeof(uint16_t)) { |
898 | size_t max_to_read = |
899 | len > sizeof(uint16_t) - length_portion_read ? sizeof(uint16_t) - length_portion_read : len; |
900 | memcpy(decoder->working_buffer + length_portion_read, data, max_to_read); |
901 | decoder->running_crc = aws_checksums_crc32(data, (int)max_to_read, decoder->running_crc); |
902 | |
903 | *processed += max_to_read; |
904 | decoder->message_pos += max_to_read; |
905 | |
906 | length_portion_read = decoder->message_pos - decoder->current_header_value_offset; |
907 | } |
908 | |
909 | if (length_portion_read == sizeof(uint16_t)) { |
910 | decoder->current_header.header_value_len = aws_read_u16(decoder->working_buffer); |
911 | decoder->current_header_value_offset = decoder->message_pos; |
912 | decoder->state = s_read_header_value; |
913 | } |
914 | |
915 | return AWS_OP_SUCCESS; |
916 | } |
917 | |
918 | static int ( |
919 | struct aws_event_stream_streaming_decoder *decoder, |
920 | const uint8_t *data, |
921 | size_t len, |
922 | size_t *processed) { |
923 | (void)len; |
924 | uint8_t type = *data; |
925 | decoder->running_crc = aws_checksums_crc32(data, 1, decoder->running_crc); |
926 | *processed += 1; |
927 | decoder->message_pos++; |
928 | decoder->current_header_value_offset++; |
929 | struct aws_event_stream_header_value_pair * = &decoder->current_header; |
930 | |
931 | if (type >= AWS_EVENT_STREAM_HEADER_BOOL_FALSE && type <= AWS_EVENT_STREAM_HEADER_UUID) { |
932 | current_header->header_value_type = (enum aws_event_stream_header_value_type)type; |
933 | |
934 | switch (type) { |
935 | case AWS_EVENT_STREAM_HEADER_STRING: |
936 | case AWS_EVENT_STREAM_HEADER_BYTE_BUF: |
937 | decoder->state = s_read_header_value_len; |
938 | break; |
939 | case AWS_EVENT_STREAM_HEADER_BOOL_FALSE: |
940 | current_header->header_value_len = 0; |
941 | current_header->header_value.static_val[0] = 0; |
942 | decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context); |
943 | s_reset_header_state(decoder, 1); |
944 | break; |
945 | case AWS_EVENT_STREAM_HEADER_BOOL_TRUE: |
946 | current_header->header_value_len = 0; |
947 | current_header->header_value.static_val[0] = 1; |
948 | decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context); |
949 | s_reset_header_state(decoder, 1); |
950 | break; |
951 | case AWS_EVENT_STREAM_HEADER_BYTE: |
952 | current_header->header_value_len = 1; |
953 | decoder->state = s_read_header_value; |
954 | break; |
955 | case AWS_EVENT_STREAM_HEADER_INT16: |
956 | current_header->header_value_len = sizeof(uint16_t); |
957 | decoder->state = s_read_header_value; |
958 | break; |
959 | case AWS_EVENT_STREAM_HEADER_INT32: |
960 | current_header->header_value_len = sizeof(uint32_t); |
961 | decoder->state = s_read_header_value; |
962 | break; |
963 | case AWS_EVENT_STREAM_HEADER_INT64: |
964 | case AWS_EVENT_STREAM_HEADER_TIMESTAMP: |
965 | current_header->header_value_len = sizeof(uint64_t); |
966 | decoder->state = s_read_header_value; |
967 | break; |
968 | case AWS_EVENT_STREAM_HEADER_UUID: |
969 | current_header->header_value_len = 16; |
970 | decoder->state = s_read_header_value; |
971 | break; |
972 | default: |
973 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE); |
974 | } |
975 | |
976 | return AWS_OP_SUCCESS; |
977 | } |
978 | |
979 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_UNKNOWN_HEADER_TYPE); |
980 | } |
981 | |
982 | static int ( |
983 | struct aws_event_stream_streaming_decoder *decoder, |
984 | const uint8_t *data, |
985 | size_t len, |
986 | size_t *processed) { |
987 | size_t current_pos = decoder->message_pos; |
988 | |
989 | size_t length_read = current_pos - decoder->current_header_name_offset; |
990 | |
991 | size_t max_read = len >= decoder->current_header.header_name_len - length_read |
992 | ? decoder->current_header.header_name_len - length_read |
993 | : len; |
994 | memcpy((void *)(decoder->current_header.header_name + length_read), data, max_read); |
995 | decoder->running_crc = aws_checksums_crc32(data, (int)max_read, decoder->running_crc); |
996 | |
997 | *processed += max_read; |
998 | decoder->message_pos += max_read; |
999 | length_read += max_read; |
1000 | |
1001 | if (length_read == decoder->current_header.header_name_len) { |
1002 | decoder->state = s_read_header_type; |
1003 | decoder->current_header_value_offset = decoder->message_pos; |
1004 | } |
1005 | |
1006 | return AWS_OP_SUCCESS; |
1007 | } |
1008 | |
1009 | static int ( |
1010 | struct aws_event_stream_streaming_decoder *decoder, |
1011 | const uint8_t *data, |
1012 | size_t len, |
1013 | size_t *processed) { |
1014 | (void)len; |
1015 | decoder->current_header.header_name_len = *data; |
1016 | decoder->message_pos++; |
1017 | decoder->current_header_name_offset++; |
1018 | *processed += 1; |
1019 | decoder->state = s_read_header_name; |
1020 | decoder->running_crc = aws_checksums_crc32(data, 1, decoder->running_crc); |
1021 | |
1022 | return AWS_OP_SUCCESS; |
1023 | } |
1024 | |
1025 | static int ( |
1026 | struct aws_event_stream_streaming_decoder *decoder, |
1027 | const uint8_t *data, |
1028 | size_t len, |
1029 | size_t *processed) /* NOLINT */ { |
1030 | (void)data; |
1031 | (void)len; |
1032 | (void)processed; |
1033 | decoder->state = s_read_header_name_len; |
1034 | decoder->current_header_name_offset = decoder->message_pos; |
1035 | |
1036 | return AWS_OP_SUCCESS; |
1037 | } |
1038 | |
/* Forward declaration: the headers state switches to this handler once message_pos reaches the end of the
 * headers section. */
static int s_payload_state(
    struct aws_event_stream_streaming_decoder *decoder,
    const uint8_t *data,
    size_t len,
    size_t *processed);
1044 | |
1045 | /*Handles the initial state for header parsing. |
1046 | will oscillate between multiple other states as well. |
1047 | after all headers have been handled, payload will be set as the next state. */ |
1048 | static int ( |
1049 | struct aws_event_stream_streaming_decoder *decoder, |
1050 | const uint8_t *data, |
1051 | size_t len, |
1052 | size_t *processed) /* NOLINT */ { |
1053 | (void)data; |
1054 | (void)len; |
1055 | (void)processed; |
1056 | |
1057 | size_t current_pos = decoder->message_pos; |
1058 | |
1059 | size_t = decoder->prelude.headers_len + AWS_EVENT_STREAM_PRELUDE_LENGTH; |
1060 | |
1061 | if (current_pos < headers_boundary) { |
1062 | decoder->state = s_start_header; |
1063 | return AWS_OP_SUCCESS; |
1064 | } |
1065 | |
1066 | if (current_pos == headers_boundary) { |
1067 | decoder->state = s_payload_state; |
1068 | return AWS_OP_SUCCESS; |
1069 | } |
1070 | |
1071 | return aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_PARSER_ILLEGAL_STATE); |
1072 | } |
1073 | |
1074 | /* handles reading the trailer. Once it has been read, it will be compared to the running checksum. If successful, |
1075 | * the state will be reset. */ |
1076 | static int s_read_trailer_state( |
1077 | struct aws_event_stream_streaming_decoder *decoder, |
1078 | const uint8_t *data, |
1079 | size_t len, |
1080 | size_t *processed) { |
1081 | |
1082 | size_t remaining_amount = decoder->prelude.total_len - decoder->message_pos; |
1083 | size_t segment_length = len > remaining_amount ? remaining_amount : len; |
1084 | size_t offset = sizeof(uint32_t) - remaining_amount; |
1085 | memcpy(decoder->working_buffer + offset, data, segment_length); |
1086 | decoder->message_pos += segment_length; |
1087 | *processed += segment_length; |
1088 | |
1089 | if (decoder->message_pos == decoder->prelude.total_len) { |
1090 | uint32_t message_crc = aws_read_u32(decoder->working_buffer); |
1091 | |
1092 | if (message_crc == decoder->running_crc) { |
1093 | s_reset_state(decoder); |
1094 | } else { |
1095 | char error_message[70]; |
1096 | snprintf( |
1097 | error_message, |
1098 | sizeof(error_message), |
1099 | "CRC Mismatch. message_crc was 0x08%" PRIX32 ", but computed 0x08%" PRIX32, |
1100 | message_crc, |
1101 | decoder->running_crc); |
1102 | aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE); |
1103 | decoder->on_error( |
1104 | decoder, |
1105 | &decoder->prelude, |
1106 | AWS_ERROR_EVENT_STREAM_MESSAGE_CHECKSUM_FAILURE, |
1107 | error_message, |
1108 | decoder->user_context); |
1109 | return AWS_OP_ERR; |
1110 | } |
1111 | } |
1112 | |
1113 | return AWS_OP_SUCCESS; |
1114 | } |
1115 | |
1116 | /* handles the reading of the payload up to the final checksum. Sets read_trailer_state as the new state once |
1117 | * the payload has been processed. */ |
1118 | static int s_payload_state( |
1119 | struct aws_event_stream_streaming_decoder *decoder, |
1120 | const uint8_t *data, |
1121 | size_t len, |
1122 | size_t *processed) { |
1123 | |
1124 | if (decoder->message_pos < decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH) { |
1125 | size_t remaining_amount = decoder->prelude.total_len - decoder->message_pos - AWS_EVENT_STREAM_TRAILER_LENGTH; |
1126 | size_t segment_length = len > remaining_amount ? remaining_amount : len; |
1127 | int8_t final_segment = |
1128 | (segment_length + decoder->message_pos) == (decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH); |
1129 | struct aws_byte_buf payload_buf = aws_byte_buf_from_array(data, segment_length); |
1130 | decoder->on_payload(decoder, &payload_buf, final_segment, decoder->user_context); |
1131 | decoder->message_pos += segment_length; |
1132 | decoder->running_crc = aws_checksums_crc32(data, (int)segment_length, decoder->running_crc); |
1133 | *processed += segment_length; |
1134 | } |
1135 | |
1136 | if (decoder->message_pos == decoder->prelude.total_len - AWS_EVENT_STREAM_TRAILER_LENGTH) { |
1137 | decoder->state = s_read_trailer_state; |
1138 | } |
1139 | |
1140 | return AWS_OP_SUCCESS; |
1141 | } |
1142 | |
1143 | /* Parses the payload and verifies checksums. Sets the next state if successful. */ |
1144 | static int s_verify_prelude_state( |
1145 | struct aws_event_stream_streaming_decoder *decoder, |
1146 | const uint8_t *data, |
1147 | size_t len, |
1148 | size_t *processed) /* NOLINT */ { |
1149 | (void)data; |
1150 | (void)len; |
1151 | (void)processed; |
1152 | |
1153 | decoder->prelude.headers_len = aws_read_u32(decoder->working_buffer + HEADER_LEN_OFFSET); |
1154 | decoder->prelude.prelude_crc = aws_read_u32(decoder->working_buffer + PRELUDE_CRC_OFFSET); |
1155 | decoder->prelude.total_len = aws_read_u32(decoder->working_buffer + TOTAL_LEN_OFFSET); |
1156 | |
1157 | decoder->running_crc = aws_checksums_crc32(decoder->working_buffer, PRELUDE_CRC_OFFSET, 0); |
1158 | |
1159 | if (AWS_LIKELY(decoder->running_crc == decoder->prelude.prelude_crc)) { |
1160 | |
1161 | if (AWS_UNLIKELY( |
1162 | decoder->prelude.headers_len > MAX_HEADERS_SIZE || decoder->prelude.total_len > MAX_MESSAGE_SIZE)) { |
1163 | aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED); |
1164 | char error_message[] = "Maximum message field size exceeded" ; |
1165 | |
1166 | decoder->on_error( |
1167 | decoder, |
1168 | &decoder->prelude, |
1169 | AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED, |
1170 | error_message, |
1171 | decoder->user_context); |
1172 | return AWS_OP_ERR; |
1173 | } |
1174 | |
1175 | /* Should only call on_prelude() after passing crc check and limitation check, otherwise call on_prelude() with |
1176 | * incorrect prelude is error prune. */ |
1177 | decoder->on_prelude(decoder, &decoder->prelude, decoder->user_context); |
1178 | |
1179 | decoder->running_crc = aws_checksums_crc32( |
1180 | decoder->working_buffer + PRELUDE_CRC_OFFSET, |
1181 | (int)sizeof(decoder->prelude.prelude_crc), |
1182 | decoder->running_crc); |
1183 | memset(decoder->working_buffer, 0, sizeof(decoder->working_buffer)); |
1184 | decoder->state = decoder->prelude.headers_len > 0 ? s_headers_state : s_payload_state; |
1185 | } else { |
1186 | char error_message[70]; |
1187 | snprintf( |
1188 | error_message, |
1189 | sizeof(error_message), |
1190 | "CRC Mismatch. prelude_crc was 0x08%" PRIX32 ", but computed 0x08%" PRIX32, |
1191 | decoder->prelude.prelude_crc, |
1192 | decoder->running_crc); |
1193 | |
1194 | aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE); |
1195 | decoder->on_error( |
1196 | decoder, |
1197 | &decoder->prelude, |
1198 | AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE, |
1199 | error_message, |
1200 | decoder->user_context); |
1201 | return AWS_OP_ERR; |
1202 | } |
1203 | |
1204 | return AWS_OP_SUCCESS; |
1205 | } |
1206 | |
1207 | /* initial state handles up to the reading of the prelude */ |
1208 | static int s_start_state( |
1209 | struct aws_event_stream_streaming_decoder *decoder, |
1210 | const uint8_t *data, |
1211 | size_t len, |
1212 | size_t *processed) { |
1213 | |
1214 | size_t previous_position = decoder->message_pos; |
1215 | if (decoder->message_pos < AWS_EVENT_STREAM_PRELUDE_LENGTH) { |
1216 | if (len >= AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos) { |
1217 | memcpy( |
1218 | decoder->working_buffer + decoder->message_pos, |
1219 | data, |
1220 | AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos); |
1221 | decoder->message_pos += AWS_EVENT_STREAM_PRELUDE_LENGTH - decoder->message_pos; |
1222 | } else { |
1223 | memcpy(decoder->working_buffer + decoder->message_pos, data, len); |
1224 | decoder->message_pos += len; |
1225 | } |
1226 | |
1227 | *processed += decoder->message_pos - previous_position; |
1228 | } |
1229 | |
1230 | if (decoder->message_pos == AWS_EVENT_STREAM_PRELUDE_LENGTH) { |
1231 | decoder->state = s_verify_prelude_state; |
1232 | } |
1233 | |
1234 | return AWS_OP_SUCCESS; |
1235 | } |
1236 | |
1237 | static void s_reset_state(struct aws_event_stream_streaming_decoder *decoder) { |
1238 | decoder->message_pos = 0; |
1239 | decoder->prelude = s_empty_prelude; |
1240 | decoder->running_crc = 0; |
1241 | memset(decoder->working_buffer, 0, sizeof(decoder->working_buffer)); |
1242 | decoder->state = s_start_state; |
1243 | } |
1244 | |
1245 | void aws_event_stream_streaming_decoder_init( |
1246 | struct aws_event_stream_streaming_decoder *decoder, |
1247 | struct aws_allocator *alloc, |
1248 | aws_event_stream_process_on_payload_segment_fn *on_payload_segment, |
1249 | aws_event_stream_prelude_received_fn *on_prelude, |
1250 | aws_event_stream_header_received_fn *, |
1251 | aws_event_stream_on_error_fn *on_error, |
1252 | void *user_data) { |
1253 | |
1254 | s_reset_state(decoder); |
1255 | decoder->alloc = alloc; |
1256 | decoder->on_error = on_error; |
1257 | decoder->on_header = on_header; |
1258 | decoder->on_payload = on_payload_segment; |
1259 | decoder->on_prelude = on_prelude; |
1260 | decoder->user_context = user_data; |
1261 | } |
1262 | |
1263 | void aws_event_stream_streaming_decoder_clean_up(struct aws_event_stream_streaming_decoder *decoder) { |
1264 | s_reset_state(decoder); |
1265 | decoder->on_error = 0; |
1266 | decoder->on_header = 0; |
1267 | decoder->on_payload = 0; |
1268 | decoder->on_prelude = 0; |
1269 | decoder->user_context = 0; |
1270 | } |
1271 | |
1272 | /* Simply sends the data to the state machine until all has been processed or an error is returned. */ |
1273 | int aws_event_stream_streaming_decoder_pump( |
1274 | struct aws_event_stream_streaming_decoder *decoder, |
1275 | const struct aws_byte_buf *data) { |
1276 | |
1277 | size_t processed = 0; |
1278 | int err_val = 0; |
1279 | while (!err_val && data->buffer && data->len && processed < data->len) { |
1280 | err_val = decoder->state(decoder, data->buffer + processed, data->len - processed, &processed); |
1281 | } |
1282 | |
1283 | return err_val; |
1284 | } |
1285 | #if _MSC_VER |
1286 | # pragma warning(pop) |
1287 | #endif |
1288 | |