root/src/stratcon_jlog_streamer.c

Revision 2ff4db5a6730270eb30827e23883ed354c42ddf6, 53.9 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 2 weeks ago)

Explicitly Initialize Mtev Hash Tables

Rather than using MTEV_HASH_EMPTY or not calling any initialization at
all, explicitly initialize hash tables.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <unistd.h>
37 #include <errno.h>
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #ifdef HAVE_SYS_FILIO_H
41 #include <sys/filio.h>
42 #endif
43 #include <netinet/in.h>
44 #include <sys/un.h>
45 #include <arpa/inet.h>
46
47 #include <eventer/eventer.h>
48 #include <mtev_conf.h>
49 #include <mtev_hash.h>
50 #include <mtev_log.h>
51 #include <mtev_getip.h>
52 #include <mtev_rest.h>
53 #include <mtev_json.h>
54
55 #include "noit_mtev_bridge.h"
56 #include "stratcon_dtrace_probes.h"
57 #include "noit_jlog_listener.h"
58 #include "stratcon_datastore.h"
59 #include "stratcon_jlog_streamer.h"
60 #include "stratcon_iep.h"
61
/* Held by the functions in this file while they iterate over `noits`. */
pthread_mutex_t noits_lock;
/* Table of noit connections; the iterators in this file treat each value
 * as a mtev_connection_ctx_t pointer. */
mtev_hash_table noits;
/* Held while `noit_ip_by_cn` is read or rebuilt. */
pthread_mutex_t noit_ip_by_cn_lock;
/* Maps an expected certificate CN string to a noit address string; both
 * key and value are heap copies released with free() when the table is
 * flushed by stratcon_jlog_streamer_recache_noit(). */
mtev_hash_table noit_ip_by_cn;
/* UUID formatted with uuid_unparse_lower() into self-generated records. */
static uuid_t self_stratcon_id;
/* Passed as the remote name argument when pushing self-generated records. */
static char self_stratcon_hostname[256] = "\0";
/* Passed as the remote address argument when pushing self-generated
 * records; also rendered with inet_ntop() for the selfcheck identity. */
static struct sockaddr_in self_stratcon_ip;
/* When true, periodic_noit_metrics() prefixes the UUID with the IP string
 * and "`selfcheck`selfcheck`". */
static mtev_boolean stratcon_selfcheck_extended_id = mtev_true;

/* Interval (5 seconds, 0 microseconds) added to the firing time to
 * reschedule periodic_noit_metrics(). */
static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };
72
73 static const char *feed_type_to_str(int jlog_feed_cmd) {
74   switch(jlog_feed_cmd) {
75     case NOIT_JLOG_DATA_FEED: return "durable/storage";
76     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
77   }
78   return "unknown";
79 }
80
/*
 * GET_EXPECTED_CN(nctx, cn)
 *
 * Assign to the lvalue `cn` the value stored under the key "cn" in
 * nctx->config, or NULL when nctx has no config table or the key is
 * absent.  Both arguments are parenthesized at every use, and the
 * temporary uses a macro-specific name so it cannot capture a caller
 * variable passed as `cn`.
 */
#define GET_EXPECTED_CN(nctx, cn) do { \
  void *_expected_cn_v = NULL; \
  (cn) = NULL; \
  if((nctx)->config && \
     mtev_hash_retrieve((nctx)->config, "cn", 2, &_expected_cn_v)) { \
     (cn) = _expected_cn_v; \
  } \
} while(0)
/*
 * GET_FEEDTYPE(nctx, feedtype)
 *
 * Assign to the lvalue `feedtype` a string literal naming the consumer of
 * this connection, chosen by comparing the streamer context's push
 * callback: "storage" for stratcon_datastore_push, "iep" for
 * stratcon_iep_line_processor, otherwise "unknown".
 * nctx->consumer_ctx is dereferenced without a NULL check.  Both
 * arguments are parenthesized at every use.
 */
