root/src/modules/rabbitmq_driver.c

Revision 9204e489885537dd7a3f75d030daab3243bfe5cf, 14.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

rabbitmq_driver docs

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
31
32 #include "noit_defines.h"
33 #include "noit_module.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "stratcon_iep.h"
37 #include "noit_conf.h"
38 #include "librabbitmq/amqp.h"
39 #include "librabbitmq/amqp_framing.h"
40 #include "rabbitmq_driver.xmlh"
41
42 #include <poll.h>
43 #include <assert.h>
44
45 #define MAX_CONCURRENCY 16
46 #define MAX_HOSTS 10
47
48 static pthread_mutex_t driver_lock;
49 struct amqp_driver {
50   pthread_t owner;
51   amqp_connection_state_t connection;
52   char exchange[128];
53   char routingkey[256];
54   char username[80];
55   char password[80];
56   char vhost[256];
57   int sockfd;
58   int heartbeat;
59   int nhosts;
60   int nconnects;
61   int hostidx;
62   char hostname[10][256];
63   int port;
64   struct timeval last_hb;
65   int has_error; /* out of band */
66 };
67
68 static struct {
69   noit_atomic64_t basic_returns;
70   noit_atomic64_t connects;
71   noit_atomic64_t inbound_methods;
72   noit_atomic64_t inbound_heartbeats;
73   noit_atomic64_t publications;
74   noit_atomic64_t concurrency;
75   struct amqp_driver thread_states[MAX_CONCURRENCY];
76 } stats;
77 #define BUMPSTAT(a) noit_atomic_inc64(&stats.a)
78
79 static iep_thread_driver_t *noit_rabbimq_allocate() {
80   char *hostname, *cp, *brk;
81   struct amqp_driver *dr = NULL;
82   int i;
83
84   pthread_mutex_lock(&driver_lock);
85   for(i=0; i<MAX_HOSTS; i++) {
86     if(stats.thread_states[i].owner == NULL) {
87       stats.thread_states[i].owner = pthread_self();
88       dr = &stats.thread_states[i];
89       break;
90     }
91   }
92   pthread_mutex_unlock(&driver_lock);
93   if(!dr) return NULL;
94   dr->nconnects = rand();
95 #define GETCONFSTR(w) noit_conf_get_stringbuf(NULL, "/stratcon/iep/mq/" #w, dr->w, sizeof(dr->w))
96   GETCONFSTR(exchange);
97   if(!GETCONFSTR(routingkey))
98     dr->routingkey[0] = '\0';
99   GETCONFSTR(username);
100   GETCONFSTR(password);
101   if(!GETCONFSTR(vhost)) { dr->vhost[0] = '/'; dr->vhost[1] = '\0'; }
102   if(!noit_conf_get_int(NULL, "/stratcon/iep/mq/heartbeat", &dr->heartbeat))
103     dr->heartbeat = 5000;
104   dr->heartbeat = (dr->heartbeat + 999) / 1000;
105
106   noit_conf_get_string(NULL, "/stratcon/iep/mq/hostname", &hostname);
107   if(!hostname) hostname = strdup("127.0.0.1");
108   for(cp = hostname; cp; cp = strchr(cp+1, ',')) dr->nhosts++;
109   if(dr->nhosts > MAX_HOSTS) dr->nhosts = MAX_HOSTS;
110   for(i = 0, cp = strtok_r(hostname, ",", &brk);
111       cp; cp = strtok_r(NULL, ",", &brk), i++)
112     strlcpy(dr->hostname[i], cp, sizeof(dr->hostname[i]));
113   free(hostname);
114
115   if(!noit_conf_get_int(NULL, "/stratcon/iep/mq/port", &dr->port))
116     dr->port = 5672;
117   noit_atomic_inc64(&stats.concurrency);
118   return (iep_thread_driver_t *)dr;
119 }
120 static int noit_rabbimq_disconnect(iep_thread_driver_t *d) {
121   struct amqp_driver *dr = (struct amqp_driver *)d;
122   if(dr->connection) {
123     amqp_destroy_connection(dr->connection);
124     dr->sockfd = -1;
125     dr->connection = NULL;
126     return 0;
127   }
128   return -1;
129 }
130 static void noit_rabbimq_deallocate(iep_thread_driver_t *d) {
131   struct amqp_driver *dr = (struct amqp_driver *)d;
132   int i;
133   noit_rabbimq_disconnect(d);
134   pthread_mutex_lock(&driver_lock);
135   memset(dr, 0, sizeof(dr));
136   pthread_mutex_unlock(&driver_lock);
137   noit_atomic_dec64(&stats.concurrency);
138   free(dr);
139 }
140 static void noit_rabbitmq_read_frame(struct amqp_driver *dr) {
141   struct pollfd p;
142   if(!dr->connection) return;
143   while(1) {
144     memset(&p, 0, sizeof(p));
145     p.fd = dr->sockfd;
146     p.events = POLLIN;
147     if(poll(&p, 1, 0)) {
148       int rv;
149       amqp_frame_t f;
150       rv = amqp_simple_wait_frame(dr->connection, &f);
151       if(rv > 0) {
152         if(f.frame_type == AMQP_FRAME_HEARTBEAT) {
153           BUMPSTAT(inbound_heartbeats);
154           noitL(noit_debug, "amqp <- hearbeat\n");
155         }
156         else if(f.frame_type == AMQP_FRAME_METHOD) {
157           BUMPSTAT(inbound_methods);
158           noitL(noit_error, "amqp <- method [%s]\n", amqp_method_name(f.payload.method.id));
159           dr->has_error = 1;
160           switch(f.payload.method.id) {
161             case AMQP_CHANNEL_CLOSE_METHOD: {
162                 amqp_channel_close_t *m = (amqp_channel_close_t *) f.payload.method.decoded;
163                 noitL(noit_error, "AMQP channel close error %d: %s\n",
164                       m->reply_code, (char *)m->reply_text.bytes);
165               }
166               break;
167             case AMQP_CONNECTION_CLOSE_METHOD: {
168                 amqp_connection_close_t *m = (amqp_connection_close_t *) f.payload.method.decoded;
169                 noitL(noit_error, "AMQP connection close error %d: %s\n",
170                       m->reply_code, (char *)m->reply_text.bytes);
171               }
172               break;
173           }
174         }
175         else {
176           noitL(noit_error, "amqp <- frame [%d]\n", f.frame_type);
177         }
178       }
179       else break;
180     }
181     else break;
182   }
183 }
184 static void noit_rabbitmq_heartbeat(struct amqp_driver *dr) {
185   struct timeval n, d;
186   if(!dr->connection) return;
187   gettimeofday(&n, NULL);
188   sub_timeval(n, dr->last_hb, &d);
189   if(d.tv_sec >= dr->heartbeat) {
190     amqp_frame_t f;
191     f.frame_type = AMQP_FRAME_HEARTBEAT;
192     f.channel = 0;
193     amqp_send_frame(dr->connection, &f);
194     noitL(noit_debug, "amqp -> hearbeat\n");
195     memcpy(&dr->last_hb, &n, sizeof(n));
196   }
197 }
198 static void
199 noit_rabbitmq_brcb(amqp_channel_t channel, amqp_basic_return_t *m, void *closure) {
200   BUMPSTAT(basic_returns);
201   noitL(noit_error, "AMQP return [%d:%.*s]\n", m->reply_code,
202         (int)m->reply_text.len, (char *)m->reply_text.bytes);
203 }
204 static int noit_rabbimq_connect(iep_thread_driver_t *dr) {
205   struct amqp_driver *driver = (struct amqp_driver *)dr;
206
207   if(!driver->connection) {
208     int sidx = driver->nconnects++ % driver->nhosts;
209     struct timeval timeout;
210     amqp_rpc_reply_t r, *rptr;
211
212     noitL(noit_error, "AMQP connect: %s:%d\n",
213           driver->hostname[sidx], driver->port);
214     BUMPSTAT(connects);
215     driver->hostidx = sidx;
216     timeout.tv_sec = driver->heartbeat;
217     timeout.tv_usec = 0;
218     driver->sockfd = amqp_open_socket(driver->hostname[sidx], driver->port, &timeout);
219     if(driver->sockfd < 0) {
220       noitL(noit_error, "AMQP connect failed: %s:%d\n",
221             driver->hostname[sidx], driver->port);
222       return -1;
223     }
224     driver->has_error = 0;
225     driver->connection = amqp_new_connection();
226     amqp_set_basic_return_cb(driver->connection, noit_rabbitmq_brcb, driver);
227     amqp_set_sockfd(driver->connection, driver->sockfd);
228     r = amqp_login(driver->connection,
229                    driver->vhost, 0, 131072, driver->heartbeat,
230                    AMQP_SASL_METHOD_PLAIN,
231                    driver->username, driver->password);
232     if(r.reply_type != AMQP_RESPONSE_NORMAL) {
233       noitL(noit_error, "AMQP login failed\n");
234       amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
235       amqp_destroy_connection(driver->connection);
236       driver->connection = NULL;
237       return -1;
238     }
239
240     amqp_channel_open(driver->connection, 1);
241     rptr = amqp_get_rpc_reply();
242     if(rptr->reply_type != AMQP_RESPONSE_NORMAL) {
243       noitL(noit_error, "AMQP channe_open failed\n");
244       amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
245       amqp_destroy_connection(driver->connection);
246       driver->connection = NULL;
247       return -1;
248     }
249     gettimeofday(&driver->last_hb, NULL);
250     return 0;
251   }
252   /* 1 means already connected */
253   return 1;
254 }
255
256 /* This is very specific to an internal implementation somewhere...
257  * and thus unlikely to be useful unless people name their checks:
258  * c_<accountid>_<checknumber>::<rest of name>
259  * This code should likley be made generic, perhaps with named
260  * pcre captures.  However, I'm worried about performance.
261  * For now, leave it and understand it is limited usefulness.
262  */
263 static int extract_uuid_from_jlog(const char *payload, size_t payloadlen,
264                                   int *account_id, int *check_id, char *dst) {
265   int i = 0;
266   const char *atab = payload, *u = NULL;
267
268   if(account_id) *account_id = 0;
269   if(check_id) *check_id = 0;
270
271 #define advance_past_tab do { \
272   atab = memchr(atab, '\t', payloadlen - (atab - payload)); \
273   if(!atab) return 0; \
274   atab++; \
275 } while(0)
276
277   /* Tab -> M|S|C */
278   advance_past_tab;
279   /* Tab -> noit IP */
280   advance_past_tab;
281   /* Tab -> timestamp */
282   advance_past_tab;
283   /* Tab -> uuid */
284   u = atab;
285   advance_past_tab;
286   /* Tab -> metric_name */
287   atab--;
288   if(atab - u < UUID_STR_LEN) return 0;
289   if(atab - u > UUID_STR_LEN) {
290     const char *f;
291     f = memchr(u, '`', payloadlen - (u - payload));
292     if(f) {
293       f = memchr(f+1, '`', payloadlen - (f + 1 - payload));
294       if(f) {
295         f++;
296         if(memcmp(f, "c_", 2) == 0) {
297           f += 2;
298           if(account_id) *account_id = atoi(f);
299           f = memchr(f, '_', payloadlen - (f - payload));
300           if(f) {
301             f++;
302             if(check_id) *check_id = atoi(f);
303           }
304         }
305       }
306     }
307   }
308   u = atab - UUID_STR_LEN;
309   while(i<32 && u < atab) {
310     if((*u >= 'a' && *u <= 'f') ||
311        (*u >= '0' && *u <= '9')) {
312       dst[i*2] = '.';
313       dst[i*2 + 1] = *u;
314       i++;
315     }
316     else if(*u != '-') return 0;
317     u++;
318   }
319   dst[i*2] = '\0';
320   return 1;
321 }
322 static int
323 noit_rabbimq_submit(iep_thread_driver_t *dr,
324                     const char *payload, size_t payloadlen) {
325   int rv;
326   amqp_bytes_t body;
327   struct amqp_driver *driver = (struct amqp_driver *)dr;
328   const char *routingkey = driver->routingkey;
329
330   body.len = payloadlen;
331   body.bytes = (char *)payload;
332   if(*payload == 'M' ||
333      *payload == 'S' ||
334      *payload == 'C') {
335     char uuid_str[32 * 2 + 1];
336     int account_id, check_id;
337     if(extract_uuid_from_jlog(payload, payloadlen,
338                               &account_id, &check_id, uuid_str)) {
339       if(*routingkey) {
340         char *replace;
341         int newlen = strlen(driver->routingkey) + 1 + sizeof(uuid_str) + 2 * 32;
342         replace = alloca(newlen);
343         snprintf(replace, newlen, "%s.%d.%d%s", driver->routingkey,
344                  account_id, check_id, uuid_str);
345         routingkey = replace;
346       }
347     }
348   }
349   rv = amqp_basic_publish(driver->connection, 1,
350                           amqp_cstring_bytes(driver->exchange),
351                           amqp_cstring_bytes(routingkey),
352                           1, 0, NULL, body);
353   if(rv < 0) {
354     noitL(noit_error, "AMQP publish failed, disconnecting\n");
355     amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
356     amqp_destroy_connection(driver->connection);
357     driver->connection = NULL;
358     return -1;
359   }
360   BUMPSTAT(publications);
361   noit_rabbitmq_heartbeat(driver);
362   noit_rabbitmq_read_frame(driver);
363   amqp_maybe_release_buffers(driver->connection);
364   if(driver->has_error) {
365     amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
366     amqp_destroy_connection(driver->connection);
367     driver->connection = NULL;
368     return -1;
369   }
370   return 0;
371 }
372
373 mq_driver_t mq_driver_rabbitmq = {
374   noit_rabbimq_allocate,
375   noit_rabbimq_connect,
376   noit_rabbimq_submit,
377   noit_rabbimq_disconnect,
378   noit_rabbimq_deallocate
379 };
380
381 static int noit_rabbimq_driver_config(noit_module_generic_t *self, noit_hash_table *o) {
382   return 0;
383 }
384 static int noit_rabbimq_driver_onload(noit_image_t *self) {
385   return 0;
386 }
387
388 static int
389 noit_console_show_rabbitmq(noit_console_closure_t ncct,
390                            int argc, char **argv,
391                            noit_console_state_t *dstate,
392                            void *closure) {
393   int i;
394   nc_printf(ncct, " == RabbitMQ ==\n");
395   nc_printf(ncct, " Concurrency:           %llu\n", stats.concurrency);
396   nc_printf(ncct, " Connects:              %llu\n", stats.connects);
397   nc_printf(ncct, " AMQP basic returns:    %llu\n", stats.basic_returns);
398   nc_printf(ncct, " AMQP methods (in):     %llu\n", stats.inbound_methods);
399   nc_printf(ncct, " AMQP heartbeats (in):  %llu\n", stats.inbound_heartbeats);
400   nc_printf(ncct, " AMQP basic publish:    %llu\n", stats.publications);
401   pthread_mutex_lock(&driver_lock);
402   for(i=0;i<MAX_HOSTS;i++) {
403     struct amqp_driver *dr;
404     if(!stats.thread_states[i].owner) continue;
405     dr = &stats.thread_states[i];
406     nc_printf(ncct, "   == connection: %p ==\n", (void *)dr->owner);
407     if(dr->connection)
408       nc_printf(ncct, "     %s@%s:%d (vhost: %s, exchange: %s)\n",
409                 dr->username, dr->hostname[dr->hostidx], dr->port, dr->vhost,
410                 dr->exchange);
411     else
412       nc_printf(ncct, "     not connected\n");
413   }
414   pthread_mutex_unlock(&driver_lock);
415 }
416 static void
417 register_console_rabbitmq_commands() {
418   noit_console_state_t *tl;
419   cmd_info_t *showcmd;
420
421   tl = noit_console_state_initial();
422   showcmd = noit_console_state_get_cmd(tl, "show");
423   assert(showcmd && showcmd->dstate);
424   noit_console_state_add_cmd(showcmd->dstate,
425     NCSCMD("rabbitmq", noit_console_show_rabbitmq, NULL, NULL, NULL));
426 }
427
428 static int noit_rabbimq_driver_init(noit_module_generic_t *self) {
429   pthread_mutex_init(&driver_lock, NULL);
430   memset(&stats, 0, sizeof(stats));
431   stratcon_iep_mq_driver_register("rabbitmq", &mq_driver_rabbitmq);
432   register_console_rabbitmq_commands();
433   return 0;
434 }
435
436 noit_module_generic_t rabbitmq_driver = {
437   {
438     NOIT_GENERIC_MAGIC,
439     NOIT_GENERIC_ABI_VERSION,
440     "rabbitmq_driver",
441     "AMQP driver for IEP MQ submission",
442     rabbitmq_driver_xml_description,
443     noit_rabbimq_driver_onload
444   },
445   noit_rabbimq_driver_config,
446   noit_rabbimq_driver_init
447 };
448
Note: See TracBrowser for help on using the browser.