root/src/modules/rabbitmq_driver.c

Revision 1e8ae2b61ace7f9ef47a4f5c190bd2df06b04588, 14.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

initial import of a failover-aware AMQP client courtesy of Circonus

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
31
#include "noit_defines.h"
#include "noit_module.h"
#include "eventer/eventer.h"
#include "utils/noit_log.h"
#include "stratcon_iep.h"
#include "noit_conf.h"
#include "librabbitmq/amqp.h"
#include "librabbitmq/amqp_framing.h"

#include <assert.h>
#include <limits.h>
#include <poll.h>
#include <unistd.h>
43
44 #define MAX_CONCURRENCY 16
45 #define MAX_HOSTS 10
46
47 static pthread_mutex_t driver_lock;
48 struct amqp_driver {
49   pthread_t owner;
50   amqp_connection_state_t connection;
51   char exchange[128];
52   char routingkey[256];
53   char username[80];
54   char password[80];
55   char vhost[256];
56   int sockfd;
57   int heartbeat;
58   int nhosts;
59   int nconnects;
60   int hostidx;
61   char hostname[10][256];
62   int port;
63   struct timeval last_hb;
64   int has_error; /* out of band */
65 };
66
67 static struct {
68   noit_atomic64_t basic_returns;
69   noit_atomic64_t connects;
70   noit_atomic64_t inbound_methods;
71   noit_atomic64_t inbound_heartbeats;
72   noit_atomic64_t publications;
73   noit_atomic64_t concurrency;
74   struct amqp_driver thread_states[MAX_CONCURRENCY];
75 } stats;
76 #define BUMPSTAT(a) noit_atomic_inc64(&stats.a)
77
78 static iep_thread_driver_t *noit_rabbimq_allocate() {
79   char *hostname, *cp, *brk;
80   struct amqp_driver *dr = NULL;
81   int i;
82
83   pthread_mutex_lock(&driver_lock);
84   for(i=0; i<MAX_HOSTS; i++) {
85     if(stats.thread_states[i].owner == NULL) {
86       stats.thread_states[i].owner = pthread_self();
87       dr = &stats.thread_states[i];
88       break;
89     }
90   }
91   pthread_mutex_unlock(&driver_lock);
92   if(!dr) return NULL;
93   dr->nconnects = rand();
94 #define GETCONFSTR(w) noit_conf_get_stringbuf(NULL, "/stratcon/iep/mq/" #w, dr->w, sizeof(dr->w))
95   GETCONFSTR(exchange);
96   if(!GETCONFSTR(routingkey))
97     dr->routingkey[0] = '\0';
98   GETCONFSTR(username);
99   GETCONFSTR(password);
100   if(!GETCONFSTR(vhost)) { dr->vhost[0] = '/'; dr->vhost[1] = '\0'; }
101   if(!noit_conf_get_int(NULL, "/stratcon/iep/mq/heartbeat", &dr->heartbeat))
102     dr->heartbeat = 5000;
103   dr->heartbeat = (dr->heartbeat + 999) / 1000;
104
105   noit_conf_get_string(NULL, "/stratcon/iep/mq/hostname", &hostname);
106   if(!hostname) hostname = strdup("127.0.0.1");
107   for(cp = hostname; cp; cp = strchr(cp+1, ',')) dr->nhosts++;
108   if(dr->nhosts > MAX_HOSTS) dr->nhosts = MAX_HOSTS;
109   for(i = 0, cp = strtok_r(hostname, ",", &brk);
110       cp; cp = strtok_r(NULL, ",", &brk), i++)
111     strlcpy(dr->hostname[i], cp, sizeof(dr->hostname[i]));
112   free(hostname);
113
114   if(!noit_conf_get_int(NULL, "/stratcon/iep/mq/port", &dr->port))
115     dr->port = 5672;
116   noit_atomic_inc64(&stats.concurrency);
117   return (iep_thread_driver_t *)dr;
118 }
119 static int noit_rabbimq_disconnect(iep_thread_driver_t *d) {
120   struct amqp_driver *dr = (struct amqp_driver *)d;
121   if(dr->connection) {
122     amqp_destroy_connection(dr->connection);
123     dr->sockfd = -1;
124     dr->connection = NULL;
125     return 0;
126   }
127   return -1;
128 }
129 static void noit_rabbimq_deallocate(iep_thread_driver_t *d) {
130   struct amqp_driver *dr = (struct amqp_driver *)d;
131   int i;
132   noit_rabbimq_disconnect(d);
133   pthread_mutex_lock(&driver_lock);
134   memset(dr, 0, sizeof(dr));
135   pthread_mutex_unlock(&driver_lock);
136   noit_atomic_dec64(&stats.concurrency);
137   free(dr);
138 }
139 static void noit_rabbitmq_read_frame(struct amqp_driver *dr) {
140   struct pollfd p;
141   if(!dr->connection) return;
142   while(1) {
143     memset(&p, 0, sizeof(p));
144     p.fd = dr->sockfd;
145     p.events = POLLIN;
146     if(poll(&p, 1, 0)) {
147       int rv;
148       amqp_frame_t f;
149       rv = amqp_simple_wait_frame(dr->connection, &f);
150       if(rv > 0) {
151         if(f.frame_type == AMQP_FRAME_HEARTBEAT) {
152           BUMPSTAT(inbound_heartbeats);
153           noitL(noit_debug, "amqp <- hearbeat\n");
154         }
155         else if(f.frame_type == AMQP_FRAME_METHOD) {
156           BUMPSTAT(inbound_methods);
157           noitL(noit_error, "amqp <- method [%s]\n", amqp_method_name(f.payload.method.id));
158           dr->has_error = 1;
159           switch(f.payload.method.id) {
160             case AMQP_CHANNEL_CLOSE_METHOD: {
161                 amqp_channel_close_t *m = (amqp_channel_close_t *) f.payload.method.decoded;
162                 noitL(noit_error, "AMQP channel close error %d: %s\n",
163                       m->reply_code, (char *)m->reply_text.bytes);
164               }
165               break;
166             case AMQP_CONNECTION_CLOSE_METHOD: {
167                 amqp_connection_close_t *m = (amqp_connection_close_t *) f.payload.method.decoded;
168                 noitL(noit_error, "AMQP connection close error %d: %s\n",
169                       m->reply_code, (char *)m->reply_text.bytes);
170               }
171               break;
172           }
173         }
174         else {
175           noitL(noit_error, "amqp <- frame [%d]\n", f.frame_type);
176         }
177       }
178       else break;
179     }
180     else break;
181   }
182 }
183 static void noit_rabbitmq_heartbeat(struct amqp_driver *dr) {
184   struct timeval n, d;
185   if(!dr->connection) return;
186   gettimeofday(&n, NULL);
187   sub_timeval(n, dr->last_hb, &d);
188   if(d.tv_sec >= dr->heartbeat) {
189     amqp_frame_t f;
190     f.frame_type = AMQP_FRAME_HEARTBEAT;
191     f.channel = 0;
192     amqp_send_frame(dr->connection, &f);
193     noitL(noit_debug, "amqp -> hearbeat\n");
194     memcpy(&dr->last_hb, &n, sizeof(n));
195   }
196 }
197 static void
198 noit_rabbitmq_brcb(amqp_channel_t channel, amqp_basic_return_t *m, void *closure) {
199   BUMPSTAT(basic_returns);
200   noitL(noit_error, "AMQP return [%d:%.*s]\n", m->reply_code,
201         (int)m->reply_text.len, (char *)m->reply_text.bytes);
202 }
203 static int noit_rabbimq_connect(iep_thread_driver_t *dr) {
204   struct amqp_driver *driver = (struct amqp_driver *)dr;
205
206   if(!driver->connection) {
207     int sidx = driver->nconnects++ % driver->nhosts;
208     struct timeval timeout;
209     amqp_rpc_reply_t r, *rptr;
210
211     noitL(noit_error, "AMQP connect: %s:%d\n",
212           driver->hostname[sidx], driver->port);
213     BUMPSTAT(connects);
214     driver->hostidx = sidx;
215     timeout.tv_sec = driver->heartbeat;
216     timeout.tv_usec = 0;
217     driver->sockfd = amqp_open_socket(driver->hostname[sidx], driver->port, &timeout);
218     if(driver->sockfd < 0) {
219       noitL(noit_error, "AMQP connect failed: %s:%d\n",
220             driver->hostname[sidx], driver->port);
221       return -1;
222     }
223     driver->has_error = 0;
224     driver->connection = amqp_new_connection();
225     amqp_set_basic_return_cb(driver->connection, noit_rabbitmq_brcb, driver);
226     amqp_set_sockfd(driver->connection, driver->sockfd);
227     r = amqp_login(driver->connection,
228                    driver->vhost, 0, 131072, driver->heartbeat,
229                    AMQP_SASL_METHOD_PLAIN,
230                    driver->username, driver->password);
231     if(r.reply_type != AMQP_RESPONSE_NORMAL) {
232       noitL(noit_error, "AMQP login failed\n");
233       amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
234       amqp_destroy_connection(driver->connection);
235       driver->connection = NULL;
236       return -1;
237     }
238
239     amqp_channel_open(driver->connection, 1);
240     rptr = amqp_get_rpc_reply();
241     if(rptr->reply_type != AMQP_RESPONSE_NORMAL) {
242       noitL(noit_error, "AMQP channe_open failed\n");
243       amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
244       amqp_destroy_connection(driver->connection);
245       driver->connection = NULL;
246       return -1;
247     }
248     gettimeofday(&driver->last_hb, NULL);
249     return 0;
250   }
251   /* 1 means already connected */
252   return 1;
253 }
254
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 36  /* fallback only: length of a hyphenated text UUID */
#endif

