root/src/modules/rabbitmq_driver.c

Revision d25e46078a2b0f26a7561a53c04d2a00e21150e2, 16.1 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 4 months ago)

Add Filtered Exchange To Stratcon

Added the ability to have a "filtered" exchange in Stratcon. This will
only pass certain checks and metrics through. The checks and/or
metrics required can be specified in XML via a PUT command, which will
then pass the information on to the RabbitMQ and FQ drivers.

Right now, the RabbitMQ driver is not implemented; only the FQ driver
is.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2011, OmniTI Computer Consulting, Inc.
3  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
31
32 #include <mtev_defines.h>
33
34 #include <poll.h>
35 #include <unistd.h>
36 #include <assert.h>
37 #include <errno.h>
38
39 #include <mtev_dso.h>
40 #include <eventer/eventer.h>
41 #include <mtev_log.h>
42 #include <stratcon_iep.h>
43 #include <mtev_conf.h>
44
45 #include "librabbitmq/amqp.h"
46 #include "librabbitmq/amqp_framing.h"
47 #include "rabbitmq_driver.xmlh"
48
/* Sizing limits, socket-buffer defaults and the lock guarding the driver slot table. */
49 #define MAX_CONCURRENCY 16 /* number of entries in stats.thread_states */
50 #define MAX_HOSTS 10 /* upper bound applied to a driver's nhosts when "hostname" is parsed */
51 #define DEFAULT_SNDBUF (1 << 20) /* initial value of desired_sndbuf (1 MiB) */
52 #define DEFAULT_RCVBUF (1 << 20) /* initial value of desired_rcvbuf (1 MiB) */
53
54 static socklen_t desired_sndbuf = DEFAULT_SNDBUF; /* handed to setsockopt(SO_SNDBUF) on connect; overridden by the "sndbuf" module option */
55 static socklen_t desired_rcvbuf = DEFAULT_RCVBUF; /* handed to setsockopt(SO_RCVBUF) on connect; overridden by the "rcvbuf" module option */
56
57 static pthread_mutex_t driver_lock; /* taken while stats.thread_states is scanned, cleared or listed */
58 struct amqp_driver {
59   pthread_t owner;
60   amqp_connection_state_t connection;
61   char exchange[128];
62   char routingkey[256];
63   char username[80];
64   char password[80];
65   char vhost[256];
66   int sockfd;
67   int heartbeat;
68   int nhosts;
69   int nconnects;
70   int hostidx;
71   char hostname[10][256];
72   int port;
73   struct timeval last_hb;
74   int has_error; /* out of band */
75 };
76
/* Module-wide counters together with the fixed table of per-thread driver slots. */
77 static struct {
78   mtev_atomic64_t basic_returns; /* bumped in the basic.return callback */
79   mtev_atomic64_t connects; /* bumped on every connect attempt */
80   mtev_atomic64_t inbound_methods; /* bumped for each inbound method frame */
81   mtev_atomic64_t inbound_heartbeats; /* bumped for each inbound heartbeat frame */
82   mtev_atomic64_t publications; /* bumped after each successful basic.publish */
83   mtev_atomic64_t concurrency; /* incremented on allocate, decremented on deallocate */
84   struct amqp_driver thread_states[MAX_CONCURRENCY]; /* slot table; access to `owner` is serialized by driver_lock */
85 } stats;
86 #define BUMPSTAT(a) mtev_atomic_inc64(&stats.a) /* atomically increment the named counter */
87
88 static iep_thread_driver_t *noit_rabbimq_allocate(mtev_conf_section_t conf) {
89   char *hostname = NULL, *cp, *brk;
90   struct amqp_driver *dr = NULL;
91   int i;
92
93   pthread_mutex_lock(&driver_lock);
94   for(i=0; i<MAX_HOSTS; i++) {
95     if(stats.thread_states[i].owner == (pthread_t)(vpsized_int)NULL) {
96       stats.thread_states[i].owner = pthread_self();
97       dr = &stats.thread_states[i];
98       break;
99     }
100   }
101   pthread_mutex_unlock(&driver_lock);
102   if(!dr) return NULL;
103   dr->nconnects = rand();
104 #define GETCONFSTR(w) mtev_conf_get_stringbuf(conf, #w, dr->w, sizeof(dr->w))
105   GETCONFSTR(exchange);
106   if(!GETCONFSTR(routingkey))
107     dr->routingkey[0] = '\0';
108   GETCONFSTR(username);
109   GETCONFSTR(password);
110   if(!GETCONFSTR(vhost)) { dr->vhost[0] = '/'; dr->vhost[1] = '\0'; }
111   if(!mtev_conf_get_int(conf, "heartbeat", &dr->heartbeat))
112     dr->heartbeat = 5000;
113   dr->heartbeat = (dr->heartbeat + 999) / 1000;
114
115   (void)mtev_conf_get_string(conf, "hostname", &hostname);
116   if(!hostname) hostname = strdup("127.0.0.1");
117   for(cp = hostname; cp; cp = strchr(cp+1, ',')) dr->nhosts++;
118   if(dr->nhosts > MAX_HOSTS) dr->nhosts = MAX_HOSTS;
119   for(i = 0, cp = strtok_r(hostname, ",", &brk);
120       cp; cp = strtok_r(NULL, ",", &brk), i++)
121     strlcpy(dr->hostname[i], cp, sizeof(dr->hostname[i]));
122   free(hostname);
123
124   if(!mtev_conf_get_int(conf, "port", &dr->port))
125     dr->port = 5672;
126   mtev_atomic_inc64(&stats.concurrency);
127   return (iep_thread_driver_t *)dr;
128 }
129 static int noit_rabbimq_disconnect(iep_thread_driver_t *d) {
130   struct amqp_driver *dr = (struct amqp_driver *)d;
131   if(dr->connection) {
132     amqp_destroy_connection(dr->connection);
133     if(dr->sockfd >= 0) close(dr->sockfd);
134     dr->sockfd = -1;
135     dr->connection = NULL;
136     return 0;
137   }
138   return -1;
139 }
140 static void noit_rabbimq_deallocate(iep_thread_driver_t *d) {
141   struct amqp_driver *dr = (struct amqp_driver *)d;
142   noit_rabbimq_disconnect(d);
143   pthread_mutex_lock(&driver_lock);
144   memset(dr, 0, sizeof(*dr));
145   pthread_mutex_unlock(&driver_lock);
146   mtev_atomic_dec64(&stats.concurrency);
147   free(dr);
148 }
149 static void noit_rabbitmq_set_filters(mq_command_t *command, int count) {
150   /* NOT CURRENTLY IMPLEMENTED */
151 }
152 static void noit_rabbitmq_read_frame(struct amqp_driver *dr) {
153   struct pollfd p;
154   if(!dr->connection) return;
155   while(1) {
156     memset(&p, 0, sizeof(p));
157     p.fd = dr->sockfd;
158     p.events = POLLIN;
159     if(poll(&p, 1, 0)) {
160       int rv;
161       amqp_frame_t f;
162       rv = amqp_simple_wait_frame(dr->connection, &f);
163       if(rv > 0) {
164         if(f.frame_type == AMQP_FRAME_HEARTBEAT) {
165           BUMPSTAT(inbound_heartbeats);
166           mtevL(mtev_debug, "amqp <- hearbeat\n");
167         }
168         else if(f.frame_type == AMQP_FRAME_METHOD) {
169           BUMPSTAT(inbound_methods);
170           mtevL(mtev_error, "amqp <- method [%s]\n", amqp_method_name(f.payload.method.id));
171           dr->has_error = 1;
172           switch(f.payload.method.id) {
173             case AMQP_CHANNEL_CLOSE_METHOD: {
174                 amqp_channel_close_t *m = (amqp_channel_close_t *) f.payload.method.decoded;
175                 mtevL(mtev_error, "AMQP channel close error %d: %s\n",
176                       m->reply_code, (char *)m->reply_text.bytes);
177               }
178               break;
179             case AMQP_CONNECTION_CLOSE_METHOD: {
180                 amqp_connection_close_t *m = (amqp_connection_close_t *) f.payload.method.decoded;
181                 mtevL(mtev_error, "AMQP connection close error %d: %s\n",
182                       m->reply_code, (char *)m->reply_text.bytes);
183               }
184               break;
185           }
186         }
187         else {
188           mtevL(mtev_error, "amqp <- frame [%d]\n", f.frame_type);
189         }
190       }
191       else break;
192     }
193     else break;
194   }
195 }
196 static void noit_rabbitmq_heartbeat(struct amqp_driver *dr) {
197   struct timeval n, d;
198   if(!dr->connection) return;
199   gettimeofday(&n, NULL);
200   sub_timeval(n, dr->last_hb, &d);
201   if(d.tv_sec >= dr->heartbeat) {
202     amqp_frame_t f;
203     f.frame_type = AMQP_FRAME_HEARTBEAT;
204     f.channel = 0;
205     amqp_send_frame(dr->connection, &f);
206     mtevL(mtev_debug, "amqp -> hearbeat\n");
207     memcpy(&dr->last_hb, &n, sizeof(n));
208   }
209 }
210 static void
211 noit_rabbitmq_brcb(amqp_channel_t channel, amqp_basic_return_t *m, void *closure) {
212   BUMPSTAT(basic_returns);
213   mtevL(mtev_debug, "AMQP return [%d:%.*s]\n", m->reply_code,
214         (int)m->reply_text.len, (char *)m->reply_text.bytes);
215 }
216 static int noit_rabbimq_connect(iep_thread_driver_t *dr) {
217   struct amqp_driver *driver = (struct amqp_driver *)dr;
218
219   if(!driver->connection) {
220     int sidx = driver->nconnects++ % driver->nhosts;
221     struct timeval timeout;
222     amqp_rpc_reply_t r, *rptr;
223
224     mtevL(mtev_error, "AMQP connect: %s:%d\n",
225           driver->hostname[sidx], driver->port);
226     BUMPSTAT(connects);
227     driver->hostidx = sidx;
228     timeout.tv_sec = driver->heartbeat;
229     timeout.tv_usec = 0;
230     driver->sockfd = amqp_open_socket(driver->hostname[sidx], driver->port, &timeout);
231     if(driver->sockfd < 0) {
232       mtevL(mtev_error, "AMQP connect failed: %s:%d\n",
233             driver->hostname[sidx], driver->port);
234       return -1;
235     }
236     if(setsockopt(driver->sockfd, SOL_SOCKET, SO_SNDBUF, &desired_sndbuf, sizeof(desired_sndbuf)) < 0)
237       mtevL(mtev_debug, "rabbitmq: setsockopt(SO_SNDBUF, %ld) -> %s\n", desired_sndbuf, strerror(errno));
238     if(setsockopt(driver->sockfd, SOL_SOCKET, SO_RCVBUF, &desired_rcvbuf, sizeof(desired_rcvbuf)) < 0)
239       mtevL(mtev_debug, "rabbitmq: setsockopt(SO_RCVBUF, %ld) -> %s\n", desired_rcvbuf, strerror(errno));
240     driver->has_error = 0;
241     driver->connection = amqp_new_connection();
242     amqp_set_basic_return_cb(driver->connection, noit_rabbitmq_brcb, driver);
243     amqp_set_sockfd(driver->connection, driver->sockfd);
244     r = amqp_login(driver->connection,
245                    driver->vhost, 0, 131072, driver->heartbeat,
246                    AMQP_SASL_METHOD_PLAIN,
247                    driver->username, driver->password);
248     if(r.reply_type != AMQP_RESPONSE_NORMAL) {
249       mtevL(mtev_error, "AMQP login failed\n");
250       amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
251       amqp_destroy_connection(driver->connection);
252       if(driver->sockfd >= 0) close(driver->sockfd);
253       driver->sockfd = -1;
254       driver->connection = NULL;
255       return -1;
256     }
257
258     amqp_channel_open(driver->connection, 1);
259     rptr = amqp_get_rpc_reply();
260     if(rptr->reply_type != AMQP_RESPONSE_NORMAL) {
261       mtevL(mtev_error, "AMQP channe_open failed\n");
262       amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
263       amqp_destroy_connection(driver->connection);
264       if(driver->sockfd >= 0) close(driver->sockfd);
265       driver->sockfd = -1;
266       driver->connection = NULL;
267       return -1;
268     }
269     gettimeofday(&driver->last_hb, NULL);
270     return 0;
271   }
272   /* 1 means already connected */
273   return 1;
274 }
275
276 /* This is very specific to an internal implementation somewhere...
277  * and thus unlikely to be useful unless people name their checks:
278  * c_<accountid>_<checknumber>::<rest of name>
279  * This code should likely be made generic, perhaps with named
280  * pcre captures.  However, I'm worried about performance.
281  * For now, leave it and understand it is of limited usefulness.
282  */
/* Fallback for builds where no header provides the textual UUID length. */
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 36
#endif

