root/src/modules/rabbitmq_driver.c

Revision 4ed37cf09ac9817ced9312616da97a3a1e90c6b3, 15.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 9 months ago)

cleanup of modules, verbose structure setting

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
31
32 #include "noit_defines.h"
33 #include "noit_module.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "stratcon_iep.h"
37 #include "noit_conf.h"
38 #include "librabbitmq/amqp.h"
39 #include "librabbitmq/amqp_framing.h"
40 #include "rabbitmq_driver.xmlh"
41
42 #include <poll.h>
43 #include <unistd.h>
44 #include <assert.h>
45
46 #define MAX_CONCURRENCY 16
47 #define MAX_HOSTS 10
48
49 static pthread_mutex_t driver_lock;
50 struct amqp_driver {
51   pthread_t owner;
52   amqp_connection_state_t connection;
53   char exchange[128];
54   char routingkey[256];
55   char username[80];
56   char password[80];
57   char vhost[256];
58   int sockfd;
59   int heartbeat;
60   int nhosts;
61   int nconnects;
62   int hostidx;
63   char hostname[10][256];
64   int port;
65   struct timeval last_hb;
66   int has_error; /* out of band */
67 };
68
69 static struct {
70   noit_atomic64_t basic_returns;
71   noit_atomic64_t connects;
72   noit_atomic64_t inbound_methods;
73   noit_atomic64_t inbound_heartbeats;
74   noit_atomic64_t publications;
75   noit_atomic64_t concurrency;
76   struct amqp_driver thread_states[MAX_CONCURRENCY];
77 } stats;
78 #define BUMPSTAT(a) noit_atomic_inc64(&stats.a)
79
80 static iep_thread_driver_t *noit_rabbimq_allocate(noit_conf_section_t conf) {
81   char *hostname = NULL, *cp, *brk;
82   struct amqp_driver *dr = NULL;
83   int i;
84
85   pthread_mutex_lock(&driver_lock);
86   for(i=0; i<MAX_HOSTS; i++) {
87     if(stats.thread_states[i].owner == (pthread_t)(vpsized_int)NULL) {
88       stats.thread_states[i].owner = pthread_self();
89       dr = &stats.thread_states[i];
90       break;
91     }
92   }
93   pthread_mutex_unlock(&driver_lock);
94   if(!dr) return NULL;
95   dr->nconnects = rand();
96 #define GETCONFSTR(w) noit_conf_get_stringbuf(conf, #w, dr->w, sizeof(dr->w))
97   GETCONFSTR(exchange);
98   if(!GETCONFSTR(routingkey))
99     dr->routingkey[0] = '\0';
100   GETCONFSTR(username);
101   GETCONFSTR(password);
102   if(!GETCONFSTR(vhost)) { dr->vhost[0] = '/'; dr->vhost[1] = '\0'; }
103   if(!noit_conf_get_int(conf, "heartbeat", &dr->heartbeat))
104     dr->heartbeat = 5000;
105   dr->heartbeat = (dr->heartbeat + 999) / 1000;
106
107   (void)noit_conf_get_string(conf, "hostname", &hostname);
108   if(!hostname) hostname = strdup("127.0.0.1");
109   for(cp = hostname; cp; cp = strchr(cp+1, ',')) dr->nhosts++;
110   if(dr->nhosts > MAX_HOSTS) dr->nhosts = MAX_HOSTS;
111   for(i = 0, cp = strtok_r(hostname, ",", &brk);
112       cp; cp = strtok_r(NULL, ",", &brk), i++)
113     strlcpy(dr->hostname[i], cp, sizeof(dr->hostname[i]));
114   free(hostname);
115
116   if(!noit_conf_get_int(conf, "port", &dr->port))
117     dr->port = 5672;
118   noit_atomic_inc64(&stats.concurrency);
119   return (iep_thread_driver_t *)dr;
120 }
121 static int noit_rabbimq_disconnect(iep_thread_driver_t *d) {
122   struct amqp_driver *dr = (struct amqp_driver *)d;
123   if(dr->connection) {
124     amqp_destroy_connection(dr->connection);
125     if(dr->sockfd >= 0) close(dr->sockfd);
126     dr->sockfd = -1;
127     dr->connection = NULL;
128     return 0;
129   }
130   return -1;
131 }
132 static void noit_rabbimq_deallocate(iep_thread_driver_t *d) {
133   struct amqp_driver *dr = (struct amqp_driver *)d;
134   noit_rabbimq_disconnect(d);
135   pthread_mutex_lock(&driver_lock);
136   memset(dr, 0, sizeof(*dr));
137   pthread_mutex_unlock(&driver_lock);
138   noit_atomic_dec64(&stats.concurrency);
139   free(dr);
140 }
141 static void noit_rabbitmq_read_frame(struct amqp_driver *dr) {
142   struct pollfd p;
143   if(!dr->connection) return;
144   while(1) {
145     memset(&p, 0, sizeof(p));
146     p.fd = dr->sockfd;
147     p.events = POLLIN;
148     if(poll(&p, 1, 0)) {
149       int rv;
150       amqp_frame_t f;
151       rv = amqp_simple_wait_frame(dr->connection, &f);
152       if(rv > 0) {
153         if(f.frame_type == AMQP_FRAME_HEARTBEAT) {
154           BUMPSTAT(inbound_heartbeats);
155           noitL(noit_debug, "amqp <- hearbeat\n");
156         }
157         else if(f.frame_type == AMQP_FRAME_METHOD) {
158           BUMPSTAT(inbound_methods);
159           noitL(noit_error, "amqp <- method [%s]\n", amqp_method_name(f.payload.method.id));
160           dr->has_error = 1;
161           switch(f.payload.method.id) {
162             case AMQP_CHANNEL_CLOSE_METHOD: {
163                 amqp_channel_close_t *m = (amqp_channel_close_t *) f.payload.method.decoded;
164                 noitL(noit_error, "AMQP channel close error %d: %s\n",
165                       m->reply_code, (char *)m->reply_text.bytes);
166               }
167               break;
168             case AMQP_CONNECTION_CLOSE_METHOD: {
169                 amqp_connection_close_t *m = (amqp_connection_close_t *) f.payload.method.decoded;
170                 noitL(noit_error, "AMQP connection close error %d: %s\n",
171                       m->reply_code, (char *)m->reply_text.bytes);
172               }
173               break;
174           }
175         }
176         else {
177           noitL(noit_error, "amqp <- frame [%d]\n", f.frame_type);
178         }
179       }
180       else break;
181     }
182     else break;
183   }
184 }
185 static void noit_rabbitmq_heartbeat(struct amqp_driver *dr) {
186   struct timeval n, d;
187   if(!dr->connection) return;
188   gettimeofday(&n, NULL);
189   sub_timeval(n, dr->last_hb, &d);
190   if(d.tv_sec >= dr->heartbeat) {
191     amqp_frame_t f;
192     f.frame_type = AMQP_FRAME_HEARTBEAT;
193     f.channel = 0;
194     amqp_send_frame(dr->connection, &f);
195     noitL(noit_debug, "amqp -> hearbeat\n");
196     memcpy(&dr->last_hb, &n, sizeof(n));
197   }
198 }
199 static void
200 noit_rabbitmq_brcb(amqp_channel_t channel, amqp_basic_return_t *m, void *closure) {
201   BUMPSTAT(basic_returns);
202   noitL(noit_debug, "AMQP return [%d:%.*s]\n", m->reply_code,
203         (int)m->reply_text.len, (char *)m->reply_text.bytes);
204 }
205 static int noit_rabbimq_connect(iep_thread_driver_t *dr) {
206   struct amqp_driver *driver = (struct amqp_driver *)dr;
207
208   if(!driver->connection) {
209     int sidx = driver->nconnects++ % driver->nhosts;
210     struct timeval timeout;
211     amqp_rpc_reply_t r, *rptr;
212
213     noitL(noit_error, "AMQP connect: %s:%d\n",
214           driver->hostname[sidx], driver->port);
215     BUMPSTAT(connects);
216     driver->hostidx = sidx;
217     timeout.tv_sec = driver->heartbeat;
218     timeout.tv_usec = 0;
219     driver->sockfd = amqp_open_socket(driver->hostname[sidx], driver->port, &timeout);
220     if(driver->sockfd < 0) {
221       noitL(noit_error, "AMQP connect failed: %s:%d\n",
222             driver->hostname[sidx], driver->port);
223       return -1;
224     }
225     driver->has_error = 0;
226     driver->connection = amqp_new_connection();
227     amqp_set_basic_return_cb(driver->connection, noit_rabbitmq_brcb, driver);
228     amqp_set_sockfd(driver->connection, driver->sockfd);
229     r = amqp_login(driver->connection,
230                    driver->vhost, 0, 131072, driver->heartbeat,
231                    AMQP_SASL_METHOD_PLAIN,
232                    driver->username, driver->password);
233     if(r.reply_type != AMQP_RESPONSE_NORMAL) {
234       noitL(noit_error, "AMQP login failed\n");
235       amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
236       amqp_destroy_connection(driver->connection);
237       if(driver->sockfd >= 0) close(driver->sockfd);
238       driver->sockfd = -1;
239       driver->connection = NULL;
240       return -1;
241     }
242
243     amqp_channel_open(driver->connection, 1);
244     rptr = amqp_get_rpc_reply();
245     if(rptr->reply_type != AMQP_RESPONSE_NORMAL) {
246       noitL(noit_error, "AMQP channe_open failed\n");
247       amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
248       amqp_destroy_connection(driver->connection);
249       if(driver->sockfd >= 0) close(driver->sockfd);
250       driver->sockfd = -1;
251       driver->connection = NULL;
252       return -1;
253     }
254     gettimeofday(&driver->last_hb, NULL);
255     return 0;
256   }
257   /* 1 means already connected */
258   return 1;
259 }
260
/* This is very specific to an internal implementation somewhere...
 * and thus unlikely to be useful unless people name their checks:
 * c_<accountid>_<checknumber>::<rest of name>
 * This code should likely be made generic, perhaps with named
 * pcre captures.  However, I'm worried about performance.
 * For now, leave it and understand it is of limited usefulness.
 */

