root/src/stratcon_jlog_streamer.c

Revision 43ac70be51d871d281297049198be8674935b689, 53.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 months ago)

Prefer an included noits section as it changes.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <unistd.h>
37 #include <assert.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #ifdef HAVE_SYS_FILIO_H
42 #include <sys/filio.h>
43 #endif
44 #include <netinet/in.h>
45 #include <sys/un.h>
46 #include <arpa/inet.h>
47
48 #include <eventer/eventer.h>
49 #include <mtev_conf.h>
50 #include <mtev_hash.h>
51 #include <mtev_log.h>
52 #include <mtev_getip.h>
53 #include <mtev_rest.h>
54 #include <mtev_json.h>
55
56 #include "noit_mtev_bridge.h"
57 #include "stratcon_dtrace_probes.h"
58 #include "noit_jlog_listener.h"
59 #include "stratcon_datastore.h"
60 #include "stratcon_jlog_streamer.h"
61 #include "stratcon_iep.h"
62
63 pthread_mutex_t noits_lock;
64 mtev_hash_table noits = MTEV_HASH_EMPTY;
65 pthread_mutex_t noit_ip_by_cn_lock;
66 mtev_hash_table noit_ip_by_cn = MTEV_HASH_EMPTY;
67 static uuid_t self_stratcon_id;
68 static char self_stratcon_hostname[256] = "\0";
69 static struct sockaddr_in self_stratcon_ip;
70 static mtev_boolean stratcon_selfcheck_extended_id = mtev_true;
71
72 static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };
73
74 static const char *feed_type_to_str(int jlog_feed_cmd) {
75   switch(jlog_feed_cmd) {
76     case NOIT_JLOG_DATA_FEED: return "durable/storage";
77     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
78   }
79   return "unknown";
80 }
81
82 #define GET_EXPECTED_CN(nctx, cn) do { \
83   void *vcn; \
84   cn = NULL; \
85   if(nctx->config && \
86      mtev_hash_retrieve(nctx->config, "cn", 2, &vcn)) { \
87      cn = vcn; \
88   } \
89 } while(0)
90 #define GET_FEEDTYPE(nctx, feedtype) do { \
91   jlog_streamer_ctx_t *_jctx = nctx->consumer_ctx; \
92   feedtype = "unknown"; \
93   if(_jctx->push == stratcon_datastore_push) \
94     feedtype = "storage"; \
95   else if(_jctx->push == stratcon_iep_line_processor) \
96     feedtype = "iep"; \
97 } while(0)
98
99 static int
100 remote_str_sort(const void *a, const void *b) {
101   int rv;
102   mtev_connection_ctx_t * const *actx = a;
103   mtev_connection_ctx_t * const *bctx = b;
104   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
105   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
106   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
107   if(rv) return rv;
108   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
109            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
110 }
111 static void
112 nc_print_noit_conn_brief(mtev_console_closure_t ncct,
113                           mtev_connection_ctx_t *ctx) {
114   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
115   struct timeval now, diff, session_duration;
116   char cmdbuf[4096];
117   const char *feedtype = "unknown";
118   const char *lasttime = "never";
119   if(ctx->last_connect.tv_sec != 0) {
120     time_t r = ctx->last_connect.tv_sec;
121     struct tm tbuf, *tm;
122     tm = gmtime_r(&r, &tbuf);
123     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
124     lasttime = cmdbuf;
125   }
126   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
127             ctx->remote_cn ? "connected" :
128                              (ctx->retry_event ? "disconnected" :
129                                                    "connecting"), lasttime);
130   if(ctx->e) {
131     char buff[128];
132     const char *addrstr = NULL;
133     struct sockaddr_in6 addr6;
134     socklen_t len = sizeof(addr6);
135     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
136       unsigned short port = 0;
137       if(addr6.sin6_family == AF_INET) {
138         addrstr = inet_ntop(addr6.sin6_family,
139                             &((struct sockaddr_in *)&addr6)->sin_addr,
140                             buff, sizeof(buff));
141         memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
142         port = ntohs(port);
143       }
144       else if(addr6.sin6_family == AF_INET6) {
145         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
146                             buff, sizeof(buff));
147         port = ntohs(addr6.sin6_port);
148       }
149       if(addrstr != NULL)
150         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
151       else
152         nc_printf(ncct, "\tLocal address not interpretable\n");
153     }
154     else {
155       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
156                 ctx->e->fd, strerror(errno));
157     }
158   }
159   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
160   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
161   gettimeofday(&now, NULL);
162   if(ctx->timeout_event) {
163     sub_timeval(ctx->timeout_event->whence, now, &diff);
164     nc_printf(ncct, "\tTimeout scheduled for %lld.%06us\n",
165               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
166   }
167   if(ctx->retry_event) {
168     sub_timeval(ctx->retry_event->whence, now, &diff);
169     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
170               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
171   }
172   else if(ctx->remote_cn) {
173     nc_printf(ncct, "\tRemote CN: '%s'\n",
174               ctx->remote_cn ? ctx->remote_cn : "???");
175     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
176       struct timeval last;
177       double session_duration_seconds;
178       const char *state = "unknown";
179
180       switch(jctx->state) {
181         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
182         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
183         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
184         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
185         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
186         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
187         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
188       }
189       last.tv_sec = jctx->header.tv_sec;
190       last.tv_usec = jctx->header.tv_usec;
191       sub_timeval(now, last, &diff);
192       sub_timeval(now, ctx->last_connect, &session_duration);
193       session_duration_seconds = session_duration.tv_sec +
194                                  (double)session_duration.tv_usec/1000000.0;
195       nc_printf(ncct, "\tState: %s\n"
196                       "\tNext checkpoint: [%08x:%08x]\n"
197                       "\tLast event: %lld.%06us ago\n"
198                       "\tEvents this session: %llu (%0.2f/s)\n"
199                       "\tOctets this session: %llu (%0.2f/s)\n",
200                 state,
201                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
202                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
203                 jctx->total_events,
204                 (double)jctx->total_events/session_duration_seconds,
205                 jctx->total_bytes_read,
206                 (double)jctx->total_bytes_read/session_duration_seconds);
207     }
208     else {
209       nc_printf(ncct, "\tUnknown type.\n");
210     }
211   }
212 }
213
214 jlog_streamer_ctx_t *
215 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
216   jlog_streamer_ctx_t *ctx;
217   ctx = stratcon_jlog_streamer_ctx_alloc();
218   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
219   ctx->push = stratcon_datastore_push;
220   return ctx;
221 }
222 jlog_streamer_ctx_t *
223 stratcon_jlog_streamer_ctx_alloc(void) {
224   jlog_streamer_ctx_t *ctx;
225   ctx = calloc(1, sizeof(*ctx));
226   return ctx;
227 }
228
229 void
230 jlog_streamer_ctx_free(void *cl) {
231   jlog_streamer_ctx_t *ctx = cl;
232   if(ctx->buffer) free(ctx->buffer);
233   free(ctx);
234 }
235
236 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
237 static int
238 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
239   int len, mask;
240   while(ctx->bytes_read < ctx->bytes_expected) {
241     len = Eread(ctx->buffer + ctx->bytes_read,
242                 ctx->bytes_expected - ctx->bytes_read);
243     if(len < 0) {
244       *newmask = mask;
245       return -1;
246     }
247     /* if we get 0 inside SSL, and there was a real error, we
248      * will actually get a -1 here.
249      * if(len == 0) return ctx->bytes_read;
250      */
251     ctx->total_bytes_read += len;
252     ctx->bytes_read += len;
253   }
254   assert(ctx->bytes_read == ctx->bytes_expected);
255   return ctx->bytes_read;
256 }
257 #define FULLREAD(e,ctx,size) do { \
258   int mask, len; \
259   if(!ctx->bytes_expected) { \
260     ctx->bytes_expected = size; \
261     if(ctx->buffer) free(ctx->buffer); \
262     ctx->buffer = malloc(size + 1); \
263     if(ctx->buffer == NULL) { \
264       mtevL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
265       goto socket_error; \
266     } \
267     ctx->buffer[size] = '\0'; \
268   } \
269   len = __read_on_ctx(e, ctx, &mask); \
270   if(len < 0) { \
271     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
272     mtevL(noit_error, "[%s] [%s] SSL read error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)", \
273           nctx->remote_cn ? nctx->remote_cn : "(null)", \
274           strerror(errno)); \
275     goto socket_error; \
276   } \
277   ctx->bytes_read = 0; \
278   ctx->bytes_expected = 0; \
279   if(len != size) { \
280     mtevL(noit_error, "[%s] [%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
281           nctx->remote_str ? nctx->remote_str : "(null)", \
282           nctx->remote_cn ? nctx->remote_cn : "(null)", \
283           ctx->state, len, (long unsigned int)size); \
284     goto socket_error; \
285   } \
286 } while(0)
287
288 int
289 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
290                            struct timeval *now) {
291   mtev_connection_ctx_t *nctx = closure;
292   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
293   jlog_streamer_ctx_t dummy;
294   int len;
295   jlog_id n_chkpt;
296   const char *cn_expected, *feedtype;
297   GET_EXPECTED_CN(nctx, cn_expected);
298   GET_FEEDTYPE(nctx, feedtype);
299
300   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
301     if(write(e->fd, e, 0) == -1)
302       mtevL(noit_error, "[%s] [%s] socket error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)",
303             nctx->remote_cn ? nctx->remote_cn : "(null)", strerror(errno));
304  socket_error:
305     ctx->state = JLOG_STREAMER_WANT_INITIATE;
306     ctx->count = 0;
307     ctx->needs_chkpt = 0;
308     ctx->bytes_read = 0;
309     ctx->bytes_expected = 0;
310     if(ctx->buffer) free(ctx->buffer);
311     ctx->buffer = NULL;
312     nctx->schedule_reattempt(nctx, now);
313     nctx->close(nctx, e);
314     return 0;
315   }
316
317   mtev_connection_update_timeout(nctx);
318   while(1) {
319     switch(ctx->state) {
320       case JLOG_STREAMER_WANT_INITIATE:
321         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
322                               sizeof(ctx->jlog_feed_cmd),
323                               &mask, e);
324         if(len < 0) {
325           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
326           goto socket_error;
327         }
328         if(len != sizeof(ctx->jlog_feed_cmd)) {
329           mtevL(noit_error, "[%s] [%s] short write [%d/%d] on initiating stream.\n",
330                 nctx->remote_str ? nctx->remote_str : "(null)", nctx->remote_cn ? nctx->remote_cn : "(null)",
331                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
332           goto socket_error;
333         }
334         ctx->state = JLOG_STREAMER_WANT_COUNT;
335         break;
336
337       case JLOG_STREAMER_WANT_ERROR:
338         FULLREAD(e, ctx, 0 - ctx->count);
339         mtevL(noit_error, "[%s] [%s] %.*s\n", nctx->remote_str ? nctx->remote_str : "(null)",
340               nctx->remote_cn ? nctx->remote_cn : "(null)", 0 - ctx->count, ctx->buffer);
341         free(ctx->buffer); ctx->buffer = NULL;
342         goto socket_error;
343         break;
344
345       case JLOG_STREAMER_WANT_COUNT:
346         FULLREAD(e, ctx, sizeof(u_int32_t));
347         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
348         ctx->count = ntohl(dummy.count);
349         ctx->needs_chkpt = 0;
350         free(ctx->buffer); ctx->buffer = NULL;
351         STRATCON_STREAM_COUNT(e->fd, (char *)feedtype,
352                                    nctx->remote_str, (char *)cn_expected,
353                                    ctx->count);
354         if(ctx->count < 0)
355           ctx->state = JLOG_STREAMER_WANT_ERROR;
356         else
357           ctx->state = JLOG_STREAMER_WANT_HEADER;
358         break;
359
360       case JLOG_STREAMER_WANT_HEADER:
361         if(ctx->count == 0) {
362           ctx->state = JLOG_STREAMER_WANT_COUNT;
363           break;
364         }
365         FULLREAD(e, ctx, sizeof(ctx->header));
366         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
367         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
368         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
369         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
370         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
371         ctx->header.message_len = ntohl(dummy.header.message_len);
372         STRATCON_STREAM_HEADER(e->fd, (char *)feedtype,
373                                     nctx->remote_str, (char *)cn_expected,
374                                     ctx->header.chkpt.log, ctx->header.chkpt.marker,
375                                     ctx->header.tv_sec, ctx->header.tv_usec,
376                                     ctx->header.message_len);
377         free(ctx->buffer); ctx->buffer = NULL;
378         ctx->state = JLOG_STREAMER_WANT_BODY;
379         break;
380
381       case JLOG_STREAMER_WANT_BODY:
382         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
383         STRATCON_STREAM_BODY(e->fd, (char *)feedtype,
384                                   nctx->remote_str, (char *)cn_expected,
385                                   ctx->header.chkpt.log, ctx->header.chkpt.marker,
386                                   ctx->header.tv_sec, ctx->header.tv_usec,
387                                   ctx->buffer);
388         if(ctx->header.message_len > 0) {
389           ctx->needs_chkpt = 1;
390           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
391                     ctx->buffer, NULL);
392         }
393         else if(ctx->buffer)
394           free(ctx->buffer);
395         /* Don't free the buffer, it's used by the datastore process. */
396         ctx->buffer = NULL;
397         ctx->count--;
398         ctx->total_events++;
399         if(ctx->count == 0 && ctx->needs_chkpt) {
400           eventer_t completion_e;
401           eventer_remove_fd(e->fd);
402           completion_e = eventer_alloc();
403           memcpy(completion_e, e, sizeof(*e));
404           nctx->e = completion_e;
405           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
406           ctx->state = JLOG_STREAMER_IS_ASYNC;
407           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
408                     NULL, completion_e);
409           mtevL(noit_debug, "Pushing %s batch async [%s] [%s]: [%u/%u]\n",
410                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
411                 nctx->remote_str ? nctx->remote_str : "(null)",
412                 nctx->remote_cn ? nctx->remote_cn : "(null)",
413                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
414           mtev_connection_disable_timeout(nctx);
415           return 0;
416         }
417         else if(ctx->count == 0)
418           ctx->state = JLOG_STREAMER_WANT_CHKPT;
419         else
420           ctx->state = JLOG_STREAMER_WANT_HEADER;
421         break;
422
423       case JLOG_STREAMER_IS_ASYNC:
424         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
425       case JLOG_STREAMER_WANT_CHKPT:
426         mtevL(noit_debug, "Pushing %s checkpoint [%s] [%s]: [%u/%u]\n",
427               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
428               nctx->remote_str ? nctx->remote_str : "(null)",
429               nctx->remote_cn ? nctx->remote_cn : "(null)",
430               ctx->header.chkpt.log, ctx->header.chkpt.marker);
431         n_chkpt.log = htonl(ctx->header.chkpt.log);
432         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
433
434         /* screw short writes.  I'd rather die than not write my data! */
435         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
436                               &mask, e);
437         if(len < 0) {
438           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
439           goto socket_error;
440         }
441         if(len != sizeof(jlog_id)) {
442           mtevL(noit_error, "[%s] [%s] short write on checkpointing stream.\n",
443             nctx->remote_str ? nctx->remote_str : "(null)",
444             nctx->remote_cn ? nctx->remote_cn : "(null)");
445           goto socket_error;
446         }
447         STRATCON_STREAM_CHECKPOINT(e->fd, (char *)feedtype,
448                                         nctx->remote_str, (char *)cn_expected,
449                                         ctx->header.chkpt.log, ctx->header.chkpt.marker);
450         ctx->state = JLOG_STREAMER_WANT_COUNT;
451         break;
452     }
453   }
454   /* never get here */
455 }
456
457
458 int
459 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
460   int rv = -1;
461   void *vip;
462   pthread_mutex_lock(&noit_ip_by_cn_lock);
463   if(mtev_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
464     int new_len;
465     char *new_ip = (char *)vip;
466     new_len = strlen(new_ip);
467     strlcpy(ip, new_ip, len);
468     if(new_len >= len) rv = new_len+1;
469     else rv = 0;
470   }
471   pthread_mutex_unlock(&noit_ip_by_cn_lock);
472   return rv;
473 }
474 void
475 stratcon_jlog_streamer_recache_noit() {
476   int di, cnt;
477   mtev_conf_section_t *noit_configs;
478   noit_configs = mtev_conf_get_sections(NULL, "//noits//noit", &cnt);
479   pthread_mutex_lock(&noit_ip_by_cn_lock);
480   mtev_hash_delete_all(&noit_ip_by_cn, free, free);
481   for(di=0; di<cnt; di++) {
482     char address[64];
483     if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
484                                  address, sizeof(address))) {
485       char expected_cn[256];
486       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
487                                  expected_cn, sizeof(expected_cn)))
488         mtev_hash_store(&noit_ip_by_cn,
489                         strdup(expected_cn), strlen(expected_cn),
490                         strdup(address));
491     }
492   }
493   free(noit_configs);
494   pthread_mutex_unlock(&noit_ip_by_cn_lock);
495 }
496 void
497 stratcon_jlog_streamer_reload(const char *toplevel) {
498   /* flush and repopulate the cn cache */
499   stratcon_jlog_streamer_recache_noit();
500   if(!stratcon_datastore_get_enabled()) return;
501   stratcon_streamer_connection(toplevel, NULL, "noit",
502                                stratcon_jlog_recv_handler,
503                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
504                                NULL,
505                                jlog_streamer_ctx_free);
506 }
507
508 char *
509 stratcon_console_noit_opts(mtev_console_closure_t ncct,
510                            mtev_console_state_stack_t *stack,
511                            mtev_console_state_t *dstate,
512                            int argc, char **argv, int idx) {
513   if(argc == 1) {
514     mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
515     const char *key_id;
516     int klen, i = 0;
517     void *vconn, *vcn;
518     mtev_connection_ctx_t *ctx;
519     mtev_hash_table dedup = MTEV_HASH_EMPTY;
520
521     pthread_mutex_lock(&noits_lock);
522     while(mtev_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
523       ctx = (mtev_connection_ctx_t *)vconn;
524       vcn = NULL;
525       if(ctx->config && mtev_hash_retrieve(ctx->config, "cn", 2, &vcn) &&
526          !mtev_hash_store(&dedup, vcn, strlen(vcn), NULL)) {
527         if(!strncmp(vcn, argv[0], strlen(argv[0]))) {
528           if(idx == i) {
529             pthread_mutex_unlock(&noits_lock);
530             mtev_hash_destroy(&dedup, NULL, NULL);
531             return strdup(vcn);
532           }
533           i++;
534         }
535       }
536       if(ctx->remote_str &&
537          !mtev_hash_store(&dedup, ctx->remote_str, strlen(ctx->remote_str), NULL)) {
538         if(!strncmp(ctx->remote_str, argv[0], strlen(argv[0]))) {
539           if(idx == i) {
540             pthread_mutex_unlock(&noits_lock);
541             mtev_hash_destroy(&dedup, NULL, NULL);
542             return strdup(ctx->remote_str);
543           }
544           i++;
545         }
546       }
547     }
548     pthread_mutex_unlock(&noits_lock);
549     mtev_hash_destroy(&dedup, NULL, NULL);
550   }
551   if(argc == 2)
552     return mtev_console_opt_delegate(ncct, stack, dstate, argc-1, argv+1, idx);
553   return NULL;
554 }
555 static int
556 stratcon_console_show_noits(mtev_console_closure_t ncct,
557                             int argc, char **argv,
558                             mtev_console_state_t *dstate,
559                             void *closure) {
560   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
561   const char *key_id, *ecn;
562   int klen, n = 0, i;
563   void *vconn;
564   mtev_connection_ctx_t **ctx;
565
566   if(closure != (void *)0 && argc == 0) {
567     nc_printf(ncct, "takes an argument\n");
568     return 0;
569   }
570   if(closure == (void *)0 && argc > 0) {
571     nc_printf(ncct, "takes no arguments\n");
572     return 0;
573   }
574   pthread_mutex_lock(&noits_lock);
575   ctx = malloc(sizeof(*ctx) * mtev_hash_size(&noits));
576   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
577                        &vconn)) {
578     ctx[n] = (mtev_connection_ctx_t *)vconn;
579     if(argc == 0 ||
580        !strcmp(ctx[n]->remote_str, argv[0]) ||
581        (ctx[n]->config && mtev_hash_retr_str(ctx[n]->config, "cn", 2, &ecn) &&
582         !strcmp(ecn, argv[0]))) {
583       mtev_connection_ctx_ref(ctx[n]);
584       n++;
585     }
586   }
587   pthread_mutex_unlock(&noits_lock);
588   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
589   for(i=0; i<n; i++) {
590     nc_print_noit_conn_brief(ncct, ctx[i]);
591     mtev_connection_ctx_deref(ctx[i]);
592   }
593   free(ctx);
594   return 0;
595 }
596
597 static void
598 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
599                        mtev_connection_ctx_t *nctx) {
600   struct timeval last, session_duration, diff;
601   u_int64_t session_duration_ms, last_event_ms;
602   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
603   char str[1024], *wr;
604   int len;
605   const char *cn_expected;
606   const char *feedtype = "unknown";
607
608   GET_FEEDTYPE(nctx, feedtype);
609   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
610
611   GET_EXPECTED_CN(nctx, cn_expected);
612   if(!cn_expected) return;
613
614   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
615            (long unsigned int)now->tv_sec,
616            (long unsigned int)now->tv_usec/1000UL,
617            uuid_str, cn_expected, feedtype);
618   wr = str + strlen(str);
619   len = sizeof(str) - (wr - str);
620
621   /* Now we write NAME TYPE VALUE into wr each time and push it */
622 #define push_noit_m_str(name, value) do { \
623   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
624   stratcon_datastore_push(DS_OP_INSERT, \
625                           (struct sockaddr *)&self_stratcon_ip, \
626                           self_stratcon_hostname, strdup(str), NULL); \
627   stratcon_iep_line_processor(DS_OP_INSERT, \
628                               (struct sockaddr *)&self_stratcon_ip, \
629                               self_stratcon_hostname, strdup(str), NULL); \
630 } while(0)
631 #define push_noit_m_u64(name, value) do { \
632   snprintf(wr, len, "%s\tL\t%llu\n", name, (long long unsigned int)value); \
633   stratcon_datastore_push(DS_OP_INSERT, \
634                           (struct sockaddr *)&self_stratcon_ip, \
635                           self_stratcon_hostname, strdup(str), NULL); \
636   stratcon_iep_line_processor(DS_OP_INSERT, \
637                               (struct sockaddr *)&self_stratcon_ip, \
638                               self_stratcon_hostname, strdup(str), NULL); \
639 } while(0)
640
641   last.tv_sec = jctx->header.tv_sec;
642   last.tv_usec = jctx->header.tv_usec;
643   sub_timeval(*now, last, &diff);
644   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
645   sub_timeval(*now, nctx->last_connect, &session_duration);
646   session_duration_ms = session_duration.tv_sec * 1000 +
647                         session_duration.tv_usec / 1000;
648
649   push_noit_m_str("state", nctx->remote_cn ? "connected" :
650                              (nctx->retry_event ? "disconnected" :
651                                                   "connecting"));
652   push_noit_m_u64("last_event_age_ms", last_event_ms);
653   push_noit_m_u64("session_length_ms", session_duration_ms);
654 }
655 static int
656 periodic_noit_metrics(eventer_t e, int mask, void *closure,
657                       struct timeval *now) {
658   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
659   mtev_connection_ctx_t **ctxs;
660   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
661   const char *key_id;
662   void *vconn;
663   int klen, n = 0, i;
664   char str[1024];
665   char uuid_str[1024], tmp_uuid_str[UUID_STR_LEN+1];
666   struct timeval epoch, diff;
667   u_int64_t uptime = 0;
668   char ip_str[128];
669
670   inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
671             sizeof(ip_str));
672
673   uuid_str[0] = '\0';
674   uuid_unparse_lower(self_stratcon_id, tmp_uuid_str);
675   if(stratcon_selfcheck_extended_id) {
676     strlcat(uuid_str, ip_str, sizeof(uuid_str)-37);
677     strlcat(uuid_str, "`selfcheck`selfcheck`", sizeof(uuid_str)-37);
678   }
679   strlcat(uuid_str, tmp_uuid_str, sizeof(uuid_str));
680
681 #define PUSH_BOTH(type, str) do { \
682   stratcon_datastore_push(type, \
683                           (struct sockaddr *)&self_stratcon_ip, \
684                           self_stratcon_hostname, str, NULL); \
685   stratcon_iep_line_processor(type, \
686                               (struct sockaddr *)&self_stratcon_ip, \
687                               self_stratcon_hostname, str, NULL); \
688 } while(0)
689
690   if(closure == NULL) {
691     /* Only do this the first time we get called */
692     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
693              (long unsigned int)now->tv_sec,
694              (long unsigned int)now->tv_usec/1000UL, uuid_str, ip_str,
695              self_stratcon_hostname);
696     PUSH_BOTH(DS_OP_INSERT, strdup(str));
697   }
698
699   pthread_mutex_lock(&noits_lock);
700   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
701   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
702                        &vconn)) {
703     ctxs[n] = (mtev_connection_ctx_t *)vconn;
704     mtev_connection_ctx_ref(ctxs[n]);
705     n++;
706   }
707   pthread_mutex_unlock(&noits_lock);
708
709   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok %d noits\n",
710            (long unsigned int)now->tv_sec,
711            (long unsigned int)now->tv_usec/1000UL, uuid_str, n);
712   PUSH_BOTH(DS_OP_INSERT, strdup(str));
713
714   if(eventer_get_epoch(&epoch) != 0)
715     memcpy(&epoch, now, sizeof(epoch));
716   sub_timeval(*now, epoch, &diff);
717   uptime = diff.tv_sec;
718   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\tuptime\tL\t%llu\n",
719            (long unsigned int)now->tv_sec,
720            (long unsigned int)now->tv_usec/1000UL,
721            uuid_str, (long long unsigned int)uptime);
722   PUSH_BOTH(DS_OP_INSERT, strdup(str));
723
724   for(i=0; i<n; i++) {
725     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
726     mtev_connection_ctx_deref(ctxs[i]);
727   }
728   free(ctxs);
729   PUSH_BOTH(DS_OP_CHKPT, NULL);
730
731   add_timeval(e->whence, whence, &whence);
732   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
733   return 0;
734 }
735
736 static int
737 rest_show_noits_json(mtev_http_rest_closure_t *restc,
738                      int npats, char **pats) {
739   const char *jsonstr;
740   struct json_object *doc, *nodes, *node;
741   mtev_hash_table seen = MTEV_HASH_EMPTY;
742   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
743   char path[256];
744   const char *key_id;
745   const char *type = NULL, *want_cn = NULL;
746   int klen, n = 0, i, di, cnt;
747   void *vconn;
748   mtev_connection_ctx_t **ctxs;
749   mtev_conf_section_t *noit_configs;
750   struct timeval now, diff, last;
751   mtev_http_request *req = mtev_http_session_request(restc->http_ctx);
752
753   mtev_http_process_querystring(req);
754   type = mtev_http_request_querystring(req, "type");
755   want_cn = mtev_http_request_querystring(req, "cn");
756
757   gettimeofday(&now, NULL);
758
759   pthread_mutex_lock(&noits_lock);
760   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
761   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
762                        &vconn)) {
763     ctxs[n] = (mtev_connection_ctx_t *)vconn;
764     mtev_connection_ctx_ref(ctxs[n]);
765     n++;
766   }
767   pthread_mutex_unlock(&noits_lock);
768   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
769
770   doc = json_object_new_object();
771   nodes = json_object_new_array();
772   json_object_object_add(doc, "nodes", nodes);
773  
774   for(i=0; i<n; i++) {
775     char buff[256];
776     const char *feedtype = "unknown", *state = "unknown";
777     mtev_connection_ctx_t *ctx = ctxs[i];
778     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
779
780     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
781
782     /* If the user requested a specific type and we're not it, skip. */
783     if(type && strcmp(feedtype, type)) {
784         mtev_connection_ctx_deref(ctx);
785         continue;
786     }
787     /* If the user wants a specific CN... limit to that. */
788     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn))) {
789         mtev_connection_ctx_deref(ctx);
790         continue;
791     }
792
793     node = json_object_new_object();
794     snprintf(buff, sizeof(buff), "%llu.%06d",
795              (long long unsigned)ctx->last_connect.tv_sec,
796              (int)ctx->last_connect.tv_usec);
797     json_object_object_add(node, "last_connect", json_object_new_string(buff));
798     json_object_object_add(node, "state",
799          json_object_new_string(ctx->remote_cn ?
800                                   "connected" :
801                                   (ctx->retry_event ? "disconnected" :
802                                                       "connecting")));
803     if(ctx->e) {
804       char buff[128];
805       const char *addrstr = NULL;
806       struct sockaddr_in6 addr6;
807       socklen_t len = sizeof(addr6);
808       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
809         unsigned short port = 0;
810         if(addr6.sin6_family == AF_INET) {
811           addrstr = inet_ntop(addr6.sin6_family,
812                               &((struct sockaddr_in *)&addr6)->sin_addr,
813                               buff, sizeof(buff));
814           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
815           port = ntohs(port);
816         }
817         else if(addr6.sin6_family == AF_INET6) {
818           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
819                               buff, sizeof(buff));
820           port = ntohs(addr6.sin6_port);
821         }
822         if(addrstr != NULL) {
823           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
824                    ":%u", port);
825           json_object_object_add(node, "local", json_object_new_string(buff));
826         }
827       }
828     }
829     mtev_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
830                       0, free, NULL);
831     json_object_object_add(node, "remote", json_object_new_string(ctx->remote_str));
832     json_object_object_add(node, "type", json_object_new_string(feedtype));
833     if(ctx->retry_event) {
834       sub_timeval(ctx->retry_event->whence, now, &diff);
835       snprintf(buff, sizeof(buff), "%llu.%06d",
836                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
837       json_object_object_add(node, "next_attempt", json_object_new_string(buff));
838     }
839     else if(ctx->remote_cn) {
840       if(ctx->remote_cn)
841         json_object_object_add(node, "remote_cn", json_object_new_string(ctx->remote_cn));
842  
843       switch(jctx->state) {
844         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
845         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
846         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
847         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
848         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
849         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
850         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
851       }
852       json_object_object_add(node, "state", json_object_new_string(state));
853       snprintf(buff, sizeof(buff), "%08x:%08x",
854                jctx->header.chkpt.log, jctx->header.chkpt.marker);
855       json_object_object_add(node, "checkpoint", json_object_new_string(buff));
856       snprintf(buff, sizeof(buff), "%llu",
857                (unsigned long long)jctx->total_events);
858       json_object_object_add(node, "session_events", json_object_new_string(buff));
859       snprintf(buff, sizeof(buff), "%llu",
860                (unsigned long long)jctx->total_bytes_read);
861       json_object_object_add(node, "session_bytes", json_object_new_string(buff));
862  
863       sub_timeval(now, ctx->last_connect, &diff);
864       snprintf(buff, sizeof(buff), "%lld.%06d",
865                (long long)diff.tv_sec, (int)diff.tv_usec);
866       json_object_object_add(node, "session_duration", json_object_new_string(buff));
867  
868       if(jctx->header.tv_sec) {
869         last.tv_sec = jctx->header.tv_sec;
870         last.tv_usec = jctx->header.tv_usec;
871         snprintf(buff, sizeof(buff), "%llu.%06d",
872                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
873         json_object_object_add(node, "last_event", json_object_new_string(buff));
874         sub_timeval(now, last, &diff);
875         snprintf(buff, sizeof(buff), "%lld.%06d",
876                  (long long)diff.tv_sec, (int)diff.tv_usec);
877         json_object_object_add(node, "last_event_age", json_object_new_string(buff));
878       }
879     }
880     json_object_array_add(nodes, node);
881     mtev_connection_ctx_deref(ctx);
882   }
883   free(ctxs);
884
885   if(!type || !strcmp(type, "configured")) {
886     snprintf(path, sizeof(path), "//noits//noit");
887     noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
888     for(di=0; di<cnt; di++) {
889       char address[64], port_str[32], remote_str[98];
890       char expected_cn_buff[256], *expected_cn = NULL;
891       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
892                                  expected_cn_buff, sizeof(expected_cn_buff)))
893         expected_cn = expected_cn_buff;
894       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
895       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
896                                  address, sizeof(address))) {
897         void *v;
898         if(!mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
899                                    port_str, sizeof(port_str)))
900           strlcpy(port_str, "43191", sizeof(port_str));
901
902         /* If the user wants a specific CN... limit to that. */
903           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
904             continue;
905
906         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
907         if(!mtev_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
908           node = json_object_new_object();
909           json_object_object_add(node, "remote", json_object_new_string(remote_str));
910           json_object_object_add(node, "type", json_object_new_string("configured"));
911           if(expected_cn)
912             json_object_object_add(node, "cn", json_object_new_string(expected_cn));
913           json_object_array_add(nodes, node);
914         }
915       }
916     }
917     free(noit_configs);
918   }
919   mtev_hash_destroy(&seen, free, NULL);
920
921   mtev_http_response_ok(restc->http_ctx, "application/json");
922   jsonstr = json_object_to_json_string(doc);
923   mtev_http_response_append(restc->http_ctx, jsonstr, strlen(jsonstr));
924   mtev_http_response_append(restc->http_ctx, "\n", 1);
925   json_object_put(doc);
926   mtev_http_response_end(restc->http_ctx);
927   return 0;
928 }
929 static int
930 rest_show_noits(mtev_http_rest_closure_t *restc,
931                 int npats, char **pats) {
932   xmlDocPtr doc;
933   xmlNodePtr root;
934   mtev_hash_table *hdrs, seen = MTEV_HASH_EMPTY;
935   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
936   char path[256];
937   const char *key_id, *accepthdr;
938   const char *type = NULL, *want_cn = NULL;
939   int klen, n = 0, i, di, cnt;
940   void *vconn;
941   mtev_connection_ctx_t **ctxs;
942   mtev_conf_section_t *noit_configs;
943   struct timeval now, diff, last;
944   xmlNodePtr node;
945   mtev_http_request *req = mtev_http_session_request(restc->http_ctx);
946
947   if(npats == 1 && !strcmp(pats[0], ".json"))
948     return rest_show_noits_json(restc, npats, pats);
949
950   hdrs = mtev_http_request_headers_table(req);
951   if(mtev_hash_retr_str(hdrs, "accept", strlen("accept"), &accepthdr)) {
952     char buf[256], *brkt, *part;
953     strlcpy(buf, accepthdr, sizeof(buf));
954     for(part = strtok_r(buf, ",", &brkt);
955         part;
956         part = strtok_r(NULL, ",", &brkt)) {
957       while(*part && isspace(*part)) part++;
958       if(!strcmp(part, "application/json")) {
959         return rest_show_noits_json(restc, npats, pats);
960       }
961     }
962   }
963
964   mtev_http_process_querystring(req);
965   type = mtev_http_request_querystring(req, "type");
966   want_cn = mtev_http_request_querystring(req, "cn");
967
968   gettimeofday(&now, NULL);
969
970   pthread_mutex_lock(&noits_lock);
971   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
972   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
973                        &vconn)) {
974     ctxs[n] = (mtev_connection_ctx_t *)vconn;
975     mtev_connection_ctx_ref(ctxs[n]);
976     n++;
977   }
978   pthread_mutex_unlock(&noits_lock);
979   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
980
981   doc = xmlNewDoc((xmlChar *)"1.0");
982   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
983   xmlDocSetRootElement(doc, root);
984
985   for(i=0; i<n; i++) {
986     char buff[256];
987     const char *feedtype = "unknown", *state = "unknown";
988     mtev_connection_ctx_t *ctx = ctxs[i];
989     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
990
991     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
992
993     /* If the user requested a specific type and we're not it, skip. */
994     if(type && strcmp(feedtype, type)) {
995         mtev_connection_ctx_deref(ctx);
996         continue;
997     }
998     /* If the user wants a specific CN... limit to that. */
999     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn))) {
1000         mtev_connection_ctx_deref(ctx);
1001         continue;
1002     }
1003
1004     node = xmlNewNode(NULL, (xmlChar *)"noit");
1005     snprintf(buff, sizeof(buff), "%llu.%06d",
1006              (long long unsigned)ctx->last_connect.tv_sec,
1007              (int)ctx->last_connect.tv_usec);
1008     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
1009     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
1010                (xmlChar *)"connected" :
1011                (ctx->retry_event ? (xmlChar *)"disconnected" :
1012                                     (xmlChar *)"connecting"));
1013     if(ctx->e) {
1014       char buff[128];
1015       const char *addrstr = NULL;
1016       struct sockaddr_in6 addr6;
1017       socklen_t len = sizeof(addr6);
1018       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
1019         unsigned short port = 0;
1020         if(addr6.sin6_family == AF_INET) {
1021           addrstr = inet_ntop(addr6.sin6_family,
1022                               &((struct sockaddr_in *)&addr6)->sin_addr,
1023                               buff, sizeof(buff));
1024           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
1025           port = ntohs(port);
1026         }
1027         else if(addr6.sin6_family == AF_INET6) {
1028           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
1029                               buff, sizeof(buff));
1030           port = ntohs(addr6.sin6_port);
1031         }
1032         if(addrstr != NULL) {
1033           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
1034                    ":%u", port);
1035           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
1036         }
1037       }
1038     }
1039     mtev_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
1040                       0, free, NULL);
1041     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
1042     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
1043     if(ctx->retry_event) {
1044       sub_timeval(ctx->retry_event->whence, now, &diff);
1045       snprintf(buff, sizeof(buff), "%llu.%06d",
1046                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
1047       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
1048     }
1049     else if(ctx->remote_cn) {
1050       if(ctx->remote_cn)
1051         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
1052  
1053       switch(jctx->state) {
1054         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
1055         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
1056         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
1057         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
1058         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
1059         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
1060         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
1061       }
1062       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
1063       snprintf(buff, sizeof(buff), "%08x:%08x",
1064                jctx->header.chkpt.log, jctx->header.chkpt.marker);
1065       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
1066       snprintf(buff, sizeof(buff), "%llu",
1067                (unsigned long long)jctx->total_events);
1068       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
1069       snprintf(buff, sizeof(buff), "%llu",
1070                (unsigned long long)jctx->total_bytes_read);
1071       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
1072  
1073       sub_timeval(now, ctx->last_connect, &diff);
1074       snprintf(buff, sizeof(buff), "%lld.%06d",
1075                (long long)diff.tv_sec, (int)diff.tv_usec);
1076       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
1077  
1078       if(jctx->header.tv_sec) {
1079         last.tv_sec = jctx->header.tv_sec;
1080         last.tv_usec = jctx->header.tv_usec;
1081         snprintf(buff, sizeof(buff), "%llu.%06d",
1082                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
1083         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
1084         sub_timeval(now, last, &diff);
1085         snprintf(buff, sizeof(buff), "%lld.%06d",
1086                  (long long)diff.tv_sec, (int)diff.tv_usec);
1087         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
1088       }
1089     }
1090
1091     xmlAddChild(root, node);
1092     mtev_connection_ctx_deref(ctx);
1093   }
1094   free(ctxs);
1095
1096   if(!type || !strcmp(type, "configured")) {
1097     snprintf(path, sizeof(path), "//noits//noit");
1098     noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
1099     for(di=0; di<cnt; di++) {
1100       char address[64], port_str[32], remote_str[98];
1101       char expected_cn_buff[256], *expected_cn = NULL;
1102       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
1103                                  expected_cn_buff, sizeof(expected_cn_buff)))
1104         expected_cn = expected_cn_buff;
1105       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
1106       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
1107                                  address, sizeof(address))) {
1108         void *v;
1109         if(!mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
1110                                    port_str, sizeof(port_str)))
1111           strlcpy(port_str, "43191", sizeof(port_str));
1112
1113         /* If the user wants a specific CN... limit to that. */
1114           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
1115             continue;
1116
1117         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
1118         if(!mtev_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
1119           node = xmlNewNode(NULL, (xmlChar *)"noit");
1120           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
1121           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
1122           if(expected_cn)
1123             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
1124           xmlAddChild(root, node);
1125         }
1126       }
1127     }
1128     free(noit_configs);
1129   }
1130   mtev_hash_destroy(&seen, free, NULL);
1131
1132   mtev_http_response_ok(restc->http_ctx, "text/xml");
1133   mtev_http_response_xml(restc->http_ctx, doc);
1134   mtev_http_response_end(restc->http_ctx);
1135   xmlFreeDoc(doc);
1136   return 0;
1137 }
1138 static int
1139 stratcon_add_noit(const char *target, unsigned short port,
1140                   const char *cn) {
1141   int cnt;
1142   char path[256];
1143   char port_str[6];
1144   mtev_conf_section_t *noit_configs, parent;
1145   xmlNodePtr newnoit, config, cnnode;
1146
1147   snprintf(path, sizeof(path),
1148            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1149   noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
1150   free(noit_configs);
1151   if(cnt != 0) return -1;
1152
1153   parent = mtev_conf_get_section(NULL, "//noits//include//noits");
1154   if(!parent) parent = mtev_conf_get_section(NULL, "//noits");
1155   if(!parent) return -1;
1156   snprintf(port_str, sizeof(port_str), "%d", port);
1157   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1158   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1159   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1160   xmlAddChild(parent, newnoit);
1161   if(cn) {
1162     config = xmlNewNode(NULL, (xmlChar *)"config");
1163     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1164     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1165     xmlAddChild(config, cnnode);
1166     xmlAddChild(newnoit, config);
1167     pthread_mutex_lock(&noit_ip_by_cn_lock);
1168     mtev_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
1169                       strdup(target), free, free);
1170     pthread_mutex_unlock(&noit_ip_by_cn_lock);
1171   }
1172   if(stratcon_datastore_get_enabled())
1173     stratcon_streamer_connection(NULL, target, "noit",
1174                                  stratcon_jlog_recv_handler,
1175                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1176                                  NULL,
1177                                  jlog_streamer_ctx_free);
1178   if(stratcon_iep_get_enabled())
1179     stratcon_streamer_connection(NULL, target, "noit",
1180                                  stratcon_jlog_recv_handler,
1181                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1182                                  NULL,
1183                                  jlog_streamer_ctx_free);
1184   return 1;
1185 }
1186 static int
1187 stratcon_remove_noit(const char *target, unsigned short port) {
1188   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
1189   const char *key_id;
1190   int klen, n = -1, i, cnt = 0;
1191   void *vconn;
1192   mtev_connection_ctx_t **ctx;
1193   mtev_conf_section_t *noit_configs;
1194   char path[256];
1195   char remote_str[256];
1196
1197   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1198
1199   snprintf(path, sizeof(path),
1200            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1201   noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
1202   for(i=0; i<cnt; i++) {
1203     char expected_cn[256];
1204     if(mtev_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
1205                                expected_cn, sizeof(expected_cn))) {
1206       pthread_mutex_lock(&noit_ip_by_cn_lock);
1207       mtev_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
1208                        free, free);
1209       pthread_mutex_unlock(&noit_ip_by_cn_lock);
1210     }
1211     CONF_REMOVE(noit_configs[i]);
1212     xmlUnlinkNode(noit_configs[i]);
1213     xmlFreeNode(noit_configs[i]);
1214     n = 0;
1215   }
1216   free(noit_configs);
1217
1218   pthread_mutex_lock(&noits_lock);
1219   ctx = malloc(sizeof(*ctx) * mtev_hash_size(&noits));
1220   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
1221                        &vconn)) {
1222     if(!strcmp(((mtev_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1223       ctx[n] = (mtev_connection_ctx_t *)vconn;
1224       mtev_connection_ctx_ref(ctx[n]);
1225       n++;
1226     }
1227   }
1228   pthread_mutex_unlock(&noits_lock);
1229   for(i=0; i<n; i++) {
1230     mtev_connection_ctx_dealloc(ctx[i]); /* once for the record */
1231     mtev_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1232   }
1233   free(ctx);
1234   return n;
1235 }
1236 static int
1237 rest_set_noit(mtev_http_rest_closure_t *restc,
1238               int npats, char **pats) {
1239   const char *cn = NULL;
1240   mtev_http_session_ctx *ctx = restc->http_ctx;
1241   mtev_http_request *req = mtev_http_session_request(ctx);
1242   unsigned short port = 43191;
1243   if(npats < 1 || npats > 2)
1244     mtev_http_response_server_error(ctx, "text/xml");
1245   if(npats == 2) port = atoi(pats[1]);
1246   mtev_http_process_querystring(req);
1247   cn = mtev_http_request_querystring(req, "cn");
1248   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1249     mtev_http_response_ok(ctx, "text/xml");
1250   else
1251     mtev_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1252   if(mtev_conf_write_file(NULL) != 0)
1253     mtevL(noit_error, "local config write failed\n");
1254   mtev_conf_mark_changed();
1255   mtev_http_response_end(ctx);
1256   return 0;
1257 }
1258 static int
1259 rest_delete_noit(mtev_http_rest_closure_t *restc,
1260                  int npats, char **pats) {
1261   mtev_http_session_ctx *ctx = restc->http_ctx;
1262   unsigned short port = 43191;
1263   if(npats < 1 || npats > 2)
1264     mtev_http_response_server_error(ctx, "text/xml");
1265   if(npats == 2) port = atoi(pats[1]);
1266   if(stratcon_remove_noit(pats[0], port) >= 0)
1267     mtev_http_response_ok(ctx, "text/xml");
1268   else
1269     mtev_http_response_not_found(ctx, "text/xml");
1270   if(mtev_conf_write_file(NULL) != 0)
1271     mtevL(noit_error, "local config write failed\n");
1272   mtev_conf_mark_changed();
1273   mtev_http_response_end(ctx);
1274   return 0;
1275 }
1276 static int
1277 stratcon_console_conf_noits(mtev_console_closure_t ncct,
1278                             int argc, char **argv,
1279                             mtev_console_state_t *dstate,
1280                             void *closure) {
1281   char *cp, target[128];
1282   unsigned short port = 43191;
1283   int adding = (int)(vpsized_int)closure;
1284   if(argc != 1)
1285     return -1;
1286
1287   cp = strchr(argv[0], ':');
1288   if(cp) {
1289     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1290     port = atoi(cp+1);
1291   }
1292   else strlcpy(target, argv[0], sizeof(target));
1293   if(adding) {
1294     if(stratcon_add_noit(target, port, NULL) >= 0) {
1295       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1296     }
1297     else {
1298       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1299     }
1300   }
1301   else {
1302     if(stratcon_remove_noit(target, port) >= 0) {
1303       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1304     }
1305     else {
1306       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1307     }
1308   }
1309   return 0;
1310 }
1311
1312 static void
1313 register_console_streamer_commands() {
1314   mtev_console_state_t *tl;
1315   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1316
1317   tl = mtev_console_state_initial();
1318   showcmd = mtev_console_state_get_cmd(tl, "show");
1319   assert(showcmd && showcmd->dstate);
1320   confcmd = mtev_console_state_get_cmd(tl, "configure");
1321   conftcmd = mtev_console_state_get_cmd(confcmd->dstate, "terminal");
1322   conftnocmd = mtev_console_state_get_cmd(conftcmd->dstate, "no");
1323
1324   mtev_console_state_add_cmd(conftcmd->dstate,
1325     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1326   mtev_console_state_add_cmd(conftnocmd->dstate,
1327     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1328
1329   mtev_console_state_add_cmd(showcmd->dstate,
1330     NCSCMD("noit", stratcon_console_show_noits,
1331            stratcon_console_noit_opts, NULL, (void *)1));
1332   mtev_console_state_add_cmd(showcmd->dstate,
1333     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1334 }
1335
1336 int
1337 stratcon_streamer_connection(const char *toplevel, const char *destination,
1338                              const char *type,
1339                              eventer_func_t handler,
1340                              void *(*handler_alloc)(void), void *handler_ctx,
1341                              void (*handler_free)(void *)) {
1342   return mtev_connections_from_config(&noits, &noits_lock,
1343                                       toplevel, destination, type,
1344                                       handler, handler_alloc, handler_ctx,
1345                                       handler_free);
1346 }
1347
1348 mtev_reverse_acl_decision_t
1349 mtev_reverse_socket_allow_noits(const char *id, acceptor_closure_t *ac) {
1350   if(!strncmp(id, "noit/", 5)) return MTEV_ACL_ALLOW;
1351   return MTEV_ACL_ABSTAIN;
1352 }
1353
1354 void
1355 stratcon_jlog_streamer_init(const char *toplevel) {
1356   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1357   struct in_addr remote;
1358   char uuid_str[UUID_STR_LEN + 1];
1359
1360   mtev_reverse_socket_acl(mtev_reverse_socket_allow_noits);
1361   pthread_mutex_init(&noits_lock, NULL);
1362   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1363   eventer_name_callback("stratcon_jlog_recv_handler",
1364                         stratcon_jlog_recv_handler);
1365   register_console_streamer_commands();
1366   stratcon_jlog_streamer_reload(toplevel);
1367   stratcon_streamer_connection(toplevel, "", "noit", NULL, NULL, NULL, NULL);
1368   assert(mtev_http_rest_register_auth(
1369     "GET", "/noits/", "^show(.json)?$", rest_show_noits,
1370              mtev_http_rest_client_cert_auth
1371   ) == 0);
1372   assert(mtev_http_rest_register_auth(
1373     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1374              mtev_http_rest_client_cert_auth
1375   ) == 0);
1376   assert(mtev_http_rest_register_auth(
1377     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1378              mtev_http_rest_client_cert_auth
1379   ) == 0);
1380   assert(mtev_http_rest_register_auth(
1381     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1382              mtev_http_rest_client_cert_auth
1383   ) == 0);
1384   assert(mtev_http_rest_register_auth(
1385     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1386              mtev_http_rest_client_cert_auth
1387   ) == 0);
1388
1389   uuid_clear(self_stratcon_id);
1390
1391   if(mtev_conf_get_stringbuf(NULL, "/stratcon/@id",
1392                              uuid_str, sizeof(uuid_str)) &&
1393      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1394     int period;
1395     mtev_conf_get_boolean(NULL, "/stratcon/@extended_id",
1396                           &stratcon_selfcheck_extended_id);
1397     /* If a UUID was provided for stratcon itself, we will report metrics
1398      * on a large variety of things (including all noits).
1399      */
1400     if(mtev_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1401        period > 0) {
1402       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1403       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1404     }
1405     self_stratcon_ip.sin_family = AF_INET;
1406     remote.s_addr = 0xffffffff;
1407     mtev_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1408     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1409     eventer_add_in(periodic_noit_metrics, NULL, whence);
1410   }
1411 }
1412
Note: See TracBrowser for help on using the browser.