/*
 * Pull the check UUID (and optionally account/check ids) out of a tab
 * separated jlog line.
 *
 * The fourth tab separated field is examined.  Its last UUID_STR_LEN
 * bytes must be a lowercase hex UUID with dashes.  When the field is
 * longer than that and the text after its second backtick starts with
 * "c_", the numbers in "c_<account>_<check>" are stored through
 * account_id / check_id (either may be NULL); both default to 0.
 * All searches are confined to that fourth field so later fields
 * (e.g. a metric name containing backticks) cannot influence the ids.
 *
 * dst receives one ".<hexdigit>" pair per UUID digit plus a
 * terminating NUL, so it needs room for 32 * 2 + 1 bytes.
 *
 * Returns 1 on success, 0 when the line has fewer than four tabs, the
 * field is too short, or the UUID contains an unexpected character.
 */
static int extract_uuid_from_jlog(const char *payload, size_t payloadlen,
                                  int *account_id, int *check_id, char *dst) {
  const char *end = payload + payloadlen;
  const char *field = payload, *field_end, *u;
  int i;

  if(account_id) *account_id = 0;
  if(check_id) *check_id = 0;

  /* Skip the record type, noit IP and timestamp fields. */
  for(i = 0; i < 3; i++) {
    field = memchr(field, '\t', (size_t)(end - field));
    if(!field) return 0;
    field++;
  }
  /* field now starts the uuid field; field_end is the tab closing it. */
  field_end = memchr(field, '\t', (size_t)(end - field));
  if(!field_end) return 0;

  if(field_end - field < UUID_STR_LEN) return 0;
  if(field_end - field > UUID_STR_LEN) {
    /* Extended form: <x>`<y>`<name>`<uuid>; look at <name>. */
    const char *f = memchr(field, '`', (size_t)(field_end - field));
    if(f) f = memchr(f + 1, '`', (size_t)(field_end - (f + 1)));
    if(f) {
      f++;
      if(field_end - f >= 2 && memcmp(f, "c_", 2) == 0) {
        f += 2;
        if(account_id && f < field_end) *account_id = atoi(f);
        f = memchr(f, '_', (size_t)(field_end - f));
        if(f) {
          f++;
          if(check_id && f < field_end) *check_id = atoi(f);
        }
      }
    }
  }

  /* Expand the trailing UUID into ".h.h.h..." form, dropping dashes. */
  u = field_end - UUID_STR_LEN;
  i = 0;
  while(i<32 && u < field_end) {
    if((*u >= 'a' && *u <= 'f') ||
       (*u >= '0' && *u <= '9')) {
      dst[i*2] = '.';
      dst[i*2 + 1] = *u;
      i++;
    }
    else if(*u != '-') return 0;
    u++;
  }
  dst[i*2] = '\0';
  return 1;
}
342 static int
343 noit_rabbimq_submit(iep_thread_driver_t *dr,
344                     const char *payload, size_t payloadlen) {
345   int rv;
346   amqp_bytes_t body;
347   struct amqp_driver *driver = (struct amqp_driver *)dr;
348   const char *routingkey = driver->routingkey;
349
350   body.len = payloadlen;
351   body.bytes = (char *)payload;
352   if(*payload == 'M' ||
353      *payload == 'S' ||
354      *payload == 'C' ||
355      (*payload == 'H' && payload[1] == '1') ||
356      (*payload == 'F' && payload[1] == '1') ||
357      (*payload == 'B' && (payload[1] == '1' || payload[1] == '2'))) {
358     char uuid_str[32 * 2 + 1];
359     int account_id, check_id;
360     if(extract_uuid_from_jlog(payload, payloadlen,
361                               &account_id, &check_id, uuid_str)) {
362       if(*routingkey) {
363         char *replace;
364         int newlen = strlen(driver->routingkey) + 1 + sizeof(uuid_str) + 2 * 32;
365         replace = alloca(newlen);
366         snprintf(replace, newlen, "%s.%x.%x.%d.%d%s", driver->routingkey,
367                  account_id%16, (account_id/16)%16, account_id,
368                  check_id, uuid_str);
369         routingkey = replace;
370       }
371     }
372   }
373   rv = amqp_basic_publish(driver->connection, 1,
374                           amqp_cstring_bytes(driver->exchange),
375                           amqp_cstring_bytes(routingkey),
376                           1, 0, NULL, body);
377   if(rv < 0) {
378     mtevL(mtev_error, "AMQP publish failed, disconnecting\n");
379     amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
380     amqp_destroy_connection(driver->connection);
381     if(driver->sockfd >= 0) close(driver->sockfd);
382     driver->sockfd = -1;
383     driver->connection = NULL;
384     return -1;
385   }
386   BUMPSTAT(publications);
387   noit_rabbitmq_heartbeat(driver);
388   noit_rabbitmq_read_frame(driver);
389   amqp_maybe_release_buffers(driver->connection);
390   if(driver->has_error) {
391     amqp_connection_close(driver->connection, AMQP_REPLY_SUCCESS);
392     amqp_destroy_connection(driver->connection);
393     if(driver->sockfd >= 0) close(driver->sockfd);
394     driver->sockfd = -1;
395     driver->connection = NULL;
396     return -1;
397   }
398   return 0;
399 }
400
/* MQ driver entry points; registered under the name "rabbitmq" by noit_rabbimq_driver_init.
 * The initializer is positional, so the order presumably mirrors mq_driver_t's
 * declaration (not visible in this file) — confirm before reordering. */
