root/src/stratcon_jlog_streamer.c

Revision 70c30ef22f6d1312375e0392164d755db5824cb5, 53.8 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 1 week ago)

Use mtevAssert and mtevFatal Instead Of assert() and abort()

Use libmtev calls to safely flush logs and abort rather than calling
the assert and abort calls directly.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <unistd.h>
37 #include <errno.h>
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #ifdef HAVE_SYS_FILIO_H
41 #include <sys/filio.h>
42 #endif
43 #include <netinet/in.h>
44 #include <sys/un.h>
45 #include <arpa/inet.h>
46
47 #include <eventer/eventer.h>
48 #include <mtev_conf.h>
49 #include <mtev_hash.h>
50 #include <mtev_log.h>
51 #include <mtev_getip.h>
52 #include <mtev_rest.h>
53 #include <mtev_json.h>
54
55 #include "noit_mtev_bridge.h"
56 #include "stratcon_dtrace_probes.h"
57 #include "noit_jlog_listener.h"
58 #include "stratcon_datastore.h"
59 #include "stratcon_jlog_streamer.h"
60 #include "stratcon_iep.h"
61
62 pthread_mutex_t noits_lock;
63 mtev_hash_table noits = MTEV_HASH_EMPTY;
64 pthread_mutex_t noit_ip_by_cn_lock;
65 mtev_hash_table noit_ip_by_cn = MTEV_HASH_EMPTY;
66 static uuid_t self_stratcon_id;
67 static char self_stratcon_hostname[256] = "\0";
68 static struct sockaddr_in self_stratcon_ip;
69 static mtev_boolean stratcon_selfcheck_extended_id = mtev_true;
70
71 static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };
72
73 static const char *feed_type_to_str(int jlog_feed_cmd) {
74   switch(jlog_feed_cmd) {
75     case NOIT_JLOG_DATA_FEED: return "durable/storage";
76     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
77   }
78   return "unknown";
79 }
80
/*
 * GET_EXPECTED_CN(nctx, cn)
 *   Sets `cn` to the value stored under "cn" in nctx->config, or to NULL
 *   when there is no config or no such key.
 *
 * Arguments are parenthesised so any pointer expression can be passed, and
 * the temporary carries a macro-private name so it cannot capture a caller
 * variable called `vcn`.
 */
#define GET_EXPECTED_CN(nctx, cn) do { \
  void *_gec_vcn = NULL; \
  (cn) = NULL; \
  if((nctx)->config && \
     mtev_hash_retrieve((nctx)->config, "cn", 2, &_gec_vcn)) { \
     (cn) = _gec_vcn; \
  } \
} while(0)
/*
 * GET_FEEDTYPE(nctx, feedtype)
 *   Sets `feedtype` to "storage" or "iep" according to which push function
 *   the connection's jlog streamer context uses, else to "unknown".
 */
#define GET_FEEDTYPE(nctx, feedtype) do { \
  jlog_streamer_ctx_t *_jctx = (nctx)->consumer_ctx; \
  (feedtype) = "unknown"; \
  if(_jctx->push == stratcon_datastore_push) \
    (feedtype) = "storage"; \
  else if(_jctx->push == stratcon_iep_line_processor) \
    (feedtype) = "iep"; \
} while(0)
97
98 static int
99 remote_str_sort(const void *a, const void *b) {
100   int rv;
101   mtev_connection_ctx_t * const *actx = a;
102   mtev_connection_ctx_t * const *bctx = b;
103   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
104   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
105   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
106   if(rv) return rv;
107   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
108            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
109 }
110 static void
111 nc_print_noit_conn_brief(mtev_console_closure_t ncct,
112                           mtev_connection_ctx_t *ctx) {
113   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
114   struct timeval now, diff, session_duration;
115   char cmdbuf[4096];
116   const char *feedtype = "unknown";
117   const char *lasttime = "never";
118   const char *config_cn = NULL;
119   void *vcn;
120   if(ctx->last_connect.tv_sec != 0) {
121     time_t r = ctx->last_connect.tv_sec;
122     struct tm tbuf, *tm;
123     tm = gmtime_r(&r, &tbuf);
124     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
125     lasttime = cmdbuf;
126   }
127   nc_printf(ncct, "%s [%s]:\n", ctx->remote_str,
128             ctx->remote_cn ? "connected" :
129                              (ctx->retry_event ? "disconnected" :
130                                                    "connecting"));
131   if(ctx->config &&
132      mtev_hash_retrieve(ctx->config, "cn", strlen("cn"), &vcn)) {
133      config_cn = vcn;
134   }
135   if(config_cn) nc_printf(ncct, "\tcn: %s\n", config_cn);
136   nc_printf(ncct, "\tLast connect: %s\n", lasttime);
137   if(ctx->e) {
138     char buff[128];
139     const char *addrstr = NULL;
140     struct sockaddr_in6 addr6;
141     socklen_t len = sizeof(addr6);
142     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
143       unsigned short port = 0;
144       if(addr6.sin6_family == AF_INET) {
145         addrstr = inet_ntop(addr6.sin6_family,
146                             &((struct sockaddr_in *)&addr6)->sin_addr,
147                             buff, sizeof(buff));
148         memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
149         port = ntohs(port);
150       }
151       else if(addr6.sin6_family == AF_INET6) {
152         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
153                             buff, sizeof(buff));
154         port = ntohs(addr6.sin6_port);
155       }
156       if(addrstr != NULL)
157         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
158       else
159         nc_printf(ncct, "\tLocal address not interpretable\n");
160     }
161     else {
162       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
163                 ctx->e->fd, strerror(errno));
164     }
165   }
166   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
167   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
168   gettimeofday(&now, NULL);
169   if(ctx->timeout_event) {
170     sub_timeval(ctx->timeout_event->whence, now, &diff);
171     nc_printf(ncct, "\tTimeout scheduled for %lld.%06us\n",
172               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
173   }
174   if(ctx->retry_event) {
175     sub_timeval(ctx->retry_event->whence, now, &diff);
176     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
177               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
178   }
179   else if(ctx->remote_cn) {
180     nc_printf(ncct, "\tRemote CN: '%s'\n",
181               ctx->remote_cn ? ctx->remote_cn : "???");
182     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
183       struct timeval last;
184       double session_duration_seconds;
185       const char *state = "unknown";
186
187       switch(jctx->state) {
188         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
189         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
190         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
191         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
192         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
193         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
194         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
195       }
196       last.tv_sec = jctx->header.tv_sec;
197       last.tv_usec = jctx->header.tv_usec;
198       sub_timeval(now, last, &diff);
199       sub_timeval(now, ctx->last_connect, &session_duration);
200       session_duration_seconds = session_duration.tv_sec +
201                                  (double)session_duration.tv_usec/1000000.0;
202       nc_printf(ncct, "\tState: %s\n"
203                       "\tNext checkpoint: [%08x:%08x]\n"
204                       "\tLast event: %lld.%06us ago\n"
205                       "\tEvents this session: %llu (%0.2f/s)\n"
206                       "\tOctets this session: %llu (%0.2f/s)\n",
207                 state,
208                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
209                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
210                 jctx->total_events,
211                 (double)jctx->total_events/session_duration_seconds,
212                 jctx->total_bytes_read,
213                 (double)jctx->total_bytes_read/session_duration_seconds);
214     }
215     else {
216       nc_printf(ncct, "\tUnknown type.\n");
217     }
218   }
219 }
220
221 jlog_streamer_ctx_t *
222 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
223   jlog_streamer_ctx_t *ctx;
224   ctx = stratcon_jlog_streamer_ctx_alloc();
225   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
226   ctx->push = stratcon_datastore_push;
227   return ctx;
228 }
229 jlog_streamer_ctx_t *
230 stratcon_jlog_streamer_ctx_alloc(void) {
231   jlog_streamer_ctx_t *ctx;
232   ctx = calloc(1, sizeof(*ctx));
233   return ctx;
234 }
235
236 void
237 jlog_streamer_ctx_free(void *cl) {
238   jlog_streamer_ctx_t *ctx = cl;
239   if(ctx->buffer) free(ctx->buffer);
240   free(ctx);
241 }
242
243 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
244 static int
245 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
246   int len, mask;
247   while(ctx->bytes_read < ctx->bytes_expected) {
248     len = Eread(ctx->buffer + ctx->bytes_read,
249                 ctx->bytes_expected - ctx->bytes_read);
250     if(len < 0) {
251       *newmask = mask;
252       return -1;
253     }
254     /* if we get 0 inside SSL, and there was a real error, we
255      * will actually get a -1 here.
256      * if(len == 0) return ctx->bytes_read;
257      */
258     ctx->total_bytes_read += len;
259     ctx->bytes_read += len;
260   }
261   mtevAssert(ctx->bytes_read == ctx->bytes_expected);
262   return ctx->bytes_read;
263 }
/*
 * FULLREAD(e, ctx, size)
 *   Read exactly `size` bytes from event `e` into a freshly allocated,
 *   NUL-terminated ctx->buffer.  The buffer is allocated the first time
 *   through (when ctx->bytes_expected is zero) and kept across re-entries.
 *
 *   Control flow, all in the enclosing function:
 *     - on EAGAIN:  `return mask | EVENTER_EXCEPTION;`
 *     - on allocation failure, read error or short read: `goto socket_error;`
 *   It therefore requires a `socket_error` label and an `nctx` variable in
 *   the enclosing scope, and declares its own `mask` and `len`.
 *
 *   `size` is evaluated exactly once and every argument is parenthesised,
 *   so expressions such as `0 - ctx->count` are safe to pass.
 */
