root/src/modules/librabbitmq/amqp_connection.c

Revision d0a64b649e4eac288431ac20d986830b57fb044d, 13.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

Add support for riemann as the IEP subsystem.
Remove all traces of Esper.
Change the license on all our bits to simply match reconnoiter.
Clean up copyrights and embellish auditing script.
Updated test 108 to check riemann iep results.

  • Property mode set to 100644
Line 
1 /* MOZILLA PUBLIC LICENSE Version 1.1 -- see LICENSE-MPL-RabbitMQ */
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include <stdint.h>
6 #include <errno.h>
7
8 #include <unistd.h>
9 #include <sys/uio.h>
10
11 #include "amqp.h"
12 #include "amqp_framing.h"
13 #include "amqp_private.h"
14
15 #include <assert.h>
16
/* Page size given to init_amqp_pool() for the frame pool; also used as the
   initial frame_max when a new connection is first tuned. */
#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
/* Page size given to init_amqp_pool() for the decoding pool. */
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
/* Size in bytes of the socket inbound buffer allocated for a new connection. */
#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072

/*
 * Check, via amqp_assert(), that the connection `statevec` is currently in
 * state `statenum`.  Both arguments are evaluated exactly once.
 *
 * The body is wrapped in do { } while (0) so the expansion is a single
 * statement: `ENFORCE_STATE(s, X);` is then safe as the body of an unbraced
 * if/else, which a bare `{ ... }` block followed by `;` is not.
 */
#define ENFORCE_STATE(statevec, statenum)                               \
  do {                                                                  \
    amqp_connection_state_t _check_state = (statevec);                  \
    int _wanted_state = (statenum);                                     \
    amqp_assert(_check_state->state == _wanted_state,                   \
                "Programming error: invalid AMQP connection state: expected %d, got %d", \
                _wanted_state,                                          \
                _check_state->state);                                   \
  } while (0)
