| 1 |
#ifndef librabbitmq_amqp_private_h |
|---|
| 2 |
#define librabbitmq_amqp_private_h |
|---|
| 3 |
|
|---|
| 4 |
#ifdef __cplusplus |
|---|
| 5 |
extern "C" { |
|---|
| 6 |
#endif |
|---|
| 7 |
|
|---|
| 8 |
#include <arpa/inet.h> /* ntohl, htonl, ntohs, htons */ |
|---|
| 9 |
|
|---|
| 10 |
/*
 * Connection states:
 *
 * - CONNECTION_STATE_IDLE: initial state, and entered again after
 *   each frame is completed. Means that no bytes of the next frame
 *   have been seen yet. Connections may only be reconfigured, and the
 *   connection's pools recycled, when in this state. Whenever we're
 *   in this state, the inbound_buffer's bytes pointer must be NULL;
 *   in any other state, it must point to a block of memory allocated
 *   from the frame_pool.
 *
 * - CONNECTION_STATE_WAITING_FOR_HEADER: Some bytes of an incoming
 *   frame have been seen, but not a complete frame header's worth.
 *
 * - CONNECTION_STATE_WAITING_FOR_BODY: A complete frame header has
 *   been seen, but the frame is not yet complete. When it is
 *   completed, it will be returned, and the connection will return to
 *   IDLE state.
 *
 * - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: The beginning of a
 *   protocol version header has been seen, but the full eight bytes
 *   haven't yet been received. When it is completed, it will be
 *   returned, and the connection will return to IDLE state.
 *
 */
|---|
| 35 |
/* Progress of the inbound frame parser for one connection. */
typedef enum amqp_connection_state_enum_ {
  /* No bytes of the next frame have been seen yet. */
  CONNECTION_STATE_IDLE = 0,
  /* Part of a frame header has arrived, but not all of it. */
  CONNECTION_STATE_WAITING_FOR_HEADER = 1,
  /* The frame header is complete; the rest of the frame is pending. */
  CONNECTION_STATE_WAITING_FOR_BODY = 2,
  /* Part of a protocol version header has arrived, but not all of it. */
  CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER = 3
} amqp_connection_state_enum;
|---|
| 41 |
|
|---|
| 42 |
/*
 * Framing overhead, in bytes, surrounding every frame payload:
 * HEADER_SIZE bytes come before the payload and FOOTER_SIZE byte
 * comes after it.
 */
#define HEADER_SIZE 7
#define FOOTER_SIZE 1
|---|
| 45 |
|
|---|
| 46 |
/* Node of a singly linked list that carries an untyped payload pointer. */
typedef struct amqp_link_t_ amqp_link_t;

struct amqp_link_t_ {
  amqp_link_t *next; /* next node in the list */
  void *data;        /* payload attached to this node */
};
|---|
| 50 |
|
|---|
| 51 |
struct amqp_connection_state_t_ { |
|---|
| 52 |
amqp_pool_t frame_pool; |
|---|
| 53 |
amqp_pool_t decoding_pool; |
|---|
| 54 |
|
|---|
| 55 |
amqp_connection_state_enum state; |
|---|
| 56 |
|
|---|
| 57 |
int channel_max; |
|---|
| 58 |
int frame_max; |
|---|
| 59 |
int heartbeat; |
|---|
| 60 |
amqp_bytes_t inbound_buffer; |
|---|
| 61 |
|
|---|
| 62 |
size_t inbound_offset; |
|---|
| 63 |
size_t target_size; |
|---|
| 64 |
|
|---|
| 65 |
amqp_bytes_t outbound_buffer; |
|---|
| 66 |
|
|---|
| 67 |
int sockfd; |
|---|
| 68 |
amqp_bytes_t sock_inbound_buffer; |
|---|
| 69 |
size_t sock_inbound_offset; |
|---|
| 70 |
size_t sock_inbound_limit; |
|---|
| 71 |
|
|---|
| 72 |
amqp_link_t *first_queued_frame; |
|---|
| 73 |
amqp_link_t *last_queued_frame; |
|---|
| 74 |
|
|---|
| 75 |
amqp_basic_return_fn_t basic_return_callback; |
|---|
| 76 |
void *basic_return_callback_data; |
|---|
| 77 |
}; |
|---|
| 78 |
|
|---|
| 79 |
/*
 * Bounds-checked buffer codec helpers.
 *
 * In every macro `b` is a buffer value exposing `.len` and `.bytes`,
 * `o` is a byte offset into it and `l` is a byte count.  Multi-byte
 * integers are converted with ntohs/ntohl/htons/htonl and copied with
 * memcpy, so no alignment is required of the buffer.
 *
 * These macros rely on GNU statement expressions and contain a hidden
 * `return -EFAULT;`.  They may therefore only be used inside a function
 * whose return type can hold a negative int.  `b` and `o` may be
 * evaluated more than once and should be free of side effects.
 *
 * All macro-internal temporaries carry an `amqp_..._` name so that they
 * cannot capture identifiers used in the caller's arguments.
 */

/*
 * Evaluates to `v` if the range [o, o + l) lies within `b`; otherwise
 * returns -EFAULT from the enclosing function.  The check is done in
 * size_t without ever forming `o + l`, so it cannot wrap around, and a
 * negative offset or length is rejected instead of slipping through.
 */
#define CHECK_LIMIT(b, o, l, v) \
  ({ \
    size_t amqp_cl_off_ = (size_t) (o); \
    size_t amqp_cl_len_ = (size_t) (l); \
    size_t amqp_cl_cap_ = (size_t) ((b).len); \
    if (amqp_cl_off_ > amqp_cl_cap_ || \
        amqp_cl_len_ > amqp_cl_cap_ - amqp_cl_off_) { \
      return -EFAULT; \
    } \
    (v); \
  })

/* Address of the byte at offset `o` in `b` (no bounds check). */
#define BUF_AT(b, o) (&(((uint8_t *) (b).bytes)[(o)]))

/* Decoders: read an unsigned big-endian integer at offset `o`. */
#define D_8(b, o) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o))
#define D_16(b, o) \
  CHECK_LIMIT(b, o, 2, ({ \
    uint16_t amqp_net16_; \
    memcpy(&amqp_net16_, BUF_AT(b, o), 2); \
    ntohs(amqp_net16_); \
  }))
#define D_32(b, o) \
  CHECK_LIMIT(b, o, 4, ({ \
    uint32_t amqp_net32_; \
    memcpy(&amqp_net32_, BUF_AT(b, o), 4); \
    ntohl(amqp_net32_); \
  }))
#define D_64(b, o) \
  ({ \
    uint64_t amqp_hi64_ = D_32(b, o); \
    uint64_t amqp_lo64_ = D_32(b, (o) + 4); \
    (amqp_hi64_ << 32) | amqp_lo64_; \
  })

/* Pointer to `l` bytes starting at offset `o`, after a bounds check. */
#define D_BYTES(b, o, l) CHECK_LIMIT(b, o, l, BUF_AT(b, o))

/* Encoders: write `v` as an unsigned big-endian integer at offset `o`. */
#define E_8(b, o, v) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o) = (v))
#define E_16(b, o, v) \
  CHECK_LIMIT(b, o, 2, ({ \
    uint16_t amqp_out16_ = htons((v)); \
    memcpy(BUF_AT(b, o), &amqp_out16_, 2); \
  }))
#define E_32(b, o, v) \
  CHECK_LIMIT(b, o, 4, ({ \
    uint32_t amqp_out32_ = htonl((v)); \
    memcpy(BUF_AT(b, o), &amqp_out32_, 4); \
  }))
/* `v` is evaluated exactly once and converted as a whole to uint64_t. */
#define E_64(b, o, v) \
  ({ \
    uint64_t amqp_out64_ = (uint64_t) (v); \
    E_32(b, o, (uint32_t) (amqp_out64_ >> 32)); \
    E_32(b, (o) + 4, (uint32_t) (amqp_out64_ & 0xFFFFFFFF)); \
  })

/* Copy `l` bytes from `v` into `b` at offset `o`, after a bounds check. */
#define E_BYTES(b, o, l, v) CHECK_LIMIT(b, o, l, memcpy(BUF_AT(b, o), (v), (l)))
|---|
| 102 |
|
|---|
| 103 |
extern int amqp_decode_table(amqp_bytes_t encoded, |
|---|
| 104 |
amqp_pool_t *pool, |
|---|
| 105 |
amqp_table_t *output, |
|---|
| 106 |
int *offsetptr); |
|---|
| 107 |
|
|---|
| 108 |
extern int amqp_encode_table(amqp_bytes_t encoded, |
|---|
| 109 |
amqp_table_t *input, |
|---|
| 110 |
int *offsetptr); |
|---|
| 111 |
|
|---|
| 112 |
/*
 * Abort the process unless `condition` holds.
 *
 * On failure the printf-style arguments after `condition` are written
 * to stderr, followed by a newline, and abort() is called.  The
 * condition is evaluated exactly once; the format arguments are only
 * evaluated on failure.  The macro yields no value.
 */
#define amqp_assert(condition, ...) \
  ((condition) \
     ? (void) 0 \
     : ((void) fprintf(stderr, __VA_ARGS__), \
        (void) fputc('\n', stderr), \
        abort()))
|---|
| 120 |
|
|---|
| 121 |
/*
 * Evaluate `expr` once as an int.  A negative value is returned from
 * the ENCLOSING function immediately; any other value becomes the
 * value of the macro.
 */
#define AMQP_CHECK_RESULT(expr) \
  ({ \
    int amqp_status_ = (expr); \
    if (amqp_status_ < 0) { \
      return amqp_status_; \
    } \
    amqp_status_; \
  })
|---|
| 127 |
|
|---|
| 128 |
/*
 * Evaluate `expr` once as an int.  A zero or negative value is returned
 * from the ENCLOSING function immediately; a positive value becomes the
 * value of the macro.
 */
#define AMQP_CHECK_EOF_RESULT(expr) \
  ({ \
    int amqp_status_ = (expr); \
    if (amqp_status_ <= 0) { \
      return amqp_status_; \
    } \
    amqp_status_; \
  })
|---|
| 134 |
|
|---|
| 135 |
/*
 * amqp_dump(buffer, len)
 *
 * With NDEBUG defined this is a macro that expands to a no-op and does
 * not evaluate its arguments.  Otherwise it is a function declared here
 * and defined in another translation unit (presumably a debugging dump
 * of `len` bytes at `buffer` -- confirm against the implementation).
 */
#ifdef NDEBUG
#define amqp_dump(buffer, len) ((void) 0)
#else
extern void amqp_dump(void const *buffer, size_t len);
#endif
|---|
| 140 |
|
|---|
| 141 |
#ifdef __cplusplus |
|---|
| 142 |
} |
|---|
| 143 |
#endif |
|---|
| 144 |
|
|---|
| 145 |
#endif |
|---|