#define FULLREAD(e,ctx,size) do { \
  int mask, len; \
  const size_t _fr_size = (size_t)(size); \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = _fr_size; \
    if((ctx)->buffer) free((ctx)->buffer); \
    (ctx)->buffer = malloc(_fr_size + 1); \
    if((ctx)->buffer == NULL) { \
      mtevL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)(_fr_size + 1)); \
      goto socket_error; \
    } \
    (ctx)->buffer[_fr_size] = '\0'; \
  } \
  len = __read_on_ctx((e), (ctx), &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    mtevL(noit_error, "[%s] [%s] SSL read error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)", \
          nctx->remote_cn ? nctx->remote_cn : "(null)", \
          strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if((size_t)len != _fr_size) { \
    mtevL(noit_error, "[%s] [%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          nctx->remote_str ? nctx->remote_str : "(null)", \
          nctx->remote_cn ? nctx->remote_cn : "(null)", \
          (ctx)->state, len, (long unsigned int)_fr_size); \
    goto socket_error; \
  } \
} while(0)
294
295 int
296 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
297                            struct timeval *now) {
298   mtev_connection_ctx_t *nctx = closure;
299   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
300   jlog_streamer_ctx_t dummy;
301   int len;
302   jlog_id n_chkpt;
303   const char *cn_expected, *feedtype;
304   GET_EXPECTED_CN(nctx, cn_expected);
305   GET_FEEDTYPE(nctx, feedtype);
306
307   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
308     if(write(e->fd, e, 0) == -1)
309       mtevL(noit_error, "[%s] [%s] socket error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)",
310             nctx->remote_cn ? nctx->remote_cn : "(null)", strerror(errno));
311  socket_error:
312     ctx->state = JLOG_STREAMER_WANT_INITIATE;
313     ctx->count = 0;
314     ctx->needs_chkpt = 0;
315     ctx->bytes_read = 0;
316     ctx->bytes_expected = 0;
317     if(ctx->buffer) free(ctx->buffer);
318     ctx->buffer = NULL;
319     nctx->schedule_reattempt(nctx, now);
320     nctx->close(nctx, e);
321     return 0;
322   }
323
324   mtev_connection_update_timeout(nctx);
325   while(1) {
326     switch(ctx->state) {
327       case JLOG_STREAMER_WANT_INITIATE:
328         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
329                               sizeof(ctx->jlog_feed_cmd),
330                               &mask, e);
331         if(len < 0) {
332           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
333           mtevL(noit_error, "[%s] [%s] initiating stream failed -> %d/%s.\n",
334                 nctx->remote_str ? nctx->remote_str : "(null)", nctx->remote_cn ? nctx->remote_cn : "(null)", errno, strerror(errno));
335           goto socket_error;
336         }
337         if(len != sizeof(ctx->jlog_feed_cmd)) {
338           mtevL(noit_error, "[%s] [%s] short write [%d/%d] on initiating stream.\n",
339                 nctx->remote_str ? nctx->remote_str : "(null)", nctx->remote_cn ? nctx->remote_cn : "(null)",
340                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
341           goto socket_error;
342         }
343         ctx->state = JLOG_STREAMER_WANT_COUNT;
344         break;
345
346       case JLOG_STREAMER_WANT_ERROR:
347         FULLREAD(e, ctx, 0 - ctx->count);
348         mtevL(noit_error, "[%s] [%s] %.*s\n", nctx->remote_str ? nctx->remote_str : "(null)",
349               nctx->remote_cn ? nctx->remote_cn : "(null)", 0 - ctx->count, ctx->buffer);
350         free(ctx->buffer); ctx->buffer = NULL;
351         goto socket_error;
352         break;
353
354       case JLOG_STREAMER_WANT_COUNT:
355         FULLREAD(e, ctx, sizeof(u_int32_t));
356         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
357         ctx->count = ntohl(dummy.count);
358         ctx->needs_chkpt = 0;
359         free(ctx->buffer); ctx->buffer = NULL;
360         STRATCON_STREAM_COUNT(e->fd, (char *)feedtype,
361                                    nctx->remote_str, (char *)cn_expected,
362                                    ctx->count);
363         if(ctx->count < 0)
364           ctx->state = JLOG_STREAMER_WANT_ERROR;
365         else
366           ctx->state = JLOG_STREAMER_WANT_HEADER;
367         break;
368
369       case JLOG_STREAMER_WANT_HEADER:
370         if(ctx->count == 0) {
371           ctx->state = JLOG_STREAMER_WANT_COUNT;
372           break;
373         }
374         FULLREAD(e, ctx, sizeof(ctx->header));
375         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
376         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
377         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
378         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
379         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
380         ctx->header.message_len = ntohl(dummy.header.message_len);
381         STRATCON_STREAM_HEADER(e->fd, (char *)feedtype,
382                                     nctx->remote_str, (char *)cn_expected,
383                                     ctx->header.chkpt.log, ctx->header.chkpt.marker,
384                                     ctx->header.tv_sec, ctx->header.tv_usec,
385                                     ctx->header.message_len);
386         free(ctx->buffer); ctx->buffer = NULL;
387         ctx->state = JLOG_STREAMER_WANT_BODY;
388         break;
389
390       case JLOG_STREAMER_WANT_BODY:
391         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
392         STRATCON_STREAM_BODY(e->fd, (char *)feedtype,
393                                   nctx->remote_str, (char *)cn_expected,
394                                   ctx->header.chkpt.log, ctx->header.chkpt.marker,
395                                   ctx->header.tv_sec, ctx->header.tv_usec,
396                                   ctx->buffer);
397         if(ctx->header.message_len > 0) {
398           ctx->needs_chkpt = 1;
399           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
400                     ctx->buffer, NULL);
401         }
402         else if(ctx->buffer)
403           free(ctx->buffer);
404         /* Don't free the buffer, it's used by the datastore process. */
405         ctx->buffer = NULL;
406         ctx->count--;
407         ctx->total_events++;
408         if(ctx->count == 0 && ctx->needs_chkpt) {
409           eventer_t completion_e;
410           eventer_remove_fd(e->fd);
411           completion_e = eventer_alloc();
412           memcpy(completion_e, e, sizeof(*e));
413           nctx->e = completion_e;
414           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
415           ctx->state = JLOG_STREAMER_IS_ASYNC;
416           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
417                     NULL, completion_e);
418           mtevL(noit_debug, "Pushing %s batch async [%s] [%s]: [%u/%u]\n",
419                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
420                 nctx->remote_str ? nctx->remote_str : "(null)",
421                 nctx->remote_cn ? nctx->remote_cn : "(null)",
422                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
423           mtev_connection_disable_timeout(nctx);
424           return 0;
425         }
426         else if(ctx->count == 0)
427           ctx->state = JLOG_STREAMER_WANT_CHKPT;
428         else
429           ctx->state = JLOG_STREAMER_WANT_HEADER;
430         break;
431
432       case JLOG_STREAMER_IS_ASYNC:
433         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
434       case JLOG_STREAMER_WANT_CHKPT:
435         mtevL(noit_debug, "Pushing %s checkpoint [%s] [%s]: [%u/%u]\n",
436               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
437               nctx->remote_str ? nctx->remote_str : "(null)",
438               nctx->remote_cn ? nctx->remote_cn : "(null)",
439               ctx->header.chkpt.log, ctx->header.chkpt.marker);
440         n_chkpt.log = htonl(ctx->header.chkpt.log);
441         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
442
443         /* screw short writes.  I'd rather die than not write my data! */
444         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
445                               &mask, e);
446         if(len < 0) {
447           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
448           goto socket_error;
449         }
450         if(len != sizeof(jlog_id)) {
451           mtevL(noit_error, "[%s] [%s] short write on checkpointing stream.\n",
452             nctx->remote_str ? nctx->remote_str : "(null)",
453             nctx->remote_cn ? nctx->remote_cn : "(null)");
454           goto socket_error;
455         }
456         STRATCON_STREAM_CHECKPOINT(e->fd, (char *)feedtype,
457                                         nctx->remote_str, (char *)cn_expected,
458                                         ctx->header.chkpt.log, ctx->header.chkpt.marker);
459         ctx->state = JLOG_STREAMER_WANT_COUNT;
460         break;
461     }
462   }
463   /* never get here */
464 }
465
466
467 int
468 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
469   int rv = -1;
470   void *vip;
471   pthread_mutex_lock(&noit_ip_by_cn_lock);
472   if(mtev_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
473     int new_len;
474     char *new_ip = (char *)vip;
475     new_len = strlen(new_ip);
476     strlcpy(ip, new_ip, len);
477     if(new_len >= len) rv = new_len+1;
478     else rv = 0;
479   }
480   pthread_mutex_unlock(&noit_ip_by_cn_lock);
481   return rv;
482 }
483 void
484 stratcon_jlog_streamer_recache_noit() {
485   int di, cnt;
486   mtev_conf_section_t *noit_configs;
487   noit_configs = mtev_conf_get_sections(NULL, "//noits//noit", &cnt);
488   pthread_mutex_lock(&noit_ip_by_cn_lock);
489   mtev_hash_delete_all(&noit_ip_by_cn, free, free);
490   for(di=0; di<cnt; di++) {
491     char address[64];
492     if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
493                                  address, sizeof(address))) {
494       char expected_cn[256];
495       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
496                                  expected_cn, sizeof(expected_cn)))
497         mtev_hash_store(&noit_ip_by_cn,
498                         strdup(expected_cn), strlen(expected_cn),
499                         strdup(address));
500     }
501   }
502   free(noit_configs);
503   pthread_mutex_unlock(&noit_ip_by_cn_lock);
504 }
505 void
506 stratcon_jlog_streamer_reload(const char *toplevel) {
507   /* flush and repopulate the cn cache */
508   stratcon_jlog_streamer_recache_noit();
509   if(!stratcon_datastore_get_enabled()) return;
510   stratcon_streamer_connection(toplevel, NULL, "noit",
511                                stratcon_jlog_recv_handler,
512                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
513                                NULL,
514                                jlog_streamer_ctx_free);
515 }
516
517 char *
518 stratcon_console_noit_opts(mtev_console_closure_t ncct,
519                            mtev_console_state_stack_t *stack,
520                            mtev_console_state_t *dstate,
521                            int argc, char **argv, int idx) {
522   if(argc == 1) {
523     mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
524     const char *key_id;
525     int klen, i = 0;
526     void *vconn, *vcn;
527     mtev_connection_ctx_t *ctx;
528     mtev_hash_table dedup = MTEV_HASH_EMPTY;
529
530     pthread_mutex_lock(&noits_lock);
531     while(mtev_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
532       ctx = (mtev_connection_ctx_t *)vconn;
533       vcn = NULL;
534       if(ctx->config && mtev_hash_retrieve(ctx->config, "cn", 2, &vcn) &&
535          !mtev_hash_store(&dedup, vcn, strlen(vcn), NULL)) {
536         if(!strncmp(vcn, argv[0], strlen(argv[0]))) {
537           if(idx == i) {
538             pthread_mutex_unlock(&noits_lock);
539             mtev_hash_destroy(&dedup, NULL, NULL);
540             return strdup(vcn);
541           }
542           i++;
543         }
544       }
545       if(ctx->remote_str &&
546          !mtev_hash_store(&dedup, ctx->remote_str, strlen(ctx->remote_str), NULL)) {
547         if(!strncmp(ctx->remote_str, argv[0], strlen(argv[0]))) {
548           if(idx == i) {
549             pthread_mutex_unlock(&noits_lock);
550             mtev_hash_destroy(&dedup, NULL, NULL);
551             return strdup(ctx->remote_str);
552           }
553           i++;
554         }
555       }
556     }
557     pthread_mutex_unlock(&noits_lock);
558     mtev_hash_destroy(&dedup, NULL, NULL);
559   }
560   if(argc == 2)
561     return mtev_console_opt_delegate(ncct, stack, dstate, argc-1, argv+1, idx);
562   return NULL;
563 }
564 static int
565 stratcon_console_show_noits(mtev_console_closure_t ncct,
566                             int argc, char **argv,
567                             mtev_console_state_t *dstate,
568                             void *closure) {
569   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
570   const char *key_id, *ecn;
571   int klen, n = 0, i;
572   void *vconn;
573   mtev_connection_ctx_t **ctx;
574
575   if(closure != (void *)0 && argc == 0) {
576     nc_printf(ncct, "takes an argument\n");
577     return 0;
578   }
579   if(closure == (void *)0 && argc > 0) {
580     nc_printf(ncct, "takes no arguments\n");
581     return 0;
582   }
583   pthread_mutex_lock(&noits_lock);
584   ctx = malloc(sizeof(*ctx) * mtev_hash_size(&noits));
585   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
586                        &vconn)) {
587     ctx[n] = (mtev_connection_ctx_t *)vconn;
588     if(argc == 0 ||
589        !strcmp(ctx[n]->remote_str, argv[0]) ||
590        (ctx[n]->config && mtev_hash_retr_str(ctx[n]->config, "cn", 2, &ecn) &&
591         !strcmp(ecn, argv[0]))) {
592       mtev_connection_ctx_ref(ctx[n]);
593       n++;
594     }
595   }
596   pthread_mutex_unlock(&noits_lock);
597   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
598   for(i=0; i<n; i++) {
599     nc_print_noit_conn_brief(ncct, ctx[i]);
600     mtev_connection_ctx_deref(ctx[i]);
601   }
602   free(ctx);
603   return 0;
604 }
605
606 static void
607 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
608                        mtev_connection_ctx_t *nctx) {
609   struct timeval last, session_duration, diff;
610   u_int64_t session_duration_ms, last_event_ms;
611   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
612   char str[1024], *wr;
613   int len;
614   const char *cn_expected;
615   const char *feedtype = "unknown";
616
617   GET_FEEDTYPE(nctx, feedtype);
618   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
619
620   GET_EXPECTED_CN(nctx, cn_expected);
621   if(!cn_expected) return;
622
623   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
624            (long unsigned int)now->tv_sec,
625            (long unsigned int)now->tv_usec/1000UL,
626            uuid_str, cn_expected, feedtype);
627   wr = str + strlen(str);
628   len = sizeof(str) - (wr - str);
629
630   /* Now we write NAME TYPE VALUE into wr each time and push it */
631 #define push_noit_m_str(name, value) do { \
632   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
633   stratcon_datastore_push(DS_OP_INSERT, \
634                           (struct sockaddr *)&self_stratcon_ip, \
635                           self_stratcon_hostname, strdup(str), NULL); \
636   stratcon_iep_line_processor(DS_OP_INSERT, \
637                               (struct sockaddr *)&self_stratcon_ip, \
638                               self_stratcon_hostname, strdup(str), NULL); \
639 } while(0)
640 #define push_noit_m_u64(name, value) do { \
641   snprintf(wr, len, "%s\tL\t%llu\n", name, (long long unsigned int)value); \
642   stratcon_datastore_push(DS_OP_INSERT, \
643                           (struct sockaddr *)&self_stratcon_ip, \
644                           self_stratcon_hostname, strdup(str), NULL); \
645   stratcon_iep_line_processor(DS_OP_INSERT, \
646                               (struct sockaddr *)&self_stratcon_ip, \
647                               self_stratcon_hostname, strdup(str), NULL); \
648 } while(0)
649
650   last.tv_sec = jctx->header.tv_sec;
651   last.tv_usec = jctx->header.tv_usec;
652   sub_timeval(*now, last, &diff);
653   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
654   sub_timeval(*now, nctx->last_connect, &session_duration);
655   session_duration_ms = session_duration.tv_sec * 1000 +
656                         session_duration.tv_usec / 1000;
657
658   push_noit_m_str("state", nctx->remote_cn ? "connected" :
659                              (nctx->retry_event ? "disconnected" :
660                                                   "connecting"));
661   push_noit_m_u64("last_event_age_ms", last_event_ms);
662   push_noit_m_u64("session_length_ms", session_duration_ms);
663 }
664 static int
665 periodic_noit_metrics(eventer_t e, int mask, void *closure,
666                       struct timeval *now) {
667   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
668   mtev_connection_ctx_t **ctxs;
669   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
670   const char *key_id;
671   void *vconn;
672   int klen, n = 0, i;
673   char str[1024];
674   char uuid_str[1024], tmp_uuid_str[UUID_STR_LEN+1];
675   struct timeval epoch, diff;
676   u_int64_t uptime = 0;
677   char ip_str[128];
678
679   inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
680             sizeof(ip_str));
681
682   uuid_str[0] = '\0';
683   uuid_unparse_lower(self_stratcon_id, tmp_uuid_str);
684   if(stratcon_selfcheck_extended_id) {
685     strlcat(uuid_str, ip_str, sizeof(uuid_str)-37);
686     strlcat(uuid_str, "`selfcheck`selfcheck`", sizeof(uuid_str)-37);
687   }
688   strlcat(uuid_str, tmp_uuid_str, sizeof(uuid_str));
689
690 #define PUSH_BOTH(type, str) do { \
691   stratcon_datastore_push(type, \
692                           (struct sockaddr *)&self_stratcon_ip, \
693                           self_stratcon_hostname, str, NULL); \
694   stratcon_iep_line_processor(type, \
695                               (struct sockaddr *)&self_stratcon_ip, \
696                               self_stratcon_hostname, str, NULL); \
697 } while(0)
698
699   if(closure == NULL) {
700     /* Only do this the first time we get called */
701     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
702              (long unsigned int)now->tv_sec,
703              (long unsigned int)now->tv_usec/1000UL, uuid_str, ip_str,
704              self_stratcon_hostname);
705     PUSH_BOTH(DS_OP_INSERT, strdup(str));
706   }
707
708   pthread_mutex_lock(&noits_lock);
709   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
710   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
711                        &vconn)) {
712     ctxs[n] = (mtev_connection_ctx_t *)vconn;
713     mtev_connection_ctx_ref(ctxs[n]);
714     n++;
715   }
716   pthread_mutex_unlock(&noits_lock);
717
718   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok %d noits\n",
719            (long unsigned int)now->tv_sec,
720            (long unsigned int)now->tv_usec/1000UL, uuid_str, n);
721   PUSH_BOTH(DS_OP_INSERT, strdup(str));
722
723   if(eventer_get_epoch(&epoch) != 0)
724     memcpy(&epoch, now, sizeof(epoch));
725   sub_timeval(*now, epoch, &diff);
726   uptime = diff.tv_sec;
727   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\tuptime\tL\t%llu\n",
728            (long unsigned int)now->tv_sec,
729            (long unsigned int)now->tv_usec/1000UL,
730            uuid_str, (long long unsigned int)uptime);
731   PUSH_BOTH(DS_OP_INSERT, strdup(str));
732
733   for(i=0; i<n; i++) {
734     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
735     mtev_connection_ctx_deref(ctxs[i]);
736   }
737   free(ctxs);
738   PUSH_BOTH(DS_OP_CHKPT, NULL);
739
740   add_timeval(e->whence, whence, &whence);
741   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
742   return 0;
743 }
744
745 static int
746 rest_show_noits_json(mtev_http_rest_closure_t *restc,
747                      int npats, char **pats) {
748   const char *jsonstr;
749   struct json_object *doc, *nodes, *node;
750   mtev_hash_table seen = MTEV_HASH_EMPTY;
751   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
752   char path[256];
753   const char *key_id;
754   const char *type = NULL, *want_cn = NULL;
755   int klen, n = 0, i, di, cnt;
756   void *vconn;
757   mtev_connection_ctx_t **ctxs;
758   mtev_conf_section_t *noit_configs;
759   struct timeval now, diff, last;
760   mtev_http_request *req = mtev_http_session_request(restc->http_ctx);
761
762   mtev_http_process_querystring(req);
763   type = mtev_http_request_querystring(req, "type");
764   want_cn = mtev_http_request_querystring(req, "cn");
765
766   gettimeofday(&now, NULL);
767
768   pthread_mutex_lock(&noits_lock);
769   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
770   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
771                        &vconn)) {
772     ctxs[n] = (mtev_connection_ctx_t *)vconn;
773     mtev_connection_ctx_ref(ctxs[n]);
774     n++;
775   }
776   pthread_mutex_unlock(&noits_lock);
777   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
778
779   doc = json_object_new_object();
780   nodes = json_object_new_array();
781   json_object_object_add(doc, "nodes", nodes);
782  
783   for(i=0; i<n; i++) {
784     char buff[256];
785     const char *feedtype = "unknown", *state = "unknown";
786     mtev_connection_ctx_t *ctx = ctxs[i];
787     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
788
789     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
790
791     /* If the user requested a specific type and we're not it, skip. */
792     if(type && strcmp(feedtype, type)) {
793         mtev_connection_ctx_deref(ctx);
794         continue;
795     }
796     /* If the user wants a specific CN... limit to that. */
797     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn))) {
798         mtev_connection_ctx_deref(ctx);
799         continue;
800     }
801
802     node = json_object_new_object();
803     snprintf(buff, sizeof(buff), "%llu.%06d",
804              (long long unsigned)ctx->last_connect.tv_sec,
805              (int)ctx->last_connect.tv_usec);
806     json_object_object_add(node, "last_connect", json_object_new_string(buff));
807     json_object_object_add(node, "state",
808          json_object_new_string(ctx->remote_cn ?
809                                   "connected" :
810                                   (ctx->retry_event ? "disconnected" :
811                                                       "connecting")));
812     if(ctx->e) {
813       char buff[128];
814       const char *addrstr = NULL;
815       struct sockaddr_in6 addr6;
816       socklen_t len = sizeof(addr6);
817       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
818         unsigned short port = 0;
819         if(addr6.sin6_family == AF_INET) {
820           addrstr = inet_ntop(addr6.sin6_family,
821                               &((struct sockaddr_in *)&addr6)->sin_addr,
822                               buff, sizeof(buff));
823           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
824           port = ntohs(port);
825         }
826         else if(addr6.sin6_family == AF_INET6) {
827           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
828                               buff, sizeof(buff));
829           port = ntohs(addr6.sin6_port);
830         }
831         if(addrstr != NULL) {
832           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
833                    ":%u", port);
834           json_object_object_add(node, "local", json_object_new_string(buff));
835         }
836       }
837     }
838     mtev_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
839                       0, free, NULL);
840     json_object_object_add(node, "remote", json_object_new_string(ctx->remote_str));
841     json_object_object_add(node, "type", json_object_new_string(feedtype));
842     if(ctx->retry_event) {
843       sub_timeval(ctx->retry_event->whence, now, &diff);
844       snprintf(buff, sizeof(buff), "%llu.%06d",
845                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
846       json_object_object_add(node, "next_attempt", json_object_new_string(buff));
847     }
848     else if(ctx->remote_cn) {
849       if(ctx->remote_cn)
850         json_object_object_add(node, "remote_cn", json_object_new_string(ctx->remote_cn));
851  
852       switch(jctx->state) {
853         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
854         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
855         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
856         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
857         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
858         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
859         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
860       }
861       json_object_object_add(node, "state", json_object_new_string(state));
862       snprintf(buff, sizeof(buff), "%08x:%08x",
863                jctx->header.chkpt.log, jctx->header.chkpt.marker);
864       json_object_object_add(node, "checkpoint", json_object_new_string(buff));
865       snprintf(buff, sizeof(buff), "%llu",
866                (unsigned long long)jctx->total_events);
867       json_object_object_add(node, "session_events", json_object_new_string(buff));
868       snprintf(buff, sizeof(buff), "%llu",
869                (unsigned long long)jctx->total_bytes_read);
870       json_object_object_add(node, "session_bytes", json_object_new_string(buff));
871  
872       sub_timeval(now, ctx->last_connect, &diff);
873       snprintf(buff, sizeof(buff), "%lld.%06d",
874                (long long)diff.tv_sec, (int)diff.tv_usec);
875       json_object_object_add(node, "session_duration", json_object_new_string(buff));
876  
877       if(jctx->header.tv_sec) {
878         last.tv_sec = jctx->header.tv_sec;
879         last.tv_usec = jctx->header.tv_usec;
880         snprintf(buff, sizeof(buff), "%llu.%06d",
881                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
882         json_object_object_add(node, "last_event", json_object_new_string(buff));
883         sub_timeval(now, last, &diff);
884         snprintf(buff, sizeof(buff), "%lld.%06d",
885                  (long long)diff.tv_sec, (int)diff.tv_usec);
886         json_object_object_add(node, "last_event_age", json_object_new_string(buff));
887       }
888     }
889     json_object_array_add(nodes, node);
890     mtev_connection_ctx_deref(ctx);
891   }
892   free(ctxs);
893
894   if(!type || !strcmp(type, "configured")) {
895     snprintf(path, sizeof(path), "//noits//noit");
896     noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
897     for(di=0; di<cnt; di++) {
898       char address[64], port_str[32], remote_str[98];
899       char expected_cn_buff[256], *expected_cn = NULL;
900       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
901                                  expected_cn_buff, sizeof(expected_cn_buff)))
902         expected_cn = expected_cn_buff;
903       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
904       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
905                                  address, sizeof(address))) {
906         void *v;
907         if(!mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
908                                    port_str, sizeof(port_str)))
909           strlcpy(port_str, "43191", sizeof(port_str));
910
911         /* If the user wants a specific CN... limit to that. */
912           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
913             continue;
914
915         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
916         if(!mtev_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
917           node = json_object_new_object();
918           json_object_object_add(node, "remote", json_object_new_string(remote_str));
919           json_object_object_add(node, "type", json_object_new_string("configured"));
920           if(expected_cn)
921             json_object_object_add(node, "cn", json_object_new_string(expected_cn));
922           json_object_array_add(nodes, node);
923         }
924       }
925     }
926     free(noit_configs);
927   }
928   mtev_hash_destroy(&seen, free, NULL);
929
930   mtev_http_response_ok(restc->http_ctx, "application/json");
931   jsonstr = json_object_to_json_string(doc);
932   mtev_http_response_append(restc->http_ctx, jsonstr, strlen(jsonstr));
933   mtev_http_response_append(restc->http_ctx, "\n", 1);
934   json_object_put(doc);
935   mtev_http_response_end(restc->http_ctx);
936   return 0;
937 }
938 static int
939 rest_show_noits(mtev_http_rest_closure_t *restc,
940                 int npats, char **pats) {
941   xmlDocPtr doc;
942   xmlNodePtr root;
943   mtev_hash_table *hdrs, seen = MTEV_HASH_EMPTY;
944   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
945   char path[256];
946   const char *key_id, *accepthdr;
947   const char *type = NULL, *want_cn = NULL;
948   int klen, n = 0, i, di, cnt;
949   void *vconn;
950   mtev_connection_ctx_t **ctxs;
951   mtev_conf_section_t *noit_configs;
952   struct timeval now, diff, last;
953   xmlNodePtr node;
954   mtev_http_request *req = mtev_http_session_request(restc->http_ctx);
955
956   if(npats == 1 && !strcmp(pats[0], ".json"))
957     return rest_show_noits_json(restc, npats, pats);
958
959   hdrs = mtev_http_request_headers_table(req);
960   if(mtev_hash_retr_str(hdrs, "accept", strlen("accept"), &accepthdr)) {
961     char buf[256], *brkt, *part;
962     strlcpy(buf, accepthdr, sizeof(buf));
963     for(part = strtok_r(buf, ",", &brkt);
964         part;
965         part = strtok_r(NULL, ",", &brkt)) {
966       while(*part && isspace(*part)) part++;
967       if(!strcmp(part, "application/json")) {
968         return rest_show_noits_json(restc, npats, pats);
969       }
970     }
971   }
972
973   mtev_http_process_querystring(req);
974   type = mtev_http_request_querystring(req, "type");
975   want_cn = mtev_http_request_querystring(req, "cn");
976
977   gettimeofday(&now, NULL);
978
979   pthread_mutex_lock(&noits_lock);
980   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
981   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
982                        &vconn)) {
983     ctxs[n] = (mtev_connection_ctx_t *)vconn;
984     mtev_connection_ctx_ref(ctxs[n]);
985     n++;
986   }
987   pthread_mutex_unlock(&noits_lock);
988   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
989
990   doc = xmlNewDoc((xmlChar *)"1.0");
991   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
992   xmlDocSetRootElement(doc, root);
993
994   for(i=0; i<n; i++) {
995     char buff[256];
996     const char *feedtype = "unknown", *state = "unknown";
997     mtev_connection_ctx_t *ctx = ctxs[i];
998     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
999
1000     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
1001
1002     /* If the user requested a specific type and we're not it, skip. */
1003     if(type && strcmp(feedtype, type)) {
1004         mtev_connection_ctx_deref(ctx);
1005         continue;
1006     }
1007     /* If the user wants a specific CN... limit to that. */
1008     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn))) {
1009         mtev_connection_ctx_deref(ctx);
1010         continue;
1011     }
1012
1013     node = xmlNewNode(NULL, (xmlChar *)"noit");
1014     snprintf(buff, sizeof(buff), "%llu.%06d",
1015              (long long unsigned)ctx->last_connect.tv_sec,
1016              (int)ctx->last_connect.tv_usec);
1017     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
1018     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
1019                (xmlChar *)"connected" :
1020                (ctx->retry_event ? (xmlChar *)"disconnected" :
1021                                     (xmlChar *)"connecting"));
1022     if(ctx->e) {
1023       char buff[128];
1024       const char *addrstr = NULL;
1025       struct sockaddr_in6 addr6;
1026       socklen_t len = sizeof(addr6);
1027       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
1028         unsigned short port = 0;
1029         if(addr6.sin6_family == AF_INET) {
1030           addrstr = inet_ntop(addr6.sin6_family,
1031                               &((struct sockaddr_in *)&addr6)->sin_addr,
1032                               buff, sizeof(buff));
1033           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
1034           port = ntohs(port);
1035         }
1036         else if(addr6.sin6_family == AF_INET6) {
1037           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
1038                               buff, sizeof(buff));
1039           port = ntohs(addr6.sin6_port);
1040         }
1041         if(addrstr != NULL) {
1042           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
1043                    ":%u", port);
1044           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
1045         }
1046       }
1047     }
1048     mtev_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
1049                       0, free, NULL);
1050     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
1051     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
1052     if(ctx->retry_event) {
1053       sub_timeval(ctx->retry_event->whence, now, &diff);
1054       snprintf(buff, sizeof(buff), "%llu.%06d",
1055                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
1056       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
1057     }
1058     else if(ctx->remote_cn) {
1059       if(ctx->remote_cn)
1060         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
1061  
1062       switch(jctx->state) {
1063         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
1064         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
1065         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
1066         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
1067         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
1068         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
1069         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
1070       }
1071       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
1072       snprintf(buff, sizeof(buff), "%08x:%08x",
1073                jctx->header.chkpt.log, jctx->header.chkpt.marker);
1074       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
1075       snprintf(buff, sizeof(buff), "%llu",
1076                (unsigned long long)jctx->total_events);
1077       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
1078       snprintf(buff, sizeof(buff), "%llu",
1079                (unsigned long long)jctx->total_bytes_read);
1080       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
1081  
1082       sub_timeval(now, ctx->last_connect, &diff);
1083       snprintf(buff, sizeof(buff), "%lld.%06d",
1084                (long long)diff.tv_sec, (int)diff.tv_usec);
1085       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
1086  
1087       if(jctx->header.tv_sec) {
1088         last.tv_sec = jctx->header.tv_sec;
1089         last.tv_usec = jctx->header.tv_usec;
1090         snprintf(buff, sizeof(buff), "%llu.%06d",
1091                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
1092         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
1093         sub_timeval(now, last, &diff);
1094         snprintf(buff, sizeof(buff), "%lld.%06d",
1095                  (long long)diff.tv_sec, (int)diff.tv_usec);
1096         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
1097       }
1098     }
1099
1100     xmlAddChild(root, node);
1101     mtev_connection_ctx_deref(ctx);
1102   }
1103   free(ctxs);
1104
1105   if(!type || !strcmp(type, "configured")) {
1106     snprintf(path, sizeof(path), "//noits//noit");
1107     noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
1108     for(di=0; di<cnt; di++) {
1109       char address[64], port_str[32], remote_str[98];
1110       char expected_cn_buff[256], *expected_cn = NULL;
1111       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
1112                                  expected_cn_buff, sizeof(expected_cn_buff)))
1113         expected_cn = expected_cn_buff;
1114       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
1115       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
1116                                  address, sizeof(address))) {
1117         void *v;
1118         if(!mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
1119                                    port_str, sizeof(port_str)))
1120           strlcpy(port_str, "43191", sizeof(port_str));
1121
1122         /* If the user wants a specific CN... limit to that. */
1123           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
1124             continue;
1125
1126         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
1127         if(!mtev_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
1128           node = xmlNewNode(NULL, (xmlChar *)"noit");
1129           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
1130           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
1131           if(expected_cn)
1132             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
1133           xmlAddChild(root, node);
1134         }
1135       }
1136     }
1137     free(noit_configs);
1138   }
1139   mtev_hash_destroy(&seen, free, NULL);
1140
1141   mtev_http_response_ok(restc->http_ctx, "text/xml");
1142   mtev_http_response_xml(restc->http_ctx, doc);
1143   mtev_http_response_end(restc->http_ctx);
1144   xmlFreeDoc(doc);
1145   return 0;
1146 }
1147 static int
1148 stratcon_add_noit(const char *target, unsigned short port,
1149                   const char *cn) {
1150   int cnt;
1151   char path[256];
1152   char port_str[6];
1153   mtev_conf_section_t *noit_configs, parent;
1154   xmlNodePtr newnoit, config, cnnode;
1155
1156   snprintf(path, sizeof(path),
1157            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1158   noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
1159   free(noit_configs);
1160   if(cnt != 0) return -1;
1161
1162   parent = mtev_conf_get_section(NULL, "//noits//include//noits");
1163   if(!parent) parent = mtev_conf_get_section(NULL, "//noits");
1164   if(!parent) return -1;
1165   snprintf(port_str, sizeof(port_str), "%d", port);
1166   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1167   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1168   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1169   xmlAddChild(parent, newnoit);
1170   if(cn) {
1171     config = xmlNewNode(NULL, (xmlChar *)"config");
1172     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1173     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1174     xmlAddChild(config, cnnode);
1175     xmlAddChild(newnoit, config);
1176     pthread_mutex_lock(&noit_ip_by_cn_lock);
1177     mtev_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
1178                       strdup(target), free, free);
1179     pthread_mutex_unlock(&noit_ip_by_cn_lock);
1180   }
1181   if(stratcon_datastore_get_enabled())
1182     stratcon_streamer_connection(NULL, target, "noit",
1183                                  stratcon_jlog_recv_handler,
1184                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1185                                  NULL,
1186                                  jlog_streamer_ctx_free);
1187   if(stratcon_iep_get_enabled())
1188     stratcon_streamer_connection(NULL, target, "noit",
1189                                  stratcon_jlog_recv_handler,
1190                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1191                                  NULL,
1192                                  jlog_streamer_ctx_free);
1193   return 1;
1194 }
1195 static int
1196 stratcon_remove_noit(const char *target, unsigned short port, const char *cn) {
1197   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
1198   const char *key_id;
1199   int klen, n = -1, i, cnt = 0;
1200   void *vconn;
1201   mtev_connection_ctx_t **ctx;
1202   mtev_conf_section_t *noit_configs;
1203   char path[256];
1204   char remote_str[256];
1205
1206   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1207
1208   snprintf(path, sizeof(path),
1209            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1210   noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
1211   for(i=0; i<cnt; i++) {
1212     char expected_cn[256];
1213     if(mtev_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
1214                                expected_cn, sizeof(expected_cn))) {
1215       if(!cn || !strcmp(cn, expected_cn)) {
1216         pthread_mutex_lock(&noit_ip_by_cn_lock);
1217         mtev_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
1218                          free, free);
1219         pthread_mutex_unlock(&noit_ip_by_cn_lock);
1220       }
1221     }
1222     else if(cn) continue;
1223     CONF_REMOVE(noit_configs[i]);
1224     xmlUnlinkNode(noit_configs[i]);
1225     xmlFreeNode(noit_configs[i]);
1226     n = 0;
1227   }
1228   free(noit_configs);
1229
1230   pthread_mutex_lock(&noits_lock);
1231   ctx = malloc(sizeof(*ctx) * mtev_hash_size(&noits));
1232   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
1233                        &vconn)) {
1234     if(!strcmp(((mtev_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1235       ctx[n] = (mtev_connection_ctx_t *)vconn;
1236       mtev_connection_ctx_ref(ctx[n]);
1237       n++;
1238     }
1239   }
1240   pthread_mutex_unlock(&noits_lock);
1241   for(i=0; i<n; i++) {
1242     mtev_connection_ctx_dealloc(ctx[i]); /* once for the record */
1243     mtev_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1244   }
1245   free(ctx);
1246   return n;
1247 }
1248 static int
1249 rest_set_noit(mtev_http_rest_closure_t *restc,
1250               int npats, char **pats) {
1251   const char *cn = NULL;
1252   mtev_http_session_ctx *ctx = restc->http_ctx;
1253   mtev_http_request *req = mtev_http_session_request(ctx);
1254   unsigned short port = 43191;
1255   if(npats < 1 || npats > 2)
1256     mtev_http_response_server_error(ctx, "text/xml");
1257   if(npats == 2) port = atoi(pats[1]);
1258   mtev_http_process_querystring(req);
1259   cn = mtev_http_request_querystring(req, "cn");
1260   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1261     mtev_http_response_ok(ctx, "text/xml");
1262   else
1263     mtev_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1264   if(mtev_conf_write_file(NULL) != 0)
1265     mtevL(noit_error, "local config write failed\n");
1266   mtev_conf_mark_changed();
1267   mtev_http_response_end(ctx);
1268   return 0;
1269 }
1270 static int
1271 rest_delete_noit(mtev_http_rest_closure_t *restc,
1272                  int npats, char **pats) {
1273   mtev_http_session_ctx *ctx = restc->http_ctx;
1274   mtev_http_request *req = mtev_http_session_request(ctx);
1275   unsigned short port = 43191;
1276   if(npats < 1 || npats > 2)
1277     mtev_http_response_server_error(ctx, "text/xml");
1278   if(npats == 2) port = atoi(pats[1]);
1279
1280   const char *want_cn = mtev_http_request_querystring(req, "cn");
1281   if(stratcon_remove_noit(pats[0], port, want_cn) >= 0)
1282     mtev_http_response_ok(ctx, "text/xml");
1283   else
1284     mtev_http_response_not_found(ctx, "text/xml");
1285   if(mtev_conf_write_file(NULL) != 0)
1286     mtevL(noit_error, "local config write failed\n");
1287   mtev_conf_mark_changed();
1288   mtev_http_response_end(ctx);
1289   return 0;
1290 }
1291 static int
1292 stratcon_console_conf_noits(mtev_console_closure_t ncct,
1293                             int argc, char **argv,
1294                             mtev_console_state_t *dstate,
1295                             void *closure) {
1296   char *cp, target[128];
1297   unsigned short port = 43191;
1298   int adding = (int)(vpsized_int)closure;
1299   char *cn = NULL;
1300   if(argc != 1 && argc != 2)
1301     return -1;
1302
1303   if(argc == 2) cn = argv[1];
1304
1305   cp = strchr(argv[0], ':');
1306   if(cp) {
1307     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1308     port = atoi(cp+1);
1309   }
1310   else strlcpy(target, argv[0], sizeof(target));
1311   if(adding) {
1312     if(stratcon_add_noit(target, port, cn) >= 0) {
1313       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1314     }
1315     else {
1316       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1317     }
1318   }
1319   else {
1320     if(stratcon_remove_noit(target, port, cn) >= 0) {
1321       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1322     }
1323     else {
1324       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1325     }
1326   }
1327   return 0;
1328 }
1329
1330 static void
1331 register_console_streamer_commands() {
1332   mtev_console_state_t *tl;
1333   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1334
1335   tl = mtev_console_state_initial();
1336   showcmd = mtev_console_state_get_cmd(tl, "show");
1337   mtevAssert(showcmd && showcmd->dstate);
1338   confcmd = mtev_console_state_get_cmd(tl, "configure");
1339   conftcmd = mtev_console_state_get_cmd(confcmd->dstate, "terminal");
1340   conftnocmd = mtev_console_state_get_cmd(conftcmd->dstate, "no");
1341
1342   mtev_console_state_add_cmd(conftcmd->dstate,
1343     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1344   mtev_console_state_add_cmd(conftnocmd->dstate,
1345     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1346
1347   mtev_console_state_add_cmd(showcmd->dstate,
1348     NCSCMD("noit", stratcon_console_show_noits,
1349            stratcon_console_noit_opts, NULL, (void *)1));
1350   mtev_console_state_add_cmd(showcmd->dstate,
1351     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1352 }
1353
1354 int
1355 stratcon_streamer_connection(const char *toplevel, const char *destination,
1356                              const char *type,
1357                              eventer_func_t handler,
1358                              void *(*handler_alloc)(void), void *handler_ctx,
1359                              void (*handler_free)(void *)) {
1360   return mtev_connections_from_config(&noits, &noits_lock,
1361                                       toplevel, destination, type,
1362                                       handler, handler_alloc, handler_ctx,
1363                                       handler_free);
1364 }
1365
1366 mtev_reverse_acl_decision_t
1367 mtev_reverse_socket_allow_noits(const char *id, acceptor_closure_t *ac) {
1368   if(!strncmp(id, "noit/", 5)) return MTEV_ACL_ALLOW;
1369   return MTEV_ACL_ABSTAIN;
1370 }
1371
1372 void
1373 stratcon_jlog_streamer_init(const char *toplevel) {
1374   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1375   struct in_addr remote;
1376   char uuid_str[UUID_STR_LEN + 1];
1377
1378   mtev_reverse_socket_acl(mtev_reverse_socket_allow_noits);
1379   pthread_mutex_init(&noits_lock, NULL);
1380   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1381   eventer_name_callback("stratcon_jlog_recv_handler",
1382                         stratcon_jlog_recv_handler);
1383   register_console_streamer_commands();
1384   stratcon_jlog_streamer_reload(toplevel);
1385   stratcon_streamer_connection(toplevel, "", "noit", NULL, NULL, NULL, NULL);
1386   mtevAssert(mtev_http_rest_register_auth(
1387     "GET", "/noits/", "^show(.json)?$", rest_show_noits,
1388              mtev_http_rest_client_cert_auth
1389   ) == 0);
1390   mtevAssert(mtev_http_rest_register_auth(
1391     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1392              mtev_http_rest_client_cert_auth
1393   ) == 0);
1394   mtevAssert(mtev_http_rest_register_auth(
1395     "PUT", "/noits/", "^set/([^/:]*):(\\d+)$", rest_set_noit,
1396              mtev_http_rest_client_cert_auth
1397   ) == 0);
1398   mtevAssert(mtev_http_rest_register_auth(
1399     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1400              mtev_http_rest_client_cert_auth
1401   ) == 0);
1402   mtevAssert(mtev_http_rest_register_auth(
1403     "DELETE", "/noits/", "^delete/([^/:]*):(\\d+)$", rest_delete_noit,
1404              mtev_http_rest_client_cert_auth
1405   ) == 0);
1406
1407   uuid_clear(self_stratcon_id);
1408
1409   if(mtev_conf_get_stringbuf(NULL, "/stratcon/@id",
1410                              uuid_str, sizeof(uuid_str)) &&
1411      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1412     int period;
1413     mtev_conf_get_boolean(NULL, "/stratcon/@extended_id",
1414                           &stratcon_selfcheck_extended_id);
1415     /* If a UUID was provided for stratcon itself, we will report metrics
1416      * on a large variety of things (including all noits).
1417      */
1418     if(mtev_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1419        period > 0) {
1420       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1421       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1422     }
1423     self_stratcon_ip.sin_family = AF_INET;
1424     remote.s_addr = 0xffffffff;
1425     mtev_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1426     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1427     eventer_add_in(periodic_noit_metrics, NULL, whence);
1428   }
1429 }
Note: See TracBrowser for help on using the browser.