root/src/modules/librabbitmq/amqp_connection.c

Revision 1e8ae2b61ace7f9ef47a4f5c190bd2df06b04588, 13.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

initial import of a failover-aware AMQP client courtesy of Circonus

  • Property mode set to 100644
Line 
1 #include <stdlib.h>
2 #include <stdio.h>
3 #include <string.h>
4 #include <stdint.h>
5 #include <errno.h>
6
7 #include <unistd.h>
8 #include <sys/uio.h>
9
10 #include "amqp.h"
11 #include "amqp_framing.h"
12 #include "amqp_private.h"
13
14 #include <assert.h>
15
/* Initial page size, in bytes, used for the frame pool and for the initial
   frame_max passed to amqp_tune_connection() by amqp_new_connection(). */
#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
/* Initial page size, in bytes, used for the decoding pool. */
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
/* Size, in bytes, of the socket inbound buffer allocated by
   amqp_new_connection(). */
#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072

/* Check that connection `statevec` is in state `statenum`, reporting a
   mismatch through amqp_assert().  Both arguments are evaluated exactly
   once.  The body is wrapped in do { } while (0) so that the macro, followed
   by its semicolon, forms a single statement and can be used safely in an
   unbraced if/else. */
#define ENFORCE_STATE(statevec, statenum)                               \
  do {                                                                  \
    amqp_connection_state_t _check_state = (statevec);                  \
    int _wanted_state = (statenum);                                     \
    amqp_assert(_check_state->state == _wanted_state,                   \
                "Programming error: invalid AMQP connection state: expected %d, got %d", \
                _wanted_state,                                          \
                _check_state->state);                                   \
  } while (0)