#ifndef UUID_STR_LEN
/* Normally supplied by the project headers; this fallback only keeps the
 * parser self-contained.  36 = 32 hex digits + 4 dashes, which is the
 * shape the digit loop below expects. */
#define UUID_STR_LEN 36
#endif

/* Return a pointer just past the next tab in [p, end), or NULL if none. */
static const char *jlog_skip_past_tab(const char *p, const char *end) {
  const char *tab = memchr(p, '\t', (size_t)(end - p));
  return tab ? tab + 1 : NULL;
}

/* Parse an optionally signed decimal number from [s, end) without ever
 * reading at or beyond `end`; the magnitude saturates at 2^31-1. */
static int jlog_parse_int(const char *s, const char *end) {
  long long v = 0;
  int neg = 0;

  if(s < end && (*s == '-' || *s == '+')) {
    neg = (*s == '-');
    s++;
  }
  while(s < end && *s >= '0' && *s <= '9') {
    v = v * 10 + (*s - '0');
    if(v > 0x7fffffffLL) { v = 0x7fffffffLL; break; }
    s++;
  }
  return (int)(neg ? -v : v);
}

/*
 * Pull the check uuid (and, when present, account/check ids) out of a
 * tab separated log line.
 *
 * The fourth column must end with a uuid written as lowercase hex digits
 * and dashes.  When that column carries a prefix containing two
 * backticks and the text after the second one starts with
 * "c_<n>_<m>", <n> and <m> are reported via account_id and check_id.
 * All searching is confined to that column.
 *
 * payload/payloadlen: the line; it does not need to be NUL terminated.
 * account_id, check_id: optional outputs, reset to 0 on entry.
 * dst: receives the hex digits, each preceded by '.', NUL terminated;
 *      it must have room for 32 * 2 + 1 bytes.
 *
 * Returns 1 on success, 0 when the line does not have the expected shape
 * (dst may then hold partial output).
 */