401 mq_driver_t mq_driver_rabbitmq = {
402   noit_rabbimq_allocate, /* claims and configures a driver slot */
403   noit_rabbimq_connect, /* 0 = connected now, 1 = already connected, -1 = failure */
404   noit_rabbimq_submit, /* publishes one payload */
405   noit_rabbimq_disconnect, /* destroys the connection if there is one */
406   noit_rabbimq_deallocate, /* disconnects and releases the slot */
407   noit_rabbitmq_set_filters /* no-op: filtering is not implemented for this driver */
408 };
409
410 static int noit_rabbimq_driver_config(mtev_dso_generic_t *self, mtev_hash_table *o) {
411   const char *intstr;
412   if(mtev_hash_retr_str(o, "sndbuf", strlen("sndbuf"), &intstr))
413     desired_sndbuf = atoi(intstr);
414   if(mtev_hash_retr_str(o, "rcvbuf", strlen("rcvbuf"), &intstr))
415     desired_rcvbuf = atoi(intstr);
416   return 0;
417 }
418 static int noit_rabbimq_driver_onload(mtev_image_t *self) {
419   return 0;
420 }
421
422 static int
423 noit_console_show_rabbitmq(mtev_console_closure_t ncct,
424                            int argc, char **argv,
425                            mtev_console_state_t *dstate,
426                            void *closure) {
427   int i;
428   nc_printf(ncct, " == RabbitMQ ==\n");
429   nc_printf(ncct, " Concurrency:           %llu\n", stats.concurrency);
430   nc_printf(ncct, " Connects:              %llu\n", stats.connects);
431   nc_printf(ncct, " AMQP basic returns:    %llu\n", stats.basic_returns);
432   nc_printf(ncct, " AMQP methods (in):     %llu\n", stats.inbound_methods);
433   nc_printf(ncct, " AMQP heartbeats (in):  %llu\n", stats.inbound_heartbeats);
434   nc_printf(ncct, " AMQP basic publish:    %llu\n", stats.publications);
435   pthread_mutex_lock(&driver_lock);
436   for(i=0;i<MAX_HOSTS;i++) {
437     struct amqp_driver *dr;
438     if(!stats.thread_states[i].owner) continue;
439     dr = &stats.thread_states[i];
440     nc_printf(ncct, "   == connection: %p ==\n", (void *)(vpsized_int)dr->owner);
441     if(dr->connection)
442       nc_printf(ncct, "     %s@%s:%d (vhost: %s, exchange: %s)\n",
443                 dr->username, dr->hostname[dr->hostidx], dr->port, dr->vhost,
444                 dr->exchange);
445     else
446       nc_printf(ncct, "     not connected\n");
447   }
448   pthread_mutex_unlock(&driver_lock);
449   return 1;
450 }
451
452 static void
453 register_console_rabbitmq_commands() {
454   mtev_console_state_t *tl;
455   cmd_info_t *showcmd;
456
457   tl = mtev_console_state_initial();
458   showcmd = mtev_console_state_get_cmd(tl, "show");
459   assert(showcmd && showcmd->dstate);
460   mtev_console_state_add_cmd(showcmd->dstate,
461     NCSCMD("rabbitmq", noit_console_show_rabbitmq, NULL, NULL, NULL));
462 }
463
464 static int noit_rabbimq_driver_init(mtev_dso_generic_t *self) {
465   pthread_mutex_init(&driver_lock, NULL);
466   memset(&stats, 0, sizeof(stats));
467   stratcon_iep_mq_driver_register("rabbitmq", &mq_driver_rabbitmq);
468   register_console_rabbitmq_commands();
469   return 0;
470 }
471
/* Generic DSO descriptor for this module: an image header followed by two positional
 * callbacks (presumably the config and init slots of mtev_dso_generic_t, in that order — confirm). */
472 mtev_dso_generic_t rabbitmq_driver = {
473   {
474     .magic = MTEV_GENERIC_MAGIC,
475     .version = MTEV_GENERIC_ABI_VERSION,
476     .name = "rabbitmq_driver",
477     .description = "AMQP driver for IEP MQ submission",
478     .xml_description = rabbitmq_driver_xml_description, /* not defined in this file; presumably supplied by rabbitmq_driver.xmlh — confirm */
479     .onload = noit_rabbimq_driver_onload /* no-op that returns 0 */
480   },
481   noit_rabbimq_driver_config, /* reads the "sndbuf" / "rcvbuf" options */
482   noit_rabbimq_driver_init /* sets up lock and stats, registers the "rabbitmq" MQ driver and console command */
483 };
484
Note: See TracBrowser for help on using the browser.