root/src/modules/librabbitmq/amqp_private.h

Revision 1e8ae2b61ace7f9ef47a4f5c190bd2df06b04588, 4.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

initial import of a failover-aware AMQP client courtesy of Circonus

  • Property mode set to 100644
Line 
1 #ifndef librabbitmq_amqp_private_h
2 #define librabbitmq_amqp_private_h
3
4 #ifdef __cplusplus
5 extern "C" {
6 #endif
7
8 #include <arpa/inet.h> /* ntohl, htonl, ntohs, htons */
9
10 /*
11  * Connection states:
12  *
13  * - CONNECTION_STATE_IDLE: initial state, and entered again after
14  *   each frame is completed. Means that no bytes of the next frame
15  *   have been seen yet. Connections may only be reconfigured, and the
16  *   connection's pools recycled, when in this state. Whenever we're
17  *   in this state, the inbound_buffer's bytes pointer must be NULL;
18  *   any other state, and it must point to a block of memory allocated
19  *   from the frame_pool.
20  *
21  * - CONNECTION_STATE_WAITING_FOR_HEADER: Some bytes of an incoming
22  *   frame have been seen, but not a complete frame header's worth.
23  *
24  * - CONNECTION_STATE_WAITING_FOR_BODY: A complete frame header has
25  *   been seen, but the frame is not yet complete. When it is
26  *   completed, it will be returned, and the connection will return to
27  *   IDLE state.
28  *
29  * - CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER: The beginning of a
30  *   protocol version header has been seen, but the full eight bytes
31  *   hasn't yet been received. When it is completed, it will be
32  *   returned, and the connection will return to IDLE state.
33  *
34  */
35 typedef enum amqp_connection_state_enum_ {
36   CONNECTION_STATE_IDLE = 0,
37   CONNECTION_STATE_WAITING_FOR_HEADER,
38   CONNECTION_STATE_WAITING_FOR_BODY,
39   CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER
40 } amqp_connection_state_enum;
41
42 /* 7 bytes up front, then payload, then 1 byte footer */
43 #define HEADER_SIZE 7
44 #define FOOTER_SIZE 1
45
46 typedef struct amqp_link_t_ {
47   struct amqp_link_t_ *next;
48   void *data;
49 } amqp_link_t;
50
51 struct amqp_connection_state_t_ {
52   amqp_pool_t frame_pool;
53   amqp_pool_t decoding_pool;
54
55   amqp_connection_state_enum state;
56
57   int channel_max;
58   int frame_max;
59   int heartbeat;
60   amqp_bytes_t inbound_buffer;
61
62   size_t inbound_offset;
63   size_t target_size;
64
65   amqp_bytes_t outbound_buffer;
66
67   int sockfd;
68   amqp_bytes_t sock_inbound_buffer;
69   size_t sock_inbound_offset;
70   size_t sock_inbound_limit;
71
72   amqp_link_t *first_queued_frame;
73   amqp_link_t *last_queued_frame;
74
75   amqp_basic_return_fn_t basic_return_callback;
76   void *basic_return_callback_data;
77 };
78
79 #define CHECK_LIMIT(b, o, l, v) ({ if ((o + l) > (b).len) { return -EFAULT; } (v); })
80 #define BUF_AT(b, o) (&(((uint8_t *) (b).bytes)[o]))
81
82 #define D_8(b, o) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o))
83 #define D_16(b, o) CHECK_LIMIT(b, o, 2, ({uint16_t v; memcpy(&v, BUF_AT(b, o), 2); ntohs(v);}))
84 #define D_32(b, o) CHECK_LIMIT(b, o, 4, ({uint32_t v; memcpy(&v, BUF_AT(b, o), 4); ntohl(v);}))
85 #define D_64(b, o) ({                           \
86   uint64_t hi = D_32(b, o);                     \
87   uint64_t lo = D_32(b, o + 4);                 \
88   hi << 32 | lo;                                \
89 })
90
91 #define D_BYTES(b, o, l) CHECK_LIMIT(b, o, l, BUF_AT(b, o))
92
93 #define E_8(b, o, v) CHECK_LIMIT(b, o, 1, * (uint8_t *) BUF_AT(b, o) = (v))
94 #define E_16(b, o, v) CHECK_LIMIT(b, o, 2, ({uint16_t vv = htons(v); memcpy(BUF_AT(b, o), &vv, 2);}))
95 #define E_32(b, o, v) CHECK_LIMIT(b, o, 4, ({uint32_t vv = htonl(v); memcpy(BUF_AT(b, o), &vv, 4);}))
96 #define E_64(b, o, v) ({                                        \
97       E_32(b, o, (uint32_t) (((uint64_t) v) >> 32));            \
98       E_32(b, o + 4, (uint32_t) (((uint64_t) v) & 0xFFFFFFFF)); \
99     })
100
101 #define E_BYTES(b, o, l, v) CHECK_LIMIT(b, o, l, memcpy(BUF_AT(b, o), (v), (l)))
102
103 extern int amqp_decode_table(amqp_bytes_t encoded,
104                              amqp_pool_t *pool,
105                              amqp_table_t *output,
106                              int *offsetptr);
107
108 extern int amqp_encode_table(amqp_bytes_t encoded,
109                              amqp_table_t *input,
110                              int *offsetptr);
111
112 #define amqp_assert(condition, ...)             \
113   ({                                            \
114     if (!(condition)) {                         \
115       fprintf(stderr, __VA_ARGS__);             \
116       fputc('\n', stderr);                      \
117       abort();                                  \
118     }                                           \
119   })
120
121 #define AMQP_CHECK_RESULT(expr)                 \
122   ({                                            \
123     int _result = (expr);                       \
124     if (_result < 0) return _result;            \
125     _result;                                    \
126   })
127
128 #define AMQP_CHECK_EOF_RESULT(expr)             \
129   ({                                            \
130     int _result = (expr);                       \
131     if (_result <= 0) return _result;           \
132     _result;                                    \
133   })
134
135 #ifndef NDEBUG
136 extern void amqp_dump(void const *buffer, size_t len);
137 #else
138 #define amqp_dump(buffer, len) ((void) 0)
139 #endif
140
141 #ifdef __cplusplus
142 }
143 #endif
144
145 #endif
Note: See TracBrowser for help on using the browser.