static int extract_uuid_from_jlog(const char *payload, size_t payloadlen,
                                  int *account_id, int *check_id, char *dst) {
  const char *end = payload + payloadlen;
  const char *cursor = payload;
  const char *field, *field_end, *uuid_start, *u;
  int i;

  if(account_id) *account_id = 0;
  if(check_id) *check_id = 0;

  /* Skip the record type, noit IP and timestamp columns. */
  for(i = 0; i < 3; i++) {
    cursor = jlog_skip_past_tab(cursor, end);
    if(!cursor) return 0;
  }

  /* The uuid column runs up to (not including) the next tab. */
  field = cursor;
  cursor = jlog_skip_past_tab(cursor, end);
  if(!cursor) return 0;
  field_end = cursor - 1;
  if(field_end - field < UUID_STR_LEN) return 0;
  uuid_start = field_end - UUID_STR_LEN;

  if(uuid_start > field) {
    /* Only the prefix [field, uuid_start) can carry the check name. */
    const char *f = memchr(field, '`', (size_t)(uuid_start - field));
    if(f) f = memchr(f + 1, '`', (size_t)(uuid_start - (f + 1)));
    if(f) {
      f++;
      if(uuid_start - f >= 2 && memcmp(f, "c_", 2) == 0) {
        f += 2;
        if(account_id) *account_id = jlog_parse_int(f, uuid_start);
        f = memchr(f, '_', (size_t)(uuid_start - f));
        if(f) {
          f++;
          if(check_id) *check_id = jlog_parse_int(f, uuid_start);
        }
      }
    }
  }

  /* Emit ".<digit>" for each hex digit, skipping dashes. */
  i = 0;
  for(u = uuid_start; i < 32 && u < field_end; u++) {
    if((*u >= 'a' && *u <= 'f') ||
       (*u >= '0' && *u <= '9')) {
      dst[i*2] = '.';
      dst[i*2 + 1] = *u;
      i++;
    }
    else if(*u != '-') return 0;
  }
  dst[i*2] = '\0';
  return 1;
}
327 static int
328 noit_rabbimq_submit(iep_thread_driver_t *dr,
329                     const char *payload, size_t payloadlen) {
330   int rv;
331   amqp_bytes_t body;
332   struct amqp_driver *driver = (struct amqp_driver *)dr;
333   const char *routingkey = driver->routingkey;
334
335   body.len = payloadlen;
336   body.bytes = (char *)payload;
337   if(*payload == 'M' ||
338      *payload == 'S' ||
339      *payload == 'C' ||
340      (*payload == 'B' && (payload[1] == '1' || payload[1] == '2'))) {
341     char uuid_str[32 * 2 + 1];
342     int account_id, check_id;
343     if(extract_uuid_from_jlog(payload, payloadlen,
344                               &account_id, &check_id, uuid_str)) {
345       if(*routingkey) {
346         char *replace;
347         int newlen = strlen(driver->routingkey) + 1 + sizeof(uuid_str) + 2 * 32;
348         replace = alloca(newlen);
349         snprintf(replace, newlen, "%s.%x.%x.%d.%d%s", driver->routingkey,
350                  account_id%16, (account_id/16)%16, account_id,
351                  check_id, uuid_str);
352         routingkey = replace;
353       }
354     }
355   }
356   rv = amqp_basic_publish(driver->connection, 1,
357                           amqp_cstring_bytes(driver->exchange),
358                           amqp_cstring_bytes(routingkey),
359                           1, 0, NULL, body);
360   if(rv < 0) {
361     noitL(noit_error, "AMQP publish failed, disconnecting\n");
362     amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
363     amqp_destroy_connection(driver->connection);
364     if(driver->sockfd >= 0) close(driver->sockfd);
365     driver->sockfd = -1;
366     driver->connection = NULL;
367     return -1;
368   }
369   BUMPSTAT(publications);
370   noit_rabbitmq_heartbeat(driver);
371   noit_rabbitmq_read_frame(driver);
372   amqp_maybe_release_buffers(driver->connection);
373   if(driver->has_error) {
374     amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
375     amqp_destroy_connection(driver->connection);
376     if(driver->sockfd >= 0) close(driver->sockfd);
377     driver->sockfd = -1;
378     driver->connection = NULL;
379     return -1;
380   }
381   return 0;
382 }
383
384 mq_driver_t mq_driver_rabbitmq = {
385   noit_rabbimq_allocate,
386   noit_rabbimq_connect,
387   noit_rabbimq_submit,
388   noit_rabbimq_disconnect,
389   noit_rabbimq_deallocate
390 };
391
392 static int noit_rabbimq_driver_config(noit_module_generic_t *self, noit_hash_table *o) {
393   return 0;
394 }
395 static int noit_rabbimq_driver_onload(noit_image_t *self) {
396   return 0;
397 }
398
399 static int
400 noit_console_show_rabbitmq(noit_console_closure_t ncct,
401                            int argc, char **argv,
402                            noit_console_state_t *dstate,
403                            void *closure) {
404   int i;
405   nc_printf(ncct, " == RabbitMQ ==\n");
406   nc_printf(ncct, " Concurrency:           %llu\n", stats.concurrency);
407   nc_printf(ncct, " Connects:              %llu\n", stats.connects);
408   nc_printf(ncct, " AMQP basic returns:    %llu\n", stats.basic_returns);
409   nc_printf(ncct, " AMQP methods (in):     %llu\n", stats.inbound_methods);
410   nc_printf(ncct, " AMQP heartbeats (in):  %llu\n", stats.inbound_heartbeats);
411   nc_printf(ncct, " AMQP basic publish:    %llu\n", stats.publications);
412   pthread_mutex_lock(&driver_lock);
413   for(i=0;i<MAX_HOSTS;i++) {
414     struct amqp_driver *dr;
415     if(!stats.thread_states[i].owner) continue;
416     dr = &stats.thread_states[i];
417     nc_printf(ncct, "   == connection: %p ==\n", (void *)(vpsized_int)dr->owner);
418     if(dr->connection)
419       nc_printf(ncct, "     %s@%s:%d (vhost: %s, exchange: %s)\n",
420                 dr->username, dr->hostname[dr->hostidx], dr->port, dr->vhost,
421                 dr->exchange);
422     else
423       nc_printf(ncct, "     not connected\n");
424   }
425   pthread_mutex_unlock(&driver_lock);
426   return 1;
427 }
428
429 static void
430 register_console_rabbitmq_commands() {
431   noit_console_state_t *tl;
432   cmd_info_t *showcmd;
433
434   tl = noit_console_state_initial();
435   showcmd = noit_console_state_get_cmd(tl, "show");
436   assert(showcmd && showcmd->dstate);
437   noit_console_state_add_cmd(showcmd->dstate,
438     NCSCMD("rabbitmq", noit_console_show_rabbitmq, NULL, NULL, NULL));
439 }
440
441 static int noit_rabbimq_driver_init(noit_module_generic_t *self) {
442   pthread_mutex_init(&driver_lock, NULL);
443   memset(&stats, 0, sizeof(stats));
444   stratcon_iep_mq_driver_register("rabbitmq", &mq_driver_rabbitmq);
445   register_console_rabbitmq_commands();
446   return 0;
447 }
448
449 noit_module_generic_t rabbitmq_driver = {
450   {
451     .magic = NOIT_GENERIC_MAGIC,
452     .version = NOIT_GENERIC_ABI_VERSION,
453     .name = "rabbitmq_driver",
454     .description = "AMQP driver for IEP MQ submission",
455     .xml_description = rabbitmq_driver_xml_description,
456     .onload = noit_rabbimq_driver_onload
457   },
458   noit_rabbimq_driver_config,
459   noit_rabbimq_driver_init
460 };
461
Note: See TracBrowser for help on using the browser.