/* This is very specific to an internal implementation somewhere...
 * and thus unlikely to be useful unless people name their checks:
 * c_<accountid>_<checknumber>::<rest of name>
 * This code should likely be made generic, perhaps with named
 * pcre captures.  However, I'm worried about performance.
 * For now, leave it and understand it is limited usefulness.
 */
/* Pull the check UUID (and optional account/check ids) out of a
 * tab-separated record.
 *
 * The first three tab-terminated fields are skipped.  The fourth field
 * must be tab-terminated and end in a UUID_STR_LEN-character UUID.  If
 * that field is longer than a bare UUID and continues with
 * "c_<account>_<check>" after its second backtick, those integers are
 * stored in *account_id / *check_id.  Either pointer may be NULL; both
 * are zeroed first.  All parsing stays inside the fourth field.
 *
 * dst receives each UUID hex digit prefixed with '.', NUL-terminated, so
 * it needs at least 32 * 2 + 1 bytes.
 *
 * Returns 1 on success, or 0 if the record has too few fields, the field
 * is shorter than a UUID, or the UUID holds a character outside
 * [0-9a-f-].
 */
static int extract_uuid_from_jlog(const char *payload, size_t payloadlen,
                                  int *account_id, int *check_id, char *dst) {
  const char *end = payload + payloadlen;
  const char *field = payload, *field_end, *f, *u;
  int i = 0, n;

  if(account_id) *account_id = 0;
  if(check_id) *check_id = 0;

  /* Skip: M|S|C, noit IP, timestamp. */
  for(n = 0; n < 3; n++) {
    field = memchr(field, '\t', (size_t)(end - field));
    if(!field) return 0;
    field++;
  }
  /* field .. field_end (exclusive) is the uuid-bearing field. */
  field_end = memchr(field, '\t', (size_t)(end - field));
  if(!field_end) return 0;

  if(field_end - field < UUID_STR_LEN) return 0;
  if(field_end - field > UUID_STR_LEN) {
    f = memchr(field, '`', (size_t)(field_end - field));
    if(f) f = memchr(f + 1, '`', (size_t)(field_end - (f + 1)));
    if(f) {
      f++;
      /* Bounds-check before comparing: f may sit at the field's end. */
      if(field_end - f >= 2 && memcmp(f, "c_", 2) == 0) {
        f += 2;
        if(account_id && f < field_end) *account_id = atoi(f);
        f = memchr(f, '_', (size_t)(field_end - f));
        if(f) {
          f++;
          if(check_id && f < field_end) *check_id = atoi(f);
        }
      }
    }
  }

  /* The UUID is the last UUID_STR_LEN bytes of the field; hyphens are
   * dropped and every hex digit becomes its own ".<digit>". */
  for(u = field_end - UUID_STR_LEN; i < 32 && u < field_end; u++) {
    if((*u >= 'a' && *u <= 'f') ||
       (*u >= '0' && *u <= '9')) {
      dst[i*2] = '.';
      dst[i*2 + 1] = *u;
      i++;
    }
    else if(*u != '-') return 0;
  }
  dst[i*2] = '\0';
  return 1;
}
321 static int
322 noit_rabbimq_submit(iep_thread_driver_t *dr,
323                     const char *payload, size_t payloadlen) {
324   int rv;
325   amqp_bytes_t body;
326   struct amqp_driver *driver = (struct amqp_driver *)dr;
327   const char *routingkey = driver->routingkey;
328
329   body.len = payloadlen;
330   body.bytes = (char *)payload;
331   if(*payload == 'M' ||
332      *payload == 'S' ||
333      *payload == 'C') {
334     char uuid_str[32 * 2 + 1];
335     int account_id, check_id;
336     if(extract_uuid_from_jlog(payload, payloadlen,
337                               &account_id, &check_id, uuid_str)) {
338       if(*routingkey) {
339         char *replace;
340         int newlen = strlen(driver->routingkey) + 1 + sizeof(uuid_str) + 2 * 32;
341         replace = alloca(newlen);
342         snprintf(replace, newlen, "%s.%d.%d%s", driver->routingkey,
343                  account_id, check_id, uuid_str);
344         routingkey = replace;
345       }
346     }
347   }
348   rv = amqp_basic_publish(driver->connection, 1,
349                           amqp_cstring_bytes(driver->exchange),
350                           amqp_cstring_bytes(routingkey),
351                           1, 0, NULL, body);
352   if(rv < 0) {
353     noitL(noit_error, "AMQP publish failed, disconnecting\n");
354     amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
355     amqp_destroy_connection(driver->connection);
356     driver->connection = NULL;
357     return -1;
358   }
359   BUMPSTAT(publications);
360   noit_rabbitmq_heartbeat(driver);
361   noit_rabbitmq_read_frame(driver);
362   amqp_maybe_release_buffers(driver->connection);
363   if(driver->has_error) {
364     amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
365     amqp_destroy_connection(driver->connection);
366     driver->connection = NULL;
367     return -1;
368   }
369   return 0;
370 }
371
372 mq_driver_t mq_driver_rabbitmq = {
373   noit_rabbimq_allocate,
374   noit_rabbimq_connect,
375   noit_rabbimq_submit,
376   noit_rabbimq_disconnect,
377   noit_rabbimq_deallocate
378 };
379
380 static int noit_rabbimq_driver_config(noit_module_generic_t *self, noit_hash_table *o) {
381   return 0;
382 }
383 static int noit_rabbimq_driver_onload(noit_image_t *self) {
384   return 0;
385 }
386
387 static int
388 noit_console_show_rabbitmq(noit_console_closure_t ncct,
389                            int argc, char **argv,
390                            noit_console_state_t *dstate,
391                            void *closure) {
392   int i;
393   nc_printf(ncct, " == RabbitMQ ==\n");
394   nc_printf(ncct, " Concurrency:           %llu\n", stats.concurrency);
395   nc_printf(ncct, " Connects:              %llu\n", stats.connects);
396   nc_printf(ncct, " AMQP basic returns:    %llu\n", stats.basic_returns);
397   nc_printf(ncct, " AMQP methods (in):     %llu\n", stats.inbound_methods);
398   nc_printf(ncct, " AMQP heartbeats (in):  %llu\n", stats.inbound_heartbeats);
399   nc_printf(ncct, " AMQP basic publish:    %llu\n", stats.publications);
400   pthread_mutex_lock(&driver_lock);
401   for(i=0;i<MAX_HOSTS;i++) {
402     struct amqp_driver *dr;
403     if(!stats.thread_states[i].owner) continue;
404     dr = &stats.thread_states[i];
405     nc_printf(ncct, "   == connection: %p ==\n", (void *)dr->owner);
406     if(dr->connection)
407       nc_printf(ncct, "     %s@%s:%d (vhost: %s, exchange: %s)\n",
408                 dr->username, dr->hostname[dr->hostidx], dr->port, dr->vhost,
409                 dr->exchange);
410     else
411       nc_printf(ncct, "     not connected\n");
412   }
413   pthread_mutex_unlock(&driver_lock);
414 }
415 static void
416 register_console_rabbitmq_commands() {
417   noit_console_state_t *tl;
418   cmd_info_t *showcmd;
419
420   tl = noit_console_state_initial();
421   showcmd = noit_console_state_get_cmd(tl, "show");
422   assert(showcmd && showcmd->dstate);
423   noit_console_state_add_cmd(showcmd->dstate,
424     NCSCMD("rabbitmq", noit_console_show_rabbitmq, NULL, NULL, NULL));
425 }
426
427 static int noit_rabbimq_driver_init(noit_module_generic_t *self) {
428   pthread_mutex_init(&driver_lock, NULL);
429   memset(&stats, 0, sizeof(stats));
430   stratcon_iep_mq_driver_register("rabbitmq", &mq_driver_rabbitmq);
431   register_console_rabbitmq_commands();
432   return 0;
433 }
434
435 noit_module_generic_t rabbitmq_driver = {
436   {
437     NOIT_GENERIC_MAGIC,
438     NOIT_GENERIC_ABI_VERSION,
439     "rabbitmq_driver",
440     "AMQP driver for IEP MQ submission",
441     "",
442     noit_rabbimq_driver_onload
443   },
444   noit_rabbimq_driver_config,
445   noit_rabbimq_driver_init
446 };
447
Note: See TracBrowser for help on using the browser.