30
31 amqp_connection_state_t amqp_new_connection(void) {
32   amqp_connection_state_t state =
33     (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
34
35   if (state == NULL) {
36     return NULL;
37   }
38
39   init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
40   init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
41
42   state->state = CONNECTION_STATE_IDLE;
43
44   state->inbound_buffer.bytes = NULL;
45   state->outbound_buffer.bytes = NULL;
46   if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) {
47     return NULL;
48   }
49
50   state->inbound_offset = 0;
51   state->target_size = HEADER_SIZE;
52
53   state->sockfd = -1;
54   state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
55   state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
56   if (state->sock_inbound_buffer.bytes == NULL) {
57     amqp_destroy_connection(state);
58     return NULL;
59   }
60
61   state->sock_inbound_offset = 0;
62   state->sock_inbound_limit = 0;
63
64   state->first_queued_frame = NULL;
65   state->last_queued_frame = NULL;
66
67   return state;
68 }
69
70 int amqp_get_sockfd(amqp_connection_state_t state) {
71   return state->sockfd;
72 }
73
74 void amqp_set_sockfd(amqp_connection_state_t state,
75                      int sockfd)
76 {
77   state->sockfd = sockfd;
78 }
79
80 int amqp_tune_connection(amqp_connection_state_t state,
81                          int channel_max,
82                          int frame_max,
83                          int heartbeat)
84 {
85   void *newbuf;
86
87   ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
88
89   state->channel_max = channel_max;
90   state->frame_max = frame_max;
91   state->heartbeat = heartbeat;
92
93   empty_amqp_pool(&state->frame_pool);
94   init_amqp_pool(&state->frame_pool, frame_max);
95
96   state->inbound_buffer.len = frame_max;
97   state->outbound_buffer.len = frame_max;
98   newbuf = realloc(state->outbound_buffer.bytes, frame_max);
99   if (newbuf == NULL) {
100     amqp_destroy_connection(state);
101     return -ENOMEM;
102   }
103   state->outbound_buffer.bytes = newbuf;
104
105   return 0;
106 }
107
108 int amqp_get_channel_max(amqp_connection_state_t state) {
109   return state->channel_max;
110 }
111
112 void amqp_destroy_connection(amqp_connection_state_t state) {
113   empty_amqp_pool(&state->frame_pool);
114   empty_amqp_pool(&state->decoding_pool);
115   free(state->outbound_buffer.bytes);
116   free(state->sock_inbound_buffer.bytes);
117   free(state);
118 }
119
120 static void return_to_idle(amqp_connection_state_t state) {
121   state->inbound_buffer.bytes = NULL;
122   state->inbound_offset = 0;
123   state->target_size = HEADER_SIZE;
124   state->state = CONNECTION_STATE_IDLE;
125 }
126
127 void amqp_set_basic_return_cb(amqp_connection_state_t state,
128                               amqp_basic_return_fn_t f, void *data) {
129   state->basic_return_callback = f;
130   state->basic_return_callback_data = data;
131 }
132 int amqp_handle_input(amqp_connection_state_t state,
133                       amqp_bytes_t received_data,
134                       amqp_frame_t *decoded_frame)
135 {
136   int total_bytes_consumed = 0;
137   int bytes_consumed;
138
139   /* Returning frame_type of zero indicates either insufficient input,
140      or a complete, ignored frame was read. */
141   decoded_frame->frame_type = 0;
142
143  read_more:
144   if (received_data.len == 0) {
145     return total_bytes_consumed;
146   }
147
148   if (state->state == CONNECTION_STATE_IDLE) {
149     state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
150     state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
151   }
152
153   bytes_consumed = state->target_size - state->inbound_offset;
154   if (received_data.len < bytes_consumed) {
155     bytes_consumed = received_data.len;
156   }
157
158   E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes);
159   state->inbound_offset += bytes_consumed;
160   total_bytes_consumed += bytes_consumed;
161
162   assert(state->inbound_offset <= state->target_size);
163
164   if (state->inbound_offset < state->target_size) {
165     return total_bytes_consumed;
166   }
167
168   switch (state->state) {
169     case CONNECTION_STATE_WAITING_FOR_HEADER:
170       if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER &&
171           D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL)
172       {
173         state->target_size = 8;
174         state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER;
175       } else {
176         state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE;
177         state->state = CONNECTION_STATE_WAITING_FOR_BODY;
178       }
179
180       /* Wind buffer forward, and try to read some body out of it. */
181       received_data.len -= bytes_consumed;
182       received_data.bytes = ((char *) received_data.bytes) + bytes_consumed;
183       goto read_more;
184
185     case CONNECTION_STATE_WAITING_FOR_BODY: {
186       int frame_type = D_8(state->inbound_buffer, 0);
187
188 #if 0
189       printf("recving:\n");
190       amqp_dump(state->inbound_buffer.bytes, state->target_size);
191 #endif
192
193       /* Check frame end marker (footer) */
194       if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
195         return -EINVAL;
196       }
197
198       decoded_frame->channel = D_16(state->inbound_buffer, 1);
199
200       switch (frame_type) {
201         case AMQP_FRAME_METHOD: {
202           amqp_bytes_t encoded;
203
204           /* Four bytes of method ID before the method args. */
205           encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE);
206           encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len);
207
208           decoded_frame->frame_type = AMQP_FRAME_METHOD;
209           decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE);
210           (void)AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id,
211                                                &state->decoding_pool,
212                                                encoded,
213                                                &decoded_frame->payload.method.decoded));
214           break;
215         }
216
217         case AMQP_FRAME_HEADER: {
218           amqp_bytes_t encoded;
219
220           /* 12 bytes for properties header. */
221           encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE);
222           encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len);
223
224           decoded_frame->frame_type = AMQP_FRAME_HEADER;
225           decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE);
226           decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4);
227           (void)AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id,
228                                                    &state->decoding_pool,
229                                                    encoded,
230                                                    &decoded_frame->payload.properties.decoded));
231           break;
232         }
233
234         case AMQP_FRAME_BODY: {
235           size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE);
236
237           decoded_frame->frame_type = AMQP_FRAME_BODY;
238           decoded_frame->payload.body_fragment.len = fragment_len;
239           decoded_frame->payload.body_fragment.bytes =
240             D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len);
241           break;
242         }
243
244         case AMQP_FRAME_HEARTBEAT:
245           decoded_frame->frame_type = AMQP_FRAME_HEARTBEAT;
246           break;
247
248         default:
249           /* Ignore the frame by not changing frame_type away from 0. */
250           break;
251       }
252
253       return_to_idle(state);
254
255       if(decoded_frame->frame_type == AMQP_FRAME_METHOD &&
256          decoded_frame->payload.method.id == AMQP_BASIC_RETURN_METHOD) {
257         amqp_basic_return_t *m = decoded_frame->payload.method.decoded;
258         if(state->basic_return_callback)
259           state->basic_return_callback(decoded_frame->channel, m,
260                                        state->basic_return_callback_data);
261       }
262
263       return total_bytes_consumed;
264     }
265
266     case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER:
267       decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
268       decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL;
269       amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P',
270                   "Invalid protocol header received");
271       decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4);
272       decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5);
273       decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6);
274       decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7);
275
276       return_to_idle(state);
277       return total_bytes_consumed;
278
279     default:
280       amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state);
281   }
282 }
283
284 amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
285   return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
286 }
287
288 void amqp_release_buffers(amqp_connection_state_t state) {
289   ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
290
291   amqp_assert(state->first_queued_frame == NULL,
292               "Programming error: attempt to amqp_release_buffers while waiting events enqueued");
293
294   recycle_amqp_pool(&state->frame_pool);
295   recycle_amqp_pool(&state->decoding_pool);
296 }
297
298 void amqp_maybe_release_buffers(amqp_connection_state_t state) {
299   if (amqp_release_buffers_ok(state)) {
300     amqp_release_buffers(state);
301   }
302 }
303
304 static int inner_send_frame(amqp_connection_state_t state,
305                             amqp_frame_t const *frame,
306                             amqp_bytes_t *encoded,
307                             int *payload_len)
308 {
309   int separate_body;
310
311   E_8(state->outbound_buffer, 0, frame->frame_type);
312   E_16(state->outbound_buffer, 1, frame->channel);
313   switch (frame->frame_type) {
314     case AMQP_FRAME_METHOD:
315       E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
316       encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
317       encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len);
318       *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
319                                                           frame->payload.method.decoded,
320                                                           *encoded)) + 4;
321       separate_body = 0;
322       break;
323
324     case AMQP_FRAME_HEADER:
325       E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
326       E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
327       E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
328       encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
329       encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len);
330       *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
331                                                               frame->payload.properties.decoded,
332                                                               *encoded)) + 12;
333       separate_body = 0;
334       break;
335
336     case AMQP_FRAME_BODY:
337       *encoded = frame->payload.body_fragment;
338       *payload_len = encoded->len;
339       separate_body = 1;
340       break;
341
342     case AMQP_FRAME_HEARTBEAT:
343       *encoded = AMQP_EMPTY_BYTES;
344       *payload_len = 0;
345       separate_body = 0;
346       break;
347
348     default:
349       return -EINVAL;
350   }
351
352   E_32(state->outbound_buffer, 3, *payload_len);
353   if (!separate_body) {
354     E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END);
355   }
356
357 #if 0
358   if (separate_body) {
359     printf("sending body frame (header):\n");
360     amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
361     printf("sending body frame (payload):\n");
362     amqp_dump(encoded->bytes, *payload_len);
363   } else {
364     printf("sending:\n");
365     amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE);
366   }
367 #endif
368
369   return separate_body;
370 }
371
372 int amqp_send_frame(amqp_connection_state_t state,
373                     amqp_frame_t const *frame)
374 {
375   amqp_bytes_t encoded = { .len = 0, .bytes = NULL };
376   int payload_len;
377   int separate_body;
378
379   separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
380   switch (separate_body) {
381     case 0:
382       (void)AMQP_CHECK_RESULT(write(state->sockfd,
383                               state->outbound_buffer.bytes,
384                               payload_len + (HEADER_SIZE + FOOTER_SIZE)));
385       return 0;
386
387     case 1:
388       (void)AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE));
389       (void)AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len));
390       {
391         assert(FOOTER_SIZE == 1);
392         unsigned char frame_end_byte = AMQP_FRAME_END;
393         (void)AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE));
394       }
395       return 0;
396
397     default:
398       return separate_body;
399   }
400 }
401
402 int amqp_send_frame_to(amqp_connection_state_t state,
403                        amqp_frame_t const *frame,
404                        amqp_output_fn_t fn,
405                        void *context)
406 {
407   amqp_bytes_t encoded = { .len = 0, .bytes = NULL };
408   int payload_len;
409   int separate_body;
410
411   separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
412   switch (separate_body) {
413     case 0:
414       (void)AMQP_CHECK_RESULT(fn(context,
415                            state->outbound_buffer.bytes,
416                            payload_len + (HEADER_SIZE + FOOTER_SIZE)));
417       return 0;
418
419     case 1:
420       (void)AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE));
421       (void)AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len));
422       {
423         assert(FOOTER_SIZE == 1);
424         unsigned char frame_end_byte = AMQP_FRAME_END;
425         (void)AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE));
426       }
427       return 0;
428
429     default:
430       return separate_body;
431   }
432 }
Note: See TracBrowser for help on using the browser.