#define GET_FEEDTYPE(nctx, feedtype) do { \
  jlog_streamer_ctx_t *_jctx = (nctx)->consumer_ctx; \
  (feedtype) = "unknown"; \
  if(_jctx->push == stratcon_datastore_push) \
    (feedtype) = "storage"; \
  else if(_jctx->push == stratcon_iep_line_processor) \
    (feedtype) = "iep"; \
} while(0)
97
98 static int
99 remote_str_sort(const void *a, const void *b) {
100   int rv;
101   mtev_connection_ctx_t * const *actx = a;
102   mtev_connection_ctx_t * const *bctx = b;
103   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
104   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
105   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
106   if(rv) return rv;
107   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
108            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
109 }
110 static void
111 nc_print_noit_conn_brief(mtev_console_closure_t ncct,
112                           mtev_connection_ctx_t *ctx) {
113   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
114   struct timeval now, diff, session_duration;
115   char cmdbuf[4096];
116   const char *feedtype = "unknown";
117   const char *lasttime = "never";
118   const char *config_cn = NULL;
119   void *vcn;
120   if(ctx->last_connect.tv_sec != 0) {
121     time_t r = ctx->last_connect.tv_sec;
122     struct tm tbuf, *tm;
123     tm = gmtime_r(&r, &tbuf);
124     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
125     lasttime = cmdbuf;
126   }
127   nc_printf(ncct, "%s [%s]:\n", ctx->remote_str,
128             ctx->remote_cn ? "connected" :
129                              (ctx->retry_event ? "disconnected" :
130                                                    "connecting"));
131   if(ctx->config &&
132      mtev_hash_retrieve(ctx->config, "cn", strlen("cn"), &vcn)) {
133      config_cn = vcn;
134   }
135   if(config_cn) nc_printf(ncct, "\tcn: %s\n", config_cn);
136   nc_printf(ncct, "\tLast connect: %s\n", lasttime);
137   if(ctx->e) {
138     char buff[128];
139     const char *addrstr = NULL;
140     struct sockaddr_in6 addr6;
141     socklen_t len = sizeof(addr6);
142     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
143       unsigned short port = 0;
144       if(addr6.sin6_family == AF_INET) {
145         addrstr = inet_ntop(addr6.sin6_family,
146                             &((struct sockaddr_in *)&addr6)->sin_addr,
147                             buff, sizeof(buff));
148         memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
149         port = ntohs(port);
150       }
151       else if(addr6.sin6_family == AF_INET6) {
152         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
153                             buff, sizeof(buff));
154         port = ntohs(addr6.sin6_port);
155       }
156       if(addrstr != NULL)
157         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
158       else
159         nc_printf(ncct, "\tLocal address not interpretable\n");
160     }
161     else {
162       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
163                 ctx->e->fd, strerror(errno));
164     }
165   }
166   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
167   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
168   gettimeofday(&now, NULL);
169   if(ctx->timeout_event) {
170     sub_timeval(ctx->timeout_event->whence, now, &diff);
171     nc_printf(ncct, "\tTimeout scheduled for %lld.%06us\n",
172               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
173   }
174   if(ctx->retry_event) {
175     sub_timeval(ctx->retry_event->whence, now, &diff);
176     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
177               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
178   }
179   else if(ctx->remote_cn) {
180     nc_printf(ncct, "\tRemote CN: '%s'\n",
181               ctx->remote_cn ? ctx->remote_cn : "???");
182     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
183       struct timeval last;
184       double session_duration_seconds;
185       const char *state = "unknown";
186
187       switch(jctx->state) {
188         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
189         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
190         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
191         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
192         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
193         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
194         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
195       }
196       last.tv_sec = jctx->header.tv_sec;
197       last.tv_usec = jctx->header.tv_usec;
198       sub_timeval(now, last, &diff);
199       sub_timeval(now, ctx->last_connect, &session_duration);
200       session_duration_seconds = session_duration.tv_sec +
201                                  (double)session_duration.tv_usec/1000000.0;
202       nc_printf(ncct, "\tState: %s\n"
203                       "\tNext checkpoint: [%08x:%08x]\n"
204                       "\tLast event: %lld.%06us ago\n"
205                       "\tEvents this session: %llu (%0.2f/s)\n"
206                       "\tOctets this session: %llu (%0.2f/s)\n",
207                 state,
208                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
209                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
210                 jctx->total_events,
211                 (double)jctx->total_events/session_duration_seconds,
212                 jctx->total_bytes_read,
213                 (double)jctx->total_bytes_read/session_duration_seconds);
214     }
215     else {
216       nc_printf(ncct, "\tUnknown type.\n");
217     }
218   }
219 }
220
221 jlog_streamer_ctx_t *
222 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
223   jlog_streamer_ctx_t *ctx;
224   ctx = stratcon_jlog_streamer_ctx_alloc();
225   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
226   ctx->push = stratcon_datastore_push;
227   return ctx;
228 }
229 jlog_streamer_ctx_t *
230 stratcon_jlog_streamer_ctx_alloc(void) {
231   jlog_streamer_ctx_t *ctx;
232   ctx = calloc(1, sizeof(*ctx));
233   return ctx;
234 }
235
236 void
237 jlog_streamer_ctx_free(void *cl) {
238   jlog_streamer_ctx_t *ctx = cl;
239   if(ctx->buffer) free(ctx->buffer);
240   free(ctx);
241 }
242
243 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
244 static int
245 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
246   int len, mask;
247   while(ctx->bytes_read < ctx->bytes_expected) {
248     len = Eread(ctx->buffer + ctx->bytes_read,
249                 ctx->bytes_expected - ctx->bytes_read);
250     if(len < 0) {
251       *newmask = mask;
252       return -1;
253     }
254     /* if we get 0 inside SSL, and there was a real error, we
255      * will actually get a -1 here.
256      * if(len == 0) return ctx->bytes_read;
257      */
258     ctx->total_bytes_read += len;
259     ctx->bytes_read += len;
260   }
261   mtevAssert(ctx->bytes_read == ctx->bytes_expected);
262   return ctx->bytes_read;
263 }
/*
 * FULLREAD(e, ctx, size)
 *
 * Read exactly `size` bytes from event `e` into a freshly allocated,
 * NUL-terminated ctx->buffer, resuming a partially completed read when
 * ctx->bytes_expected is already non-zero.
 *
 * The macro is only usable inside a function that has `nctx` in scope
 * (used for log messages) and a `socket_error` label.  Control flow:
 *   - on EAGAIN it RETURNS from the enclosing function with the mask
 *     reported by the read, OR'd with EVENTER_EXCEPTION;
 *   - on allocation failure, read error or short read it jumps to
 *     socket_error;
 *   - otherwise it falls through with the data in ctx->buffer and the
 *     read counters reset to zero.
 * The macro declares its own `mask` and `len`, which shadow any variables
 * of the same name in the caller.  `size` is evaluated several times and
 * is parenthesized at each use so that expression arguments such as
 * `0 - ctx->count` are cast and compared as a whole.
 */