29
30 amqp_connection_state_t amqp_new_connection(void) {
31   amqp_connection_state_t state =
32     (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
33
34   if (state == NULL) {
35     return NULL;
36   }
37
38   init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
39   init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
40
41   state->state = CONNECTION_STATE_IDLE;
42
43   state->inbound_buffer.bytes = NULL;
44   state->outbound_buffer.bytes = NULL;
45   if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) {
46     empty_amqp_pool(&state->frame_pool);
47     empty_amqp_pool(&state->decoding_pool);
48     free(state);
49     return NULL;
50   }
51
52   state->inbound_offset = 0;
53   state->target_size = HEADER_SIZE;
54
55   state->sockfd = -1;
56   state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
57   state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
58   if (state->sock_inbound_buffer.bytes == NULL) {
59     amqp_destroy_connection(state);
60     return NULL;
61   }
62
63   state->sock_inbound_offset = 0;
64   state->sock_inbound_limit = 0;
65
66   state->first_queued_frame = NULL;
67   state->last_queued_frame = NULL;
68
69   return state;
70 }
71
72 int amqp_get_sockfd(amqp_connection_state_t state) {
73   return state->sockfd;
74 }
75
76 void amqp_set_sockfd(amqp_connection_state_t state,
77                      int sockfd)
78 {
79   state->sockfd = sockfd;
80 }
81
82 int amqp_tune_connection(amqp_connection_state_t state,
83                          int channel_max,
84                          int frame_max,
85                          int heartbeat)
86 {
87   void *newbuf;
88
89   ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
90
91   state->channel_max = channel_max;
92   state->frame_max = frame_max;
93   state->heartbeat = heartbeat;
94
95   empty_amqp_pool(&state->frame_pool);
96   init_amqp_pool(&state->frame_pool, frame_max);
97
98   state->inbound_buffer.len = frame_max;
99   state->outbound_buffer.len = frame_max;
100   newbuf = realloc(state->outbound_buffer.bytes, frame_max);
101   if (newbuf == NULL) {
102     amqp_destroy_connection(state);
103     return -ENOMEM;
104   }
105   state->outbound_buffer.bytes = newbuf;
106
107   return 0;
108 }
109
110 int amqp_get_channel_max(amqp_connection_state_t state) {
111   return state->channel_max;
112 }
113
114 void amqp_destroy_connection(amqp_connection_state_t state) {
115   empty_amqp_pool(&state->frame_pool);
116   empty_amqp_pool(&state->decoding_pool);
117   free(state->outbound_buffer.bytes);
118   free(state->sock_inbound_buffer.bytes);
119   free(state);
120 }
121
122 static void return_to_idle(amqp_connection_state_t state) {
123   state->inbound_buffer.bytes = NULL;
124   state->inbound_offset = 0;
125   state->target_size = HEADER_SIZE;
126   state->state = CONNECTION_STATE_IDLE;
127 }
128
129 void amqp_set_basic_return_cb(amqp_connection_state_t state,
130                               amqp_basic_return_fn_t f, void *data) {
131   state->basic_return_callback = f;
132   state->basic_return_callback_data = data;
133 }
134 int amqp_handle_input(amqp_connection_state_t state,
135                       amqp_bytes_t received_data,
136                       amqp_frame_t *decoded_frame)
137 {
138   int total_bytes_consumed = 0;
139   int bytes_consumed;
140
141   /* Returning frame_type of zero indicates either insufficient input,
142      or a complete, ignored frame was read. */
143   decoded_frame->frame_type = 0;
144
145  read_more:
146   if (received_data.len == 0) {
147     return total_bytes_consumed;
148   }
149
150   if (state->state == CONNECTION_STATE_IDLE) {
151     state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
152     state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
153   }
154
155   bytes_consumed = state->target_size - state->inbound_offset;
156   if (received_data.len < bytes_consumed) {
157     bytes_consumed = received_data.len;
158   }
159
160   E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes);
161   state->inbound_offset += bytes_consumed;
162   total_bytes_consumed += bytes_consumed;
163
164   assert(state->inbound_offset <= state->target_size);
165
166   if (state->inbound_offset < state->target_size) {
167     return total_bytes_consumed;
168   }
169
170   switch (state->state) {
171     case CONNECTION_STATE_WAITING_FOR_HEADER:
172       if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER &&
173           D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL)
174       {
175         state->target_size = 8;
176         state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER;
177       } else {
178         state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE;
179         state->state = CONNECTION_STATE_WAITING_FOR_BODY;
180       }
181
182       /* Wind buffer forward, and try to read some body out of it. */
183       received_data.len -= bytes_consumed;
184       received_data.bytes = ((char *) received_data.bytes) + bytes_consumed;
185       goto read_more;
186
187     case CONNECTION_STATE_WAITING_FOR_BODY: {
188       int frame_type = D_8(state->inbound_buffer, 0);
189
190 #if 0
191       printf("recving:\n");
192       amqp_dump(state->inbound_buffer.bytes, state->target_size);
193 #endif
194
195       /* Check frame end marker (footer) */
196       if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
197         return -EINVAL;
198       }
199
200       decoded_frame->channel = D_16(state->inbound_buffer, 1);
201
202       switch (frame_type) {
203         case AMQP_FRAME_METHOD: {
204           amqp_bytes_t encoded;
205
206           /* Four bytes of method ID before the method args. */
207           encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE);
208           encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len);
209
210           decoded_frame->frame_type = AMQP_FRAME_METHOD;
211           decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE);
212           AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id,
213                                                &state->decoding_pool,
214                                                encoded,
215                                                &decoded_frame->payload.method.decoded));
216           break;
217         }
218
219         case AMQP_FRAME_HEADER: {
220           amqp_bytes_t encoded;
221
222           /* 12 bytes for properties header. */
223           encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE);
224           encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len);
225
226           decoded_frame->frame_type = AMQP_FRAME_HEADER;
227           decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE);
228           decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4);
229           AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id,
230                                                    &state->decoding_pool,
231                                                    encoded,
232                                                    &decoded_frame->payload.properties.decoded));
233           break;
234         }
235
236         case AMQP_FRAME_BODY: {
237           size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE);
238
239           decoded_frame->frame_type = AMQP_FRAME_BODY;
240           decoded_frame->payload.body_fragment.len = fragment_len;
241           decoded_frame->payload.body_fragment.bytes =
242             D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len);
243           break;
244         }
245
246         case AMQP_FRAME_HEARTBEAT:
247           decoded_frame->frame_type = AMQP_FRAME_HEARTBEAT;
248           break;
249
250         default:
251           /* Ignore the frame by not changing frame_type away from 0. */
252           break;
253       }
254
255       return_to_idle(state);
256
257       if(decoded_frame->frame_type == AMQP_FRAME_METHOD &&
258          decoded_frame->payload.method.id == AMQP_BASIC_RETURN_METHOD) {
259         amqp_basic_return_t *m = decoded_frame->payload.method.decoded;
260         if(state->basic_return_callback)
261           state->basic_return_callback(decoded_frame->channel, m,
262                                        state->basic_return_callback_data);
263       }
264
265       return total_bytes_consumed;
266     }
267
268     case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER:
269       decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
270       decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL;
271       amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P',
272                   "Invalid protocol header received");
273       decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4);
274       decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5);
275       decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6);
276       decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7);
277
278       return_to_idle(state);
279       return total_bytes_consumed;
280
281     default:
282       amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state);
283   }
284 }
285
286 amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
287   return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
288 }
289
290 void amqp_release_buffers(amqp_connection_state_t state) {
291   ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
292
293   amqp_assert(state->first_queued_frame == NULL,
294               "Programming error: attempt to amqp_release_buffers while waiting events enqueued");
295
296   recycle_amqp_pool(&state->frame_pool);
297   recycle_amqp_pool(&state->decoding_pool);
298 }
299
300 void amqp_maybe_release_buffers(amqp_connection_state_t state) {
301   if (amqp_release_buffers_ok(state)) {
302     amqp_release_buffers(state);
303   }
304 }
305
306 static int inner_send_frame(amqp_connection_state_t state,
307                             amqp_frame_t const *frame,
308                             amqp_bytes_t *encoded,
309                             int *payload_len)
310 {
311   int separate_body;
312
313   E_8(state->outbound_buffer, 0, frame->frame_type);
314   E_16(state->outbound_buffer, 1, frame->channel);
315   switch (frame->frame_type) {
316     case AMQP_FRAME_METHOD:
317       E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
318       encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
319       encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len);
320       *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
321                                                           frame->payload.method.decoded,
322                                                           *encoded)) + 4;
323       separate_body = 0;
324       break;
325
326     case AMQP_FRAME_HEADER:
327       E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
328       E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
329       E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
330       encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
331       encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len);
332       *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
333                                                               frame->payload.properties.decoded,
334                                                               *encoded)) + 12;
335       separate_body = 0;
336       break;
337
338     case AMQP_FRAME_BODY:
339       *encoded = frame->payload.body_fragment;
340       *payload_len = encoded->len;
341       separate_body = 1;
342       break;
343
344     case AMQP_FRAME_HEARTBEAT:
345       *encoded = AMQP_EMPTY_BYTES;
346       *payload_len = 0;
347       separate_body = 0;
348       break;
349
350     default:
351       return -EINVAL;
352   }
353
354   E_32(state->outbound_buffer, 3, *payload_len);
355   if (!separate_body) {
356     E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END);
357   }
358
359 #if 0
360   if (separate_body) {
361     printf("sending body frame (header):\n");
362     amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
363     printf("sending body frame (payload):\n");
364     amqp_dump(encoded->bytes, *payload_len);
365   } else {
366     printf("sending:\n");
367     amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE);
368   }
369 #endif
370
371   return separate_body;
372 }
373
374 int amqp_send_frame(amqp_connection_state_t state,
375                     amqp_frame_t const *frame)
376 {
377   amqp_bytes_t encoded;
378   int payload_len;
379   int separate_body;
380
381   separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
382   switch (separate_body) {
383     case 0:
384       AMQP_CHECK_RESULT(write(state->sockfd,
385                               state->outbound_buffer.bytes,
386                               payload_len + (HEADER_SIZE + FOOTER_SIZE)));
387       return 0;
388
389     case 1:
390       AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE));
391       AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len));
392       {
393         assert(FOOTER_SIZE == 1);
394         unsigned char frame_end_byte = AMQP_FRAME_END;
395         AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE));
396       }
397       return 0;
398
399     default:
400       return separate_body;
401   }
402 }
403
404 int amqp_send_frame_to(amqp_connection_state_t state,
405                        amqp_frame_t const *frame,
406                        amqp_output_fn_t fn,
407                        void *context)
408 {
409   amqp_bytes_t encoded;
410   int payload_len;
411   int separate_body;
412
413   separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
414   switch (separate_body) {
415     case 0:
416       AMQP_CHECK_RESULT(fn(context,
417                            state->outbound_buffer.bytes,
418                            payload_len + (HEADER_SIZE + FOOTER_SIZE)));
419       return 0;
420
421     case 1:
422       AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE));
423       AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len));
424       {
425         assert(FOOTER_SIZE == 1);
426         unsigned char frame_end_byte = AMQP_FRAME_END;
427         AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE));
428       }
429       return 0;
430
431     default:
432       return separate_body;
433   }
434 }
Note: See TracBrowser for help on using the browser.