#define FULLREAD(e,ctx,size) do { \
  int mask, len; \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = (size); \
    if((ctx)->buffer) free((ctx)->buffer); \
    (ctx)->buffer = malloc((size) + 1); \
    if((ctx)->buffer == NULL) { \
      mtevL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)(size) + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[(size)] = '\0'; \
  } \
  len = __read_on_ctx((e), (ctx), &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    mtevL(noit_error, "[%s] [%s] SSL read error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)", \
          nctx->remote_cn ? nctx->remote_cn : "(null)", \
          strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if(len != (size)) { \
    mtevL(noit_error, "[%s] [%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          nctx->remote_str ? nctx->remote_str : "(null)", \
          nctx->remote_cn ? nctx->remote_cn : "(null)", \
          (ctx)->state, len, (long unsigned int)(size)); \
    goto socket_error; \
  } \
} while(0)
294
295 int
296 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
297                            struct timeval *now) {
298   mtev_connection_ctx_t *nctx = closure;
299   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
300   jlog_streamer_ctx_t dummy;
301   int len;
302   jlog_id n_chkpt;
303   const char *cn_expected, *feedtype;
304   GET_EXPECTED_CN(nctx, cn_expected);
305   GET_FEEDTYPE(nctx, feedtype);
306
307   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
308     if(write(e->fd, e, 0) == -1)
309       mtevL(noit_error, "[%s] [%s] socket error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)",
310             nctx->remote_cn ? nctx->remote_cn : "(null)", strerror(errno));
311  socket_error:
312     ctx->state = JLOG_STREAMER_WANT_INITIATE;
313     ctx->count = 0;
314     ctx->needs_chkpt = 0;
315     ctx->bytes_read = 0;
316     ctx->bytes_expected = 0;
317     if(ctx->buffer) free(ctx->buffer);
318     ctx->buffer = NULL;
319     nctx->schedule_reattempt(nctx, now);
320     nctx->close(nctx, e);
321     return 0;
322   }
323
324   mtev_connection_update_timeout(nctx);
325   while(1) {
326     switch(ctx->state) {
327       case JLOG_STREAMER_WANT_INITIATE:
328         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
329                               sizeof(ctx->jlog_feed_cmd),
330                               &mask, e);
331         if(len < 0) {
332           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
333           mtevL(noit_error, "[%s] [%s] initiating stream failed -> %d/%s.\n",
334                 nctx->remote_str ? nctx->remote_str : "(null)", nctx->remote_cn ? nctx->remote_cn : "(null)", errno, strerror(errno));
335           goto socket_error;
336         }
337         if(len != sizeof(ctx->jlog_feed_cmd)) {
338           mtevL(noit_error, "[%s] [%s] short write [%d/%d] on initiating stream.\n",
339                 nctx->remote_str ? nctx->remote_str : "(null)", nctx->remote_cn ? nctx->remote_cn : "(null)",
340                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
341           goto socket_error;
342         }
343         ctx->state = JLOG_STREAMER_WANT_COUNT;
344         break;
345
346       case JLOG_STREAMER_WANT_ERROR:
347         FULLREAD(e, ctx, 0 - ctx->count);
348         mtevL(noit_error, "[%s] [%s] %.*s\n", nctx->remote_str ? nctx->remote_str : "(null)",
349               nctx->remote_cn ? nctx->remote_cn : "(null)", 0 - ctx->count, ctx->buffer);
350         free(ctx->buffer); ctx->buffer = NULL;
351         goto socket_error;
352         break;
353
354       case JLOG_STREAMER_WANT_COUNT:
355         FULLREAD(e, ctx, sizeof(u_int32_t));
356         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
357         ctx->count = ntohl(dummy.count);
358         ctx->needs_chkpt = 0;
359         free(ctx->buffer); ctx->buffer = NULL;
360         STRATCON_STREAM_COUNT(e->fd, (char *)feedtype,
361                                    nctx->remote_str, (char *)cn_expected,
362                                    ctx->count);
363         if(ctx->count < 0)
364           ctx->state = JLOG_STREAMER_WANT_ERROR;
365         else
366           ctx->state = JLOG_STREAMER_WANT_HEADER;
367         break;
368
369       case JLOG_STREAMER_WANT_HEADER:
370         if(ctx->count == 0) {
371           ctx->state = JLOG_STREAMER_WANT_COUNT;
372           break;
373         }
374         FULLREAD(e, ctx, sizeof(ctx->header));
375         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
376         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
377         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
378         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
379         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
380         ctx->header.message_len = ntohl(dummy.header.message_len);
381         STRATCON_STREAM_HEADER(e->fd, (char *)feedtype,
382                                     nctx->remote_str, (char *)cn_expected,
383                                     ctx->header.chkpt.log, ctx->header.chkpt.marker,
384                                     ctx->header.tv_sec, ctx->header.tv_usec,
385                                     ctx->header.message_len);
386         free(ctx->buffer); ctx->buffer = NULL;
387         ctx->state = JLOG_STREAMER_WANT_BODY;
388         break;
389
390       case JLOG_STREAMER_WANT_BODY:
391         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
392         STRATCON_STREAM_BODY(e->fd, (char *)feedtype,
393                                   nctx->remote_str, (char *)cn_expected,
394                                   ctx->header.chkpt.log, ctx->header.chkpt.marker,
395                                   ctx->header.tv_sec, ctx->header.tv_usec,
396                                   ctx->buffer);
397         if(ctx->header.message_len > 0) {
398           ctx->needs_chkpt = 1;
399           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
400                     ctx->buffer, NULL);
401         }
402         else if(ctx->buffer)
403           free(ctx->buffer);
404         /* Don't free the buffer, it's used by the datastore process. */
405         ctx->buffer = NULL;
406         ctx->count--;
407         ctx->total_events++;
408         if(ctx->count == 0 && ctx->needs_chkpt) {
409           eventer_t completion_e;
410           eventer_remove_fd(e->fd);
411           completion_e = eventer_alloc();
412           memcpy(completion_e, e, sizeof(*e));
413           nctx->e = completion_e;
414           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
415           ctx->state = JLOG_STREAMER_IS_ASYNC;
416           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
417                     NULL, completion_e);
418           mtevL(noit_debug, "Pushing %s batch async [%s] [%s]: [%u/%u]\n",
419                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
420                 nctx->remote_str ? nctx->remote_str : "(null)",
421                 nctx->remote_cn ? nctx->remote_cn : "(null)",
422                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
423           mtev_connection_disable_timeout(nctx);
424           return 0;
425         }
426         else if(ctx->count == 0)
427           ctx->state = JLOG_STREAMER_WANT_CHKPT;
428         else
429           ctx->state = JLOG_STREAMER_WANT_HEADER;
430         break;
431
432       case JLOG_STREAMER_IS_ASYNC:
433         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
434       case JLOG_STREAMER_WANT_CHKPT:
435         mtevL(noit_debug, "Pushing %s checkpoint [%s] [%s]: [%u/%u]\n",
436               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
437               nctx->remote_str ? nctx->remote_str : "(null)",
438               nctx->remote_cn ? nctx->remote_cn : "(null)",
439               ctx->header.chkpt.log, ctx->header.chkpt.marker);
440         n_chkpt.log = htonl(ctx->header.chkpt.log);
441         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
442
443         /* screw short writes.  I'd rather die than not write my data! */
444         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
445                               &mask, e);
446         if(len < 0) {
447           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
448           goto socket_error;
449         }
450         if(len != sizeof(jlog_id)) {
451           mtevL(noit_error, "[%s] [%s] short write on checkpointing stream.\n",
452             nctx->remote_str ? nctx->remote_str : "(null)",
453             nctx->remote_cn ? nctx->remote_cn : "(null)");
454           goto socket_error;
455         }
456         STRATCON_STREAM_CHECKPOINT(e->fd, (char *)feedtype,
457                                         nctx->remote_str, (char *)cn_expected,
458                                         ctx->header.chkpt.log, ctx->header.chkpt.marker);
459         ctx->state = JLOG_STREAMER_WANT_COUNT;
460         break;
461     }
462   }
463   /* never get here */
464 }
465
466
467 int
468 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
469   int rv = -1;
470   void *vip;
471   pthread_mutex_lock(&noit_ip_by_cn_lock);
472   if(mtev_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
473     int new_len;
474     char *new_ip = (char *)vip;
475     new_len = strlen(new_ip);
476     strlcpy(ip, new_ip, len);
477     if(new_len >= len) rv = new_len+1;
478     else rv = 0;
479   }
480   pthread_mutex_unlock(&noit_ip_by_cn_lock);
481   return rv;
482 }
483 void
484 stratcon_jlog_streamer_recache_noit() {
485   int di, cnt;
486   mtev_conf_section_t *noit_configs;
487   noit_configs = mtev_conf_get_sections(NULL, "//noits//noit", &cnt);
488   pthread_mutex_lock(&noit_ip_by_cn_lock);
489   mtev_hash_delete_all(&noit_ip_by_cn, free, free);
490   for(di=0; di<cnt; di++) {
491     char address[64];
492     if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
493                                  address, sizeof(address))) {
494       char expected_cn[256];
495       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
496                                  expected_cn, sizeof(expected_cn)))
497         mtev_hash_store(&noit_ip_by_cn,
498                         strdup(expected_cn), strlen(expected_cn),
499                         strdup(address));
500     }
501   }
502   free(noit_configs);
503   pthread_mutex_unlock(&noit_ip_by_cn_lock);
504 }
505 void
506 stratcon_jlog_streamer_reload(const char *toplevel) {
507   /* flush and repopulate the cn cache */
508   stratcon_jlog_streamer_recache_noit();
509   if(!stratcon_datastore_get_enabled()) return;
510   stratcon_streamer_connection(toplevel, NULL, "noit",
511                                stratcon_jlog_recv_handler,
512                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
513                                NULL,
514                                jlog_streamer_ctx_free);
515 }
516
517 char *
518 stratcon_console_noit_opts(mtev_console_closure_t ncct,
519                            mtev_console_state_stack_t *stack,
520                            mtev_console_state_t *dstate,
521                            int argc, char **argv, int idx) {
522   if(argc == 1) {
523     mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
524     const char *key_id;
525     int klen, i = 0;
526     void *vconn, *vcn;
527     mtev_connection_ctx_t *ctx;
528     mtev_hash_table dedup;
529
530     mtev_hash_init(&dedup);
531
532     pthread_mutex_lock(&noits_lock);
533     while(mtev_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
534       ctx = (mtev_connection_ctx_t *)vconn;
535       vcn = NULL;
536       if(ctx->config && mtev_hash_retrieve(ctx->config, "cn", 2, &vcn) &&
537          !mtev_hash_store(&dedup, vcn, strlen(vcn), NULL)) {
538         if(!strncmp(vcn, argv[0], strlen(argv[0]))) {
539           if(idx == i) {
540             pthread_mutex_unlock(&noits_lock);
541             mtev_hash_destroy(&dedup, NULL, NULL);
542             return strdup(vcn);
543           }
544           i++;
545         }
546       }
547       if(ctx->remote_str &&
548          !mtev_hash_store(&dedup, ctx->remote_str, strlen(ctx->remote_str), NULL)) {
549         if(!strncmp(ctx->remote_str, argv[0], strlen(argv[0]))) {
550           if(idx == i) {
551             pthread_mutex_unlock(&noits_lock);
552             mtev_hash_destroy(&dedup, NULL, NULL);
553             return strdup(ctx->remote_str);
554           }
555           i++;
556         }
557       }
558     }
559     pthread_mutex_unlock(&noits_lock);
560     mtev_hash_destroy(&dedup, NULL, NULL);
561   }
562   if(argc == 2)
563     return mtev_console_opt_delegate(ncct, stack, dstate, argc-1, argv+1, idx);
564   return NULL;
565 }
566 static int
567 stratcon_console_show_noits(mtev_console_closure_t ncct,
568                             int argc, char **argv,
569                             mtev_console_state_t *dstate,
570                             void *closure) {
571   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
572   const char *key_id, *ecn;
573   int klen, n = 0, i;
574   void *vconn;
575   mtev_connection_ctx_t **ctx;
576
577   if(closure != (void *)0 && argc == 0) {
578     nc_printf(ncct, "takes an argument\n");
579     return 0;
580   }
581   if(closure == (void *)0 && argc > 0) {
582     nc_printf(ncct, "takes no arguments\n");
583     return 0;
584   }
585   pthread_mutex_lock(&noits_lock);
586   ctx = malloc(sizeof(*ctx) * mtev_hash_size(&noits));
587   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
588                        &vconn)) {
589     ctx[n] = (mtev_connection_ctx_t *)vconn;
590     if(argc == 0 ||
591        !strcmp(ctx[n]->remote_str, argv[0]) ||
592        (ctx[n]->config && mtev_hash_retr_str(ctx[n]->config, "cn", 2, &ecn) &&
593         !strcmp(ecn, argv[0]))) {
594       mtev_connection_ctx_ref(ctx[n]);
595       n++;
596     }
597   }
598   pthread_mutex_unlock(&noits_lock);
599   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
600   for(i=0; i<n; i++) {
601     nc_print_noit_conn_brief(ncct, ctx[i]);
602     mtev_connection_ctx_deref(ctx[i]);
603   }
604   free(ctx);
605   return 0;
606 }
607
608 static void
609 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
610                        mtev_connection_ctx_t *nctx) {
611   struct timeval last, session_duration, diff;
612   u_int64_t session_duration_ms, last_event_ms;
613   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
614   char str[1024], *wr;
615   int len;
616   const char *cn_expected;
617   const char *feedtype = "unknown";
618
619   GET_FEEDTYPE(nctx, feedtype);
620   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
621
622   GET_EXPECTED_CN(nctx, cn_expected);
623   if(!cn_expected) return;
624
625   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
626            (long unsigned int)now->tv_sec,
627            (long unsigned int)now->tv_usec/1000UL,
628            uuid_str, cn_expected, feedtype);
629   wr = str + strlen(str);
630   len = sizeof(str) - (wr - str);
631
632   /* Now we write NAME TYPE VALUE into wr each time and push it */
633 #define push_noit_m_str(name, value) do { \
634   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
635   stratcon_datastore_push(DS_OP_INSERT, \
636                           (struct sockaddr *)&self_stratcon_ip, \
637                           self_stratcon_hostname, strdup(str), NULL); \
638   stratcon_iep_line_processor(DS_OP_INSERT, \
639                               (struct sockaddr *)&self_stratcon_ip, \
640                               self_stratcon_hostname, strdup(str), NULL); \
641 } while(0)
642 #define push_noit_m_u64(name, value) do { \
643   snprintf(wr, len, "%s\tL\t%llu\n", name, (long long unsigned int)value); \
644   stratcon_datastore_push(DS_OP_INSERT, \
645                           (struct sockaddr *)&self_stratcon_ip, \
646                           self_stratcon_hostname, strdup(str), NULL); \
647   stratcon_iep_line_processor(DS_OP_INSERT, \
648                               (struct sockaddr *)&self_stratcon_ip, \
649                               self_stratcon_hostname, strdup(str), NULL); \
650 } while(0)
651
652   last.tv_sec = jctx->header.tv_sec;
653   last.tv_usec = jctx->header.tv_usec;
654   sub_timeval(*now, last, &diff);
655   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
656   sub_timeval(*now, nctx->last_connect, &session_duration);
657   session_duration_ms = session_duration.tv_sec * 1000 +
658                         session_duration.tv_usec / 1000;
659
660   push_noit_m_str("state", nctx->remote_cn ? "connected" :
661                              (nctx->retry_event ? "disconnected" :
662                                                   "connecting"));
663   push_noit_m_u64("last_event_age_ms", last_event_ms);
664   push_noit_m_u64("session_length_ms", session_duration_ms);
665 }
666 static int
667 periodic_noit_metrics(eventer_t e, int mask, void *closure,
668                       struct timeval *now) {
669   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
670   mtev_connection_ctx_t **ctxs;
671   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
672   const char *key_id;
673   void *vconn;
674   int klen, n = 0, i;
675   char str[1024];
676   char uuid_str[1024], tmp_uuid_str[UUID_STR_LEN+1];
677   struct timeval epoch, diff;
678   u_int64_t uptime = 0;
679   char ip_str[128];
680
681   inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
682             sizeof(ip_str));
683
684   uuid_str[0] = '\0';
685   uuid_unparse_lower(self_stratcon_id, tmp_uuid_str);
686   if(stratcon_selfcheck_extended_id) {
687     strlcat(uuid_str, ip_str, sizeof(uuid_str)-37);
688     strlcat(uuid_str, "`selfcheck`selfcheck`", sizeof(uuid_str)-37);
689   }
690   strlcat(uuid_str, tmp_uuid_str, sizeof(uuid_str));
691
692 #define PUSH_BOTH(type, str) do { \
693   stratcon_datastore_push(type, \
694                           (struct sockaddr *)&self_stratcon_ip, \
695                           self_stratcon_hostname, str, NULL); \
696   stratcon_iep_line_processor(type, \
697                               (struct sockaddr *)&self_stratcon_ip, \
698                               self_stratcon_hostname, str, NULL); \
699 } while(0)
700
701   if(closure == NULL) {
702     /* Only do this the first time we get called */
703     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
704              (long unsigned int)now->tv_sec,
705              (long unsigned int)now->tv_usec/1000UL, uuid_str, ip_str,
706              self_stratcon_hostname);
707     PUSH_BOTH(DS_OP_INSERT, strdup(str));
708   }
709
710   pthread_mutex_lock(&noits_lock);
711   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
712   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
713                        &vconn)) {
714     ctxs[n] = (mtev_connection_ctx_t *)vconn;
715     mtev_connection_ctx_ref(ctxs[n]);
716     n++;
717   }
718   pthread_mutex_unlock(&noits_lock);
719
720   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok %d noits\n",
721            (long unsigned int)now->tv_sec,
722            (long unsigned int)now->tv_usec/1000UL, uuid_str, n);
723   PUSH_BOTH(DS_OP_INSERT, strdup(str));
724
725   if(eventer_get_epoch(&epoch) != 0)
726     memcpy(&epoch, now, sizeof(epoch));
727   sub_timeval(*now, epoch, &diff);
728   uptime = diff.tv_sec;
729   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\tuptime\tL\t%llu\n",
730            (long unsigned int)now->tv_sec,
731            (long unsigned int)now->tv_usec/1000UL,
732            uuid_str, (long long unsigned int)uptime);
733   PUSH_BOTH(DS_OP_INSERT, strdup(str));
734
735   for(i=0; i<n; i++) {
736     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
737     mtev_connection_ctx_deref(ctxs[i]);
738   }
739   free(ctxs);
740   PUSH_BOTH(DS_OP_CHKPT, NULL);
741
742   add_timeval(e->whence, whence, &whence);
743   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
744   return 0;
745 }
746
747 static int
748 rest_show_noits_json(mtev_http_rest_closure_t *restc,
749                      int npats, char **pats) {
750   const char *jsonstr;
751   struct json_object *doc, *nodes, *node;
752   mtev_hash_table seen;
753   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
754   char path[256];
755   const char *key_id;
756   const char *type = NULL, *want_cn = NULL;
757   int klen, n = 0, i, di, cnt;
758   void *vconn;
759   mtev_connection_ctx_t **ctxs;
760   mtev_conf_section_t *noit_configs;
761   struct timeval now, diff, last;
762   mtev_http_request *req = mtev_http_session_request(restc->http_ctx);
763
764   mtev_hash_init(&seen);
765
766   mtev_http_process_querystring(req);
767   type = mtev_http_request_querystring(req, "type");
768   want_cn = mtev_http_request_querystring(req, "cn");
769
770   gettimeofday(&now, NULL);
771
772   pthread_mutex_lock(&noits_lock);
773   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
774   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
775                        &vconn)) {
776     ctxs[n] = (mtev_connection_ctx_t *)vconn;
777     mtev_connection_ctx_ref(ctxs[n]);
778     n++;
779   }
780   pthread_mutex_unlock(&noits_lock);
781   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
782
783   doc = json_object_new_object();
784   nodes = json_object_new_array();
785   json_object_object_add(doc, "nodes", nodes);
786  
787   for(i=0; i<n; i++) {
788     char buff[256];
789     const char *feedtype = "unknown", *state = "unknown";
790     mtev_connection_ctx_t *ctx = ctxs[i];
791     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
792
793     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
794
795     /* If the user requested a specific type and we're not it, skip. */
796     if(type && strcmp(feedtype, type)) {
797         mtev_connection_ctx_deref(ctx);
798         continue;
799     }
800     /* If the user wants a specific CN... limit to that. */
801     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn))) {
802         mtev_connection_ctx_deref(ctx);
803         continue;
804     }
805
806     node = json_object_new_object();
807     snprintf(buff, sizeof(buff), "%llu.%06d",
808              (long long unsigned)ctx->last_connect.tv_sec,
809              (int)ctx->last_connect.tv_usec);
810     json_object_object_add(node, "last_connect", json_object_new_string(buff));
811     json_object_object_add(node, "state",
812          json_object_new_string(ctx->remote_cn ?
813                                   "connected" :
814                                   (ctx->retry_event ? "disconnected" :
815                                                       "connecting")));
816     if(ctx->e) {
817       char buff[128];
818       const char *addrstr = NULL;
819       struct sockaddr_in6 addr6;
820       socklen_t len = sizeof(addr6);
821       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
822         unsigned short port = 0;
823         if(addr6.sin6_family == AF_INET) {
824           addrstr = inet_ntop(addr6.sin6_family,
825                               &((struct sockaddr_in *)&addr6)->sin_addr,
826                               buff, sizeof(buff));
827           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
828           port = ntohs(port);
829         }
830         else if(addr6.sin6_family == AF_INET6) {
831           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
832                               buff, sizeof(buff));
833           port = ntohs(addr6.sin6_port);
834         }
835         if(addrstr != NULL) {
836           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
837                    ":%u", port);
838           json_object_object_add(node, "local", json_object_new_string(buff));
839         }
840       }
841     }
842     mtev_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
843                       0, free, NULL);
844     json_object_object_add(node, "remote", json_object_new_string(ctx->remote_str));
845     json_object_object_add(node, "type", json_object_new_string(feedtype));
846     if(ctx->retry_event) {
847       sub_timeval(ctx->retry_event->whence, now, &diff);
848       snprintf(buff, sizeof(buff), "%llu.%06d",
849                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
850       json_object_object_add(node, "next_attempt", json_object_new_string(buff));
851     }
852     else if(ctx->remote_cn) {
853       if(ctx->remote_cn)
854         json_object_object_add(node, "remote_cn", json_object_new_string(ctx->remote_cn));
855  
856       switch(jctx->state) {
857         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
858         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
859         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
860         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
861         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
862         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
863         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
864       }
865       json_object_object_add(node, "state", json_object_new_string(state));
866       snprintf(buff, sizeof(buff), "%08x:%08x",
867                jctx->header.chkpt.log, jctx->header.chkpt.marker);
868       json_object_object_add(node, "checkpoint", json_object_new_string(buff));
869       snprintf(buff, sizeof(buff), "%llu",
870                (unsigned long long)jctx->total_events);
871       json_object_object_add(node, "session_events", json_object_new_string(buff));
872       snprintf(buff, sizeof(buff), "%llu",
873                (unsigned long long)jctx->total_bytes_read);
874       json_object_object_add(node, "session_bytes", json_object_new_string(buff));
875  
876       sub_timeval(now, ctx->last_connect, &diff);
877       snprintf(buff, sizeof(buff), "%lld.%06d",
878                (long long)diff.tv_sec, (int)diff.tv_usec);
879       json_object_object_add(node, "session_duration", json_object_new_string(buff));
880  
881       if(jctx->header.tv_sec) {
882         last.tv_sec = jctx->header.tv_sec;
883         last.tv_usec = jctx->header.tv_usec;
884         snprintf(buff, sizeof(buff), "%llu.%06d",
885                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
886         json_object_object_add(node, "last_event", json_object_new_string(buff));
887         sub_timeval(now, last, &diff);
888         snprintf(buff, sizeof(buff), "%lld.%06d",
889                  (long long)diff.tv_sec, (int)diff.tv_usec);
890         json_object_object_add(node, "last_event_age", json_object_new_string(buff));
891       }
892     }
893     json_object_array_add(nodes, node);
894     mtev_connection_ctx_deref(ctx);
895   }
896   free(ctxs);
897
898   if(!type || !strcmp(type, "configured")) {
899     snprintf(path, sizeof(path), "//noits//noit");
900     noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
901     for(di=0; di<cnt; di++) {
902       char address[64], port_str[32], remote_str[98];
903       char expected_cn_buff[256], *expected_cn = NULL;
904       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
905                                  expected_cn_buff, sizeof(expected_cn_buff)))
906         expected_cn = expected_cn_buff;
907       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
908       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
909                                  address, sizeof(address))) {
910         void *v;
911         if(!mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
912                                    port_str, sizeof(port_str)))
913           strlcpy(port_str, "43191", sizeof(port_str));
914
915         /* If the user wants a specific CN... limit to that. */
916           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
917             continue;
918
919         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
920         if(!mtev_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
921           node = json_object_new_object();
922           json_object_object_add(node, "remote", json_object_new_string(remote_str));
923           json_object_object_add(node, "type", json_object_new_string("configured"));
924           if(expected_cn)
925             json_object_object_add(node, "cn", json_object_new_string(expected_cn));
926           json_object_array_add(nodes, node);
927         }
928       }
929     }
930     free(noit_configs);
931   }
932   mtev_hash_destroy(&seen, free, NULL);
933
934   mtev_http_response_ok(restc->http_ctx, "application/json");
935   jsonstr = json_object_to_json_string(doc);
936   mtev_http_response_append(restc->http_ctx, jsonstr, strlen(jsonstr));
937   mtev_http_response_append(restc->http_ctx, "\n", 1);
938   json_object_put(doc);
939   mtev_http_response_end(restc->http_ctx);
940   return 0;
941 }
942 static int
943 rest_show_noits(mtev_http_rest_closure_t *restc,
944                 int npats, char **pats) {
945   xmlDocPtr doc;
946   xmlNodePtr root;
947   mtev_hash_table *hdrs, seen;
948   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
949   char path[256];
950   const char *key_id, *accepthdr;
951   const char *type = NULL, *want_cn = NULL;
952   int klen, n = 0, i, di, cnt;
953   void *vconn;
954   mtev_connection_ctx_t **ctxs;
955   mtev_conf_section_t *noit_configs;
956   struct timeval now, diff, last;
957   xmlNodePtr node;
958   mtev_http_request *req = mtev_http_session_request(restc->http_ctx);
959
960   mtev_hash_init(&seen);
961
962   if(npats == 1 && !strcmp(pats[0], ".json"))
963     return rest_show_noits_json(restc, npats, pats);
964
965   hdrs = mtev_http_request_headers_table(req);
966   if(mtev_hash_retr_str(hdrs, "accept", strlen("accept"), &accepthdr)) {
967     char buf[256], *brkt, *part;
968     strlcpy(buf, accepthdr, sizeof(buf));
969     for(part = strtok_r(buf, ",", &brkt);
970         part;
971         part = strtok_r(NULL, ",", &brkt)) {
972       while(*part && isspace(*part)) part++;
973       if(!strcmp(part, "application/json")) {
974         return rest_show_noits_json(restc, npats, pats);
975       }
976     }
977   }
978
979   mtev_http_process_querystring(req);
980   type = mtev_http_request_querystring(req, "type");
981   want_cn = mtev_http_request_querystring(req, "cn");
982
983   gettimeofday(&now, NULL);
984
985   pthread_mutex_lock(&noits_lock);
986   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
987   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
988                        &vconn)) {
989     ctxs[n] = (mtev_connection_ctx_t *)vconn;
990     mtev_connection_ctx_ref(ctxs[n]);
991     n++;
992   }
993   pthread_mutex_unlock(&noits_lock);
994   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
995
996   doc = xmlNewDoc((xmlChar *)"1.0");
997   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
998   xmlDocSetRootElement(doc, root);
999
1000   for(i=0; i<n; i++) {
1001     char buff[256];
1002     const char *feedtype = "unknown", *state = "unknown";
1003     mtev_connection_ctx_t *ctx = ctxs[i];
1004     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
1005
1006     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
1007
1008     /* If the user requested a specific type and we're not it, skip. */
1009     if(type && strcmp(feedtype, type)) {
1010         mtev_connection_ctx_deref(ctx);
1011         continue;
1012     }
1013     /* If the user wants a specific CN... limit to that. */
1014     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn))) {
1015         mtev_connection_ctx_deref(ctx);
1016         continue;
1017     }
1018
1019     node = xmlNewNode(NULL, (xmlChar *)"noit");
1020     snprintf(buff, sizeof(buff), "%llu.%06d",
1021              (long long unsigned)ctx->last_connect.tv_sec,
1022              (int)ctx->last_connect.tv_usec);
1023     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
1024     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
1025                (xmlChar *)"connected" :
1026                (ctx->retry_event ? (xmlChar *)"disconnected" :
1027                                     (xmlChar *)"connecting"));
1028     if(ctx->e) {
1029       char buff[128];
1030       const char *addrstr = NULL;
1031       struct sockaddr_in6 addr6;
1032       socklen_t len = sizeof(addr6);
1033       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
1034         unsigned short port = 0;
1035         if(addr6.sin6_family == AF_INET) {
1036           addrstr = inet_ntop(addr6.sin6_family,
1037                               &((struct sockaddr_in *)&addr6)->sin_addr,
1038                               buff, sizeof(buff));
1039           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
1040           port = ntohs(port);
1041         }
1042         else if(addr6.sin6_family == AF_INET6) {
1043           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
1044                               buff, sizeof(buff));
1045           port = ntohs(addr6.sin6_port);
1046         }
1047         if(addrstr != NULL) {
1048           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
1049                    ":%u", port);
1050           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
1051         }
1052       }
1053     }
1054     mtev_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
1055                       0, free, NULL);
1056     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
1057     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
1058     if(ctx->retry_event) {
1059       sub_timeval(ctx->retry_event->whence, now, &diff);
1060       snprintf(buff, sizeof(buff), "%llu.%06d",
1061                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
1062       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
1063     }
1064     else if(ctx->remote_cn) {
1065       if(ctx->remote_cn)
1066         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
1067  
1068       switch(jctx->state) {
1069         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
1070         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
1071         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
1072         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
1073         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
1074         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
1075         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
1076       }
1077       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
1078       snprintf(buff, sizeof(buff), "%08x:%08x",
1079                jctx->header.chkpt.log, jctx->header.chkpt.marker);
1080       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
1081       snprintf(buff, sizeof(buff), "%llu",
1082                (unsigned long long)jctx->total_events);
1083       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
1084       snprintf(buff, sizeof(buff), "%llu",
1085                (unsigned long long)jctx->total_bytes_read);
1086       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
1087  
1088       sub_timeval(now, ctx->last_connect, &diff);
1089       snprintf(buff, sizeof(buff), "%lld.%06d",
1090                (long long)diff.tv_sec, (int)diff.tv_usec);
1091       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
1092  
1093       if(jctx->header.tv_sec) {
1094         last.tv_sec = jctx->header.tv_sec;
1095         last.tv_usec = jctx->header.tv_usec;
1096         snprintf(buff, sizeof(buff), "%llu.%06d",
1097                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
1098         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
1099         sub_timeval(now, last, &diff);
1100         snprintf(buff, sizeof(buff), "%lld.%06d",
1101                  (long long)diff.tv_sec, (int)diff.tv_usec);
1102         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
1103       }
1104     }
1105
1106     xmlAddChild(root, node);
1107     mtev_connection_ctx_deref(ctx);
1108   }
1109   free(ctxs);
1110
1111   if(!type || !strcmp(type, "configured")) {
1112     snprintf(path, sizeof(path), "//noits//noit");
1113     noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
1114     for(di=0; di<cnt; di++) {
1115       char address[64], port_str[32], remote_str[98];
1116       char expected_cn_buff[256], *expected_cn = NULL;
1117       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
1118                                  expected_cn_buff, sizeof(expected_cn_buff)))
1119         expected_cn = expected_cn_buff;
1120       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
1121       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
1122                                  address, sizeof(address))) {
1123         void *v;
1124         if(!mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
1125                                    port_str, sizeof(port_str)))
1126           strlcpy(port_str, "43191", sizeof(port_str));
1127
1128         /* If the user wants a specific CN... limit to that. */
1129           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
1130             continue;
1131
1132         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
1133         if(!mtev_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
1134           node = xmlNewNode(NULL, (xmlChar *)"noit");
1135           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
1136           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
1137           if(expected_cn)
1138             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
1139           xmlAddChild(root, node);
1140         }
1141       }
1142     }
1143     free(noit_configs);
1144   }
1145   mtev_hash_destroy(&seen, free, NULL);
1146
1147   mtev_http_response_ok(restc->http_ctx, "text/xml");
1148   mtev_http_response_xml(restc->http_ctx, doc);
1149   mtev_http_response_end(restc->http_ctx);
1150   xmlFreeDoc(doc);
1151   return 0;
1152 }
1153 static int
1154 stratcon_add_noit(const char *target, unsigned short port,
1155                   const char *cn) {
1156   int cnt;
1157   char path[256];
1158   char port_str[6];
1159   mtev_conf_section_t *noit_configs, parent;
1160   xmlNodePtr newnoit, config, cnnode;
1161
1162   snprintf(path, sizeof(path),
1163            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1164   noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
1165   free(noit_configs);
1166   if(cnt != 0) return -1;
1167
1168   parent = mtev_conf_get_section(NULL, "//noits//include//noits");
1169   if(!parent) parent = mtev_conf_get_section(NULL, "//noits");
1170   if(!parent) return -1;
1171   snprintf(port_str, sizeof(port_str), "%d", port);
1172   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1173   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1174   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1175   xmlAddChild(parent, newnoit);
1176   if(cn) {
1177     config = xmlNewNode(NULL, (xmlChar *)"config");
1178     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1179     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1180     xmlAddChild(config, cnnode);
1181     xmlAddChild(newnoit, config);
1182     pthread_mutex_lock(&noit_ip_by_cn_lock);
1183     mtev_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
1184                       strdup(target), free, free);
1185     pthread_mutex_unlock(&noit_ip_by_cn_lock);
1186   }
1187   if(stratcon_datastore_get_enabled())
1188     stratcon_streamer_connection(NULL, target, "noit",
1189                                  stratcon_jlog_recv_handler,
1190                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1191                                  NULL,
1192                                  jlog_streamer_ctx_free);
1193   if(stratcon_iep_get_enabled())
1194     stratcon_streamer_connection(NULL, target, "noit",
1195                                  stratcon_jlog_recv_handler,
1196                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1197                                  NULL,
1198                                  jlog_streamer_ctx_free);
1199   return 1;
1200 }
1201 static int
1202 stratcon_remove_noit(const char *target, unsigned short port, const char *cn) {
1203   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
1204   const char *key_id;
1205   int klen, n = -1, i, cnt = 0;
1206   void *vconn;
1207   mtev_connection_ctx_t **ctx;
1208   mtev_conf_section_t *noit_configs;
1209   char path[256];
1210   char remote_str[256];
1211
1212   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1213
1214   snprintf(path, sizeof(path),
1215            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1216   noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
1217   for(i=0; i<cnt; i++) {
1218     char expected_cn[256];
1219     if(mtev_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
1220                                expected_cn, sizeof(expected_cn))) {
1221       if(!cn || !strcmp(cn, expected_cn)) {
1222         pthread_mutex_lock(&noit_ip_by_cn_lock);
1223         mtev_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
1224                          free, free);
1225         pthread_mutex_unlock(&noit_ip_by_cn_lock);
1226       }
1227     }
1228     else if(cn) continue;
1229     CONF_REMOVE(noit_configs[i]);
1230     xmlUnlinkNode(noit_configs[i]);
1231     xmlFreeNode(noit_configs[i]);
1232     n = 0;
1233   }
1234   free(noit_configs);
1235
1236   pthread_mutex_lock(&noits_lock);
1237   ctx = malloc(sizeof(*ctx) * mtev_hash_size(&noits));
1238   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
1239                        &vconn)) {
1240     if(!strcmp(((mtev_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1241       ctx[n] = (mtev_connection_ctx_t *)vconn;
1242       mtev_connection_ctx_ref(ctx[n]);
1243       n++;
1244     }
1245   }
1246   pthread_mutex_unlock(&noits_lock);
1247   for(i=0; i<n; i++) {
1248     mtev_connection_ctx_dealloc(ctx[i]); /* once for the record */
1249     mtev_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1250   }
1251   free(ctx);
1252   return n;
1253 }
1254 static int
1255 rest_set_noit(mtev_http_rest_closure_t *restc,
1256               int npats, char **pats) {
1257   const char *cn = NULL;
1258   mtev_http_session_ctx *ctx = restc->http_ctx;
1259   mtev_http_request *req = mtev_http_session_request(ctx);
1260   unsigned short port = 43191;
1261   if(npats < 1 || npats > 2)
1262     mtev_http_response_server_error(ctx, "text/xml");
1263   if(npats == 2) port = atoi(pats[1]);
1264   mtev_http_process_querystring(req);
1265   cn = mtev_http_request_querystring(req, "cn");
1266   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1267     mtev_http_response_ok(ctx, "text/xml");
1268   else
1269     mtev_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1270   if(mtev_conf_write_file(NULL) != 0)
1271     mtevL(noit_error, "local config write failed\n");
1272   mtev_conf_mark_changed();
1273   mtev_http_response_end(ctx);
1274   return 0;
1275 }
1276 static int
1277 rest_delete_noit(mtev_http_rest_closure_t *restc,
1278                  int npats, char **pats) {
1279   mtev_http_session_ctx *ctx = restc->http_ctx;
1280   mtev_http_request *req = mtev_http_session_request(ctx);
1281   unsigned short port = 43191;
1282   if(npats < 1 || npats > 2)
1283     mtev_http_response_server_error(ctx, "text/xml");
1284   if(npats == 2) port = atoi(pats[1]);
1285
1286   const char *want_cn = mtev_http_request_querystring(req, "cn");
1287   if(stratcon_remove_noit(pats[0], port, want_cn) >= 0)
1288     mtev_http_response_ok(ctx, "text/xml");
1289   else
1290     mtev_http_response_not_found(ctx, "text/xml");
1291   if(mtev_conf_write_file(NULL) != 0)
1292     mtevL(noit_error, "local config write failed\n");
1293   mtev_conf_mark_changed();
1294   mtev_http_response_end(ctx);
1295   return 0;
1296 }
1297 static int
1298 stratcon_console_conf_noits(mtev_console_closure_t ncct,
1299                             int argc, char **argv,
1300                             mtev_console_state_t *dstate,
1301                             void *closure) {
1302   char *cp, target[128];
1303   unsigned short port = 43191;
1304   int adding = (int)(vpsized_int)closure;
1305   char *cn = NULL;
1306   if(argc != 1 && argc != 2)
1307     return -1;
1308
1309   if(argc == 2) cn = argv[1];
1310
1311   cp = strchr(argv[0], ':');
1312   if(cp) {
1313     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1314     port = atoi(cp+1);
1315   }
1316   else strlcpy(target, argv[0], sizeof(target));
1317   if(adding) {
1318     if(stratcon_add_noit(target, port, cn) >= 0) {
1319       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1320     }
1321     else {
1322       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1323     }
1324   }
1325   else {
1326     if(stratcon_remove_noit(target, port, cn) >= 0) {
1327       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1328     }
1329     else {
1330       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1331     }
1332   }
1333   return 0;
1334 }
1335
1336 static void
1337 register_console_streamer_commands() {
1338   mtev_console_state_t *tl;
1339   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1340
1341   tl = mtev_console_state_initial();
1342   showcmd = mtev_console_state_get_cmd(tl, "show");
1343   mtevAssert(showcmd && showcmd->dstate);
1344   confcmd = mtev_console_state_get_cmd(tl, "configure");
1345   conftcmd = mtev_console_state_get_cmd(confcmd->dstate, "terminal");
1346   conftnocmd = mtev_console_state_get_cmd(conftcmd->dstate, "no");
1347
1348   mtev_console_state_add_cmd(conftcmd->dstate,
1349     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1350   mtev_console_state_add_cmd(conftnocmd->dstate,
1351     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1352
1353   mtev_console_state_add_cmd(showcmd->dstate,
1354     NCSCMD("noit", stratcon_console_show_noits,
1355            stratcon_console_noit_opts, NULL, (void *)1));
1356   mtev_console_state_add_cmd(showcmd->dstate,
1357     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1358 }
1359
1360 int
1361 stratcon_streamer_connection(const char *toplevel, const char *destination,
1362                              const char *type,
1363                              eventer_func_t handler,
1364                              void *(*handler_alloc)(void), void *handler_ctx,
1365                              void (*handler_free)(void *)) {
1366   return mtev_connections_from_config(&noits, &noits_lock,
1367                                       toplevel, destination, type,
1368                                       handler, handler_alloc, handler_ctx,
1369                                       handler_free);
1370 }
1371
1372 mtev_reverse_acl_decision_t
1373 mtev_reverse_socket_allow_noits(const char *id, acceptor_closure_t *ac) {
1374   if(!strncmp(id, "noit/", 5)) return MTEV_ACL_ALLOW;
1375   return MTEV_ACL_ABSTAIN;
1376 }
1377
1378 void
1379 stratcon_jlog_streamer_init(const char *toplevel) {
1380   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1381   struct in_addr remote;
1382   char uuid_str[UUID_STR_LEN + 1];
1383
1384   mtev_reverse_socket_acl(mtev_reverse_socket_allow_noits);
1385   pthread_mutex_init(&noits_lock, NULL);
1386   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1387   eventer_name_callback("stratcon_jlog_recv_handler",
1388                         stratcon_jlog_recv_handler);
1389   register_console_streamer_commands();
1390   stratcon_jlog_streamer_reload(toplevel);
1391   stratcon_streamer_connection(toplevel, "", "noit", NULL, NULL, NULL, NULL);
1392   mtevAssert(mtev_http_rest_register_auth(
1393     "GET", "/noits/", "^show(.json)?$", rest_show_noits,
1394              mtev_http_rest_client_cert_auth
1395   ) == 0);
1396   mtevAssert(mtev_http_rest_register_auth(
1397     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1398              mtev_http_rest_client_cert_auth
1399   ) == 0);
1400   mtevAssert(mtev_http_rest_register_auth(
1401     "PUT", "/noits/", "^set/([^/:]*):(\\d+)$", rest_set_noit,
1402              mtev_http_rest_client_cert_auth
1403   ) == 0);
1404   mtevAssert(mtev_http_rest_register_auth(
1405     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1406              mtev_http_rest_client_cert_auth
1407   ) == 0);
1408   mtevAssert(mtev_http_rest_register_auth(
1409     "DELETE", "/noits/", "^delete/([^/:]*):(\\d+)$", rest_delete_noit,
1410              mtev_http_rest_client_cert_auth
1411   ) == 0);
1412
1413   uuid_clear(self_stratcon_id);
1414
1415   if(mtev_conf_get_stringbuf(NULL, "/stratcon/@id",
1416                              uuid_str, sizeof(uuid_str)) &&
1417      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1418     int period;
1419     mtev_conf_get_boolean(NULL, "/stratcon/@extended_id",
1420                           &stratcon_selfcheck_extended_id);
1421     /* If a UUID was provided for stratcon itself, we will report metrics
1422      * on a large variety of things (including all noits).
1423      */
1424     if(mtev_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1425        period > 0) {
1426       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1427       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1428     }
1429     self_stratcon_ip.sin_family = AF_INET;
1430     remote.s_addr = 0xffffffff;
1431     mtev_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1432     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1433     eventer_add_in(periodic_noit_metrics, NULL, whence);
1434   }
1435 }
1436
1437 void
1438 stratcon_jlog_streamer_init_globals(void) {
1439   mtev_hash_init(&noits);
1440   mtev_hash_init(&noit_ip_by_cn);
1441 }
1442
Note: See TracBrowser for help on using the browser.