root/src/stratcon_jlog_streamer.c

Revision 304ec80b8cf842fc0abe5f9029790908b6455957, 44.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 1 week ago)

Convert to libmtev.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <unistd.h>
37 #include <assert.h>
38 #include <errno.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #ifdef HAVE_SYS_FILIO_H
42 #include <sys/filio.h>
43 #endif
44 #include <netinet/in.h>
45 #include <sys/un.h>
46 #include <arpa/inet.h>
47
48 #include <eventer/eventer.h>
49 #include <mtev_conf.h>
50 #include <mtev_hash.h>
51 #include <mtev_log.h>
52 #include <mtev_getip.h>
53 #include <mtev_rest.h>
54
55 #include "noit_mtev_bridge.h"
56 #include "stratcon_dtrace_probes.h"
57 #include "noit_jlog_listener.h"
58 #include "stratcon_datastore.h"
59 #include "stratcon_jlog_streamer.h"
60 #include "stratcon_iep.h"
61
/* Held by the console and periodic-metrics code in this file while they
 * iterate over `noits`. */
62 pthread_mutex_t noits_lock;
/* Hash table whose values are read back as mtev_connection_ctx_t pointers. */
63 mtev_hash_table noits = MTEV_HASH_EMPTY;
/* Held while `noit_ip_by_cn` is looked up or rebuilt. */
64 pthread_mutex_t noit_ip_by_cn_lock;
/* Maps an expected certificate CN (heap string key) to the configured
 * address string (heap string value); rebuilt from the //noits//noit
 * configuration sections by stratcon_jlog_streamer_recache_noit(). */
65 mtev_hash_table noit_ip_by_cn = MTEV_HASH_EMPTY;
/* Identity of this process as reported in the self-generated metric lines:
 * uuid, hostname and IPv4 address. */
66 static uuid_t self_stratcon_id;
67 static char self_stratcon_hostname[256] = "\0";
68 static struct sockaddr_in self_stratcon_ip;
/* When true, periodic_noit_metrics() prefixes the uuid string with the
 * local IP and the literal "`selfcheck`selfcheck`". */
69 static mtev_boolean stratcon_selfcheck_extended_id = mtev_true;
70
/* Interval (5 seconds, 0 microseconds) added to the firing event's time to
 * reschedule periodic_noit_metrics(). */
71 static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };
72
73 static const char *feed_type_to_str(int jlog_feed_cmd) {
74   switch(jlog_feed_cmd) {
75     case NOIT_JLOG_DATA_FEED: return "durable/storage";
76     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
77   }
78   return "unknown";
79 }
80
/*
 * GET_EXPECTED_CN(nctx, cn)
 *   Sets `cn` to the value stored under the key "cn" in nctx->config, or to
 *   NULL when there is no config table or no such key.
 *
 * Both arguments are parenthesized on every use, and the temporary is named
 * `_vcn` so that a caller passing a variable called `vcn` is not shadowed.
 */
#define GET_EXPECTED_CN(nctx, cn) do { \
  void *_vcn; \
  (cn) = NULL; \
  if((nctx)->config && \
     mtev_hash_retrieve((nctx)->config, "cn", 2, &_vcn)) { \
     (cn) = _vcn; \
  } \
} while(0)
/*
 * GET_FEEDTYPE(nctx, feedtype)
 *   Sets `feedtype` to "storage" or "iep" according to which push function
 *   the connection's jlog streamer context carries, and to "unknown" for
 *   anything else.  nctx->consumer_ctx is dereferenced unconditionally.
 */
#define GET_FEEDTYPE(nctx, feedtype) do { \
  jlog_streamer_ctx_t *_jctx = (nctx)->consumer_ctx; \
  (feedtype) = "unknown"; \
  if(_jctx->push == stratcon_datastore_push) \
    (feedtype) = "storage"; \
  else if(_jctx->push == stratcon_iep_line_processor) \
    (feedtype) = "iep"; \
} while(0)
97
98 static int
99 remote_str_sort(const void *a, const void *b) {
100   int rv;
101   mtev_connection_ctx_t * const *actx = a;
102   mtev_connection_ctx_t * const *bctx = b;
103   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
104   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
105   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
106   if(rv) return rv;
107   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
108            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
109 }
110 static void
111 nc_print_noit_conn_brief(mtev_console_closure_t ncct,
112                           mtev_connection_ctx_t *ctx) {
113   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
114   struct timeval now, diff, session_duration;
115   char cmdbuf[4096];
116   const char *feedtype = "unknown";
117   const char *lasttime = "never";
118   if(ctx->last_connect.tv_sec != 0) {
119     time_t r = ctx->last_connect.tv_sec;
120     struct tm tbuf, *tm;
121     tm = gmtime_r(&r, &tbuf);
122     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
123     lasttime = cmdbuf;
124   }
125   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
126             ctx->remote_cn ? "connected" :
127                              (ctx->retry_event ? "disconnected" :
128                                                    "connecting"), lasttime);
129   if(ctx->e) {
130     char buff[128];
131     const char *addrstr = NULL;
132     struct sockaddr_in6 addr6;
133     socklen_t len = sizeof(addr6);
134     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
135       unsigned short port = 0;
136       if(addr6.sin6_family == AF_INET) {
137         addrstr = inet_ntop(addr6.sin6_family,
138                             &((struct sockaddr_in *)&addr6)->sin_addr,
139                             buff, sizeof(buff));
140         memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
141         port = ntohs(port);
142       }
143       else if(addr6.sin6_family == AF_INET6) {
144         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
145                             buff, sizeof(buff));
146         port = ntohs(addr6.sin6_port);
147       }
148       if(addrstr != NULL)
149         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
150       else
151         nc_printf(ncct, "\tLocal address not interpretable\n");
152     }
153     else {
154       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
155                 ctx->e->fd, strerror(errno));
156     }
157   }
158   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
159   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
160   gettimeofday(&now, NULL);
161   if(ctx->timeout_event) {
162     sub_timeval(ctx->timeout_event->whence, now, &diff);
163     nc_printf(ncct, "\tTimeout scheduled for %lld.%06us\n",
164               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
165   }
166   if(ctx->retry_event) {
167     sub_timeval(ctx->retry_event->whence, now, &diff);
168     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
169               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
170   }
171   else if(ctx->remote_cn) {
172     nc_printf(ncct, "\tRemote CN: '%s'\n",
173               ctx->remote_cn ? ctx->remote_cn : "???");
174     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
175       struct timeval last;
176       double session_duration_seconds;
177       const char *state = "unknown";
178
179       switch(jctx->state) {
180         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
181         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
182         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
183         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
184         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
185         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
186         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
187       }
188       last.tv_sec = jctx->header.tv_sec;
189       last.tv_usec = jctx->header.tv_usec;
190       sub_timeval(now, last, &diff);
191       sub_timeval(now, ctx->last_connect, &session_duration);
192       session_duration_seconds = session_duration.tv_sec +
193                                  (double)session_duration.tv_usec/1000000.0;
194       nc_printf(ncct, "\tState: %s\n"
195                       "\tNext checkpoint: [%08x:%08x]\n"
196                       "\tLast event: %lld.%06us ago\n"
197                       "\tEvents this session: %llu (%0.2f/s)\n"
198                       "\tOctets this session: %llu (%0.2f/s)\n",
199                 state,
200                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
201                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
202                 jctx->total_events,
203                 (double)jctx->total_events/session_duration_seconds,
204                 jctx->total_bytes_read,
205                 (double)jctx->total_bytes_read/session_duration_seconds);
206     }
207     else {
208       nc_printf(ncct, "\tUnknown type.\n");
209     }
210   }
211 }
212
213 jlog_streamer_ctx_t *
214 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
215   jlog_streamer_ctx_t *ctx;
216   ctx = stratcon_jlog_streamer_ctx_alloc();
217   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
218   ctx->push = stratcon_datastore_push;
219   return ctx;
220 }
221 jlog_streamer_ctx_t *
222 stratcon_jlog_streamer_ctx_alloc(void) {
223   jlog_streamer_ctx_t *ctx;
224   ctx = calloc(1, sizeof(*ctx));
225   return ctx;
226 }
227
228 void
229 jlog_streamer_ctx_free(void *cl) {
230   jlog_streamer_ctx_t *ctx = cl;
231   if(ctx->buffer) free(ctx->buffer);
232   free(ctx);
233 }
234
235 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
236 static int
237 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
238   int len, mask;
239   while(ctx->bytes_read < ctx->bytes_expected) {
240     len = Eread(ctx->buffer + ctx->bytes_read,
241                 ctx->bytes_expected - ctx->bytes_read);
242     if(len < 0) {
243       *newmask = mask;
244       return -1;
245     }
246     /* if we get 0 inside SSL, and there was a real error, we
247      * will actually get a -1 here.
248      * if(len == 0) return ctx->bytes_read;
249      */
250     ctx->total_bytes_read += len;
251     ctx->bytes_read += len;
252   }
253   assert(ctx->bytes_read == ctx->bytes_expected);
254   return ctx->bytes_read;
255 }
/*
 * FULLREAD(e, ctx, size)
 *   Read exactly `size` bytes from event `e` into a freshly allocated,
 *   NUL-terminated ctx->buffer (size + 1 bytes).
 *
 *   The macro relies on its expansion site for: a variable `nctx` (used for
 *   log context), a label `socket_error`, and a function returning an event
 *   mask.  If the read would block (EAGAIN) the enclosing function returns
 *   `mask | EVENTER_EXCEPTION`; re-entering later resumes the same read,
 *   because the buffer is only allocated while ctx->bytes_expected is zero.
 *   Any other failure jumps to socket_error.
 *
 *   `size` is evaluated exactly once, before the macro's own `mask`/`len`
 *   are declared, so the argument expression cannot be captured by them.
 */
#define FULLREAD(e,ctx,size) do { \
  const size_t _want = (size_t)(size); \
  int mask, len; \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = _want; \
    free((ctx)->buffer); \
    (ctx)->buffer = malloc(_want + 1); \
    if((ctx)->buffer == NULL) { \
      mtevL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)_want + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[_want] = '\0'; \
  } \
  len = __read_on_ctx((e), (ctx), &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    mtevL(noit_error, "[%s] [%s] SSL read error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)", \
          nctx->remote_cn ? nctx->remote_cn : "(null)", \
          strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if((size_t)len != _want) { \
    mtevL(noit_error, "[%s] [%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          nctx->remote_str ? nctx->remote_str : "(null)", \
          nctx->remote_cn ? nctx->remote_cn : "(null)", \
          (ctx)->state, len, (long unsigned int)_want); \
    goto socket_error; \
  } \
} while(0)
286
/*
 * Event callback that drives the jlog streaming state machine for one
 * connection.
 *
 *   e        the event whose opset is used for all reads and writes
 *   mask     the event mask that triggered this call
 *   closure  the mtev_connection_ctx_t for the connection; its consumer_ctx
 *            is the jlog_streamer_ctx_t holding the stream state
 *   now      passed to the reconnect scheduler on failure
 *
 * Returns `mask | EVENTER_EXCEPTION` when a read or write reports EAGAIN
 * (the state is kept, so the next call resumes where this one stopped), and
 * 0 after the connection has been torn down or after the event has been
 * handed to the asynchronous checkpoint push.
 */
287 int
288 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
289                            struct timeval *now) {
290   mtev_connection_ctx_t *nctx = closure;
291   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
292   jlog_streamer_ctx_t dummy;
293   int len;
294   jlog_id n_chkpt;
295   const char *cn_expected, *feedtype;
296   GET_EXPECTED_CN(nctx, cn_expected);
297   GET_FEEDTYPE(nctx, feedtype);
298
299   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
      /* Zero-length write on the raw fd; a -1 result is only logged. */
300     if(write(e->fd, e, 0) == -1)
301       mtevL(noit_error, "[%s] [%s] socket error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)",
302             nctx->remote_cn ? nctx->remote_cn : "(null)", strerror(errno));
      /* Common failure exit (also the target of FULLREAD's gotos): reset the
       * stream state, drop the buffer, schedule a reconnect and close. */
303  socket_error:
304     ctx->state = JLOG_STREAMER_WANT_INITIATE;
305     ctx->count = 0;
306     ctx->needs_chkpt = 0;
307     ctx->bytes_read = 0;
308     ctx->bytes_expected = 0;
309     if(ctx->buffer) free(ctx->buffer);
310     ctx->buffer = NULL;
311     nctx->schedule_reattempt(nctx, now);
312     nctx->close(nctx, e);
313     return 0;
314   }
315
316   mtev_connection_update_timeout(nctx);
      /* Loop until an operation would block, fails, or goes asynchronous. */
317   while(1) {
318     switch(ctx->state) {
          /* Send the feed command (stored pre-converted by the allocator). */
319       case JLOG_STREAMER_WANT_INITIATE:
320         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
321                               sizeof(ctx->jlog_feed_cmd),
322                               &mask, e);
323         if(len < 0) {
324           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
325           goto socket_error;
326         }
327         if(len != sizeof(ctx->jlog_feed_cmd)) {
328           mtevL(noit_error, "[%s] [%s] short write [%d/%d] on initiating stream.\n",
329                 nctx->remote_str ? nctx->remote_str : "(null)", nctx->remote_cn ? nctx->remote_cn : "(null)",
330                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
331           goto socket_error;
332         }
333         ctx->state = JLOG_STREAMER_WANT_COUNT;
334         break;
335
          /* A negative count was received: read -count bytes of error text,
           * log it and reset the connection. */
336       case JLOG_STREAMER_WANT_ERROR:
337         FULLREAD(e, ctx, 0 - ctx->count);
338         mtevL(noit_error, "[%s] [%s] %.*s\n", nctx->remote_str ? nctx->remote_str : "(null)",
339               nctx->remote_cn ? nctx->remote_cn : "(null)", 0 - ctx->count, ctx->buffer);
340         free(ctx->buffer); ctx->buffer = NULL;
341         goto socket_error;
342         break;
343
          /* Read the 32-bit batch count (network byte order). */
344       case JLOG_STREAMER_WANT_COUNT:
345         FULLREAD(e, ctx, sizeof(u_int32_t));
346         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
347         ctx->count = ntohl(dummy.count);
348         ctx->needs_chkpt = 0;
349         free(ctx->buffer); ctx->buffer = NULL;
350         STRATCON_STREAM_COUNT(e->fd, (char *)feedtype,
351                                    nctx->remote_str, (char *)cn_expected,
352                                    ctx->count);
353         if(ctx->count < 0)
354           ctx->state = JLOG_STREAMER_WANT_ERROR;
355         else
356           ctx->state = JLOG_STREAMER_WANT_HEADER;
357         break;
358
          /* Read one message header and convert each field with ntohl(). */
359       case JLOG_STREAMER_WANT_HEADER:
360         if(ctx->count == 0) {
361           ctx->state = JLOG_STREAMER_WANT_COUNT;
362           break;
363         }
364         FULLREAD(e, ctx, sizeof(ctx->header));
365         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
366         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
367         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
368         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
369         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
370         ctx->header.message_len = ntohl(dummy.header.message_len);
371         STRATCON_STREAM_HEADER(e->fd, (char *)feedtype,
372                                     nctx->remote_str, (char *)cn_expected,
373                                     ctx->header.chkpt.log, ctx->header.chkpt.marker,
374                                     ctx->header.tv_sec, ctx->header.tv_usec,
375                                     ctx->header.message_len);
376         free(ctx->buffer); ctx->buffer = NULL;
377         ctx->state = JLOG_STREAMER_WANT_BODY;
378         break;
379
          /* Read message_len bytes of payload and hand a non-empty payload to
           * the push function. */
380       case JLOG_STREAMER_WANT_BODY:
381         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
382         STRATCON_STREAM_BODY(e->fd, (char *)feedtype,
383                                   nctx->remote_str, (char *)cn_expected,
384                                   ctx->header.chkpt.log, ctx->header.chkpt.marker,
385                                   ctx->header.tv_sec, ctx->header.tv_usec,
386                                   ctx->buffer);
387         if(ctx->header.message_len > 0) {
388           ctx->needs_chkpt = 1;
389           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
390                     ctx->buffer, NULL);
391         }
392         else if(ctx->buffer)
393           free(ctx->buffer);
394         /* Don't free the buffer, it's used by the datastore process. */
395         ctx->buffer = NULL;
396         ctx->count--;
397         ctx->total_events++;
398         if(ctx->count == 0 && ctx->needs_chkpt) {
            /* End of a batch that pushed data: take the fd out of the
             * eventer, copy the event into a fresh one and pass that copy
             * along with the DS_OP_CHKPT push; this call then returns 0
             * with the state left at IS_ASYNC. */
399           eventer_t completion_e;
400           eventer_remove_fd(e->fd);
401           completion_e = eventer_alloc();
402           memcpy(completion_e, e, sizeof(*e));
403           nctx->e = completion_e;
404           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
405           ctx->state = JLOG_STREAMER_IS_ASYNC;
406           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
407                     NULL, completion_e);
408           mtevL(noit_debug, "Pushing %s batch async [%s] [%s]: [%u/%u]\n",
409                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
410                 nctx->remote_str ? nctx->remote_str : "(null)",
411                 nctx->remote_cn ? nctx->remote_cn : "(null)",
412                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
413           mtev_connection_disable_timeout(nctx);
414           return 0;
415         }
416         else if(ctx->count == 0)
417           ctx->state = JLOG_STREAMER_WANT_CHKPT;
418         else
419           ctx->state = JLOG_STREAMER_WANT_HEADER;
420         break;
421
          /* Entered when the handler runs again in the IS_ASYNC state. */
422       case JLOG_STREAMER_IS_ASYNC:
423         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
          /* Write the last header's checkpoint id back in network order. */
424       case JLOG_STREAMER_WANT_CHKPT:
425         mtevL(noit_debug, "Pushing %s checkpoint [%s] [%s]: [%u/%u]\n",
426               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
427               nctx->remote_str ? nctx->remote_str : "(null)",
428               nctx->remote_cn ? nctx->remote_cn : "(null)",
429               ctx->header.chkpt.log, ctx->header.chkpt.marker);
430         n_chkpt.log = htonl(ctx->header.chkpt.log);
431         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
432
433         /* screw short writes.  I'd rather die than not write my data! */
434         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
435                               &mask, e);
436         if(len < 0) {
437           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
438           goto socket_error;
439         }
440         if(len != sizeof(jlog_id)) {
441           mtevL(noit_error, "[%s] [%s] short write on checkpointing stream.\n",
442             nctx->remote_str ? nctx->remote_str : "(null)",
443             nctx->remote_cn ? nctx->remote_cn : "(null)");
444           goto socket_error;
445         }
446         STRATCON_STREAM_CHECKPOINT(e->fd, (char *)feedtype,
447                                         nctx->remote_str, (char *)cn_expected,
448                                         ctx->header.chkpt.log, ctx->header.chkpt.marker);
449         ctx->state = JLOG_STREAMER_WANT_COUNT;
450         break;
451     }
452   }
453   /* never get here */
454 }
455
456
457 int
458 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
459   int rv = -1;
460   void *vip;
461   pthread_mutex_lock(&noit_ip_by_cn_lock);
462   if(mtev_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
463     int new_len;
464     char *new_ip = (char *)vip;
465     new_len = strlen(new_ip);
466     strlcpy(ip, new_ip, len);
467     if(new_len >= len) rv = new_len+1;
468     else rv = 0;
469   }
470   pthread_mutex_unlock(&noit_ip_by_cn_lock);
471   return rv;
472 }
473 void
474 stratcon_jlog_streamer_recache_noit() {
475   int di, cnt;
476   mtev_conf_section_t *noit_configs;
477   noit_configs = mtev_conf_get_sections(NULL, "//noits//noit", &cnt);
478   pthread_mutex_lock(&noit_ip_by_cn_lock);
479   mtev_hash_delete_all(&noit_ip_by_cn, free, free);
480   for(di=0; di<cnt; di++) {
481     char address[64];
482     if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
483                                  address, sizeof(address))) {
484       char expected_cn[256];
485       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
486                                  expected_cn, sizeof(expected_cn)))
487         mtev_hash_store(&noit_ip_by_cn,
488                         strdup(expected_cn), strlen(expected_cn),
489                         strdup(address));
490     }
491   }
492   free(noit_configs);
493   pthread_mutex_unlock(&noit_ip_by_cn_lock);
494 }
495 void
496 stratcon_jlog_streamer_reload(const char *toplevel) {
497   /* flush and repopulate the cn cache */
498   stratcon_jlog_streamer_recache_noit();
499   if(!stratcon_datastore_get_enabled()) return;
500   stratcon_streamer_connection(toplevel, NULL, "noit",
501                                stratcon_jlog_recv_handler,
502                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
503                                NULL,
504                                jlog_streamer_ctx_free);
505 }
506
507 char *
508 stratcon_console_noit_opts(mtev_console_closure_t ncct,
509                            mtev_console_state_stack_t *stack,
510                            mtev_console_state_t *dstate,
511                            int argc, char **argv, int idx) {
512   if(argc == 1) {
513     mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
514     const char *key_id;
515     int klen, i = 0;
516     void *vconn, *vcn;
517     mtev_connection_ctx_t *ctx;
518     mtev_hash_table dedup = MTEV_HASH_EMPTY;
519
520     pthread_mutex_lock(&noits_lock);
521     while(mtev_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
522       ctx = (mtev_connection_ctx_t *)vconn;
523       vcn = NULL;
524       if(ctx->config && mtev_hash_retrieve(ctx->config, "cn", 2, &vcn) &&
525          !mtev_hash_store(&dedup, vcn, strlen(vcn), NULL)) {
526         if(!strncmp(vcn, argv[0], strlen(argv[0]))) {
527           if(idx == i) {
528             pthread_mutex_unlock(&noits_lock);
529             mtev_hash_destroy(&dedup, NULL, NULL);
530             return strdup(vcn);
531           }
532           i++;
533         }
534       }
535       if(ctx->remote_str &&
536          !mtev_hash_store(&dedup, ctx->remote_str, strlen(ctx->remote_str), NULL)) {
537         if(!strncmp(ctx->remote_str, argv[0], strlen(argv[0]))) {
538           if(idx == i) {
539             pthread_mutex_unlock(&noits_lock);
540             mtev_hash_destroy(&dedup, NULL, NULL);
541             return strdup(ctx->remote_str);
542           }
543           i++;
544         }
545       }
546     }
547     pthread_mutex_unlock(&noits_lock);
548     mtev_hash_destroy(&dedup, NULL, NULL);
549   }
550   if(argc == 2)
551     return mtev_console_opt_delegate(ncct, stack, dstate, argc-1, argv+1, idx);
552   return NULL;
553 }
554 static int
555 stratcon_console_show_noits(mtev_console_closure_t ncct,
556                             int argc, char **argv,
557                             mtev_console_state_t *dstate,
558                             void *closure) {
559   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
560   const char *key_id, *ecn;
561   int klen, n = 0, i;
562   void *vconn;
563   mtev_connection_ctx_t **ctx;
564
565   if(closure != (void *)0 && argc == 0) {
566     nc_printf(ncct, "takes an argument\n");
567     return 0;
568   }
569   if(closure == (void *)0 && argc > 0) {
570     nc_printf(ncct, "takes no arguments\n");
571     return 0;
572   }
573   pthread_mutex_lock(&noits_lock);
574   ctx = malloc(sizeof(*ctx) * mtev_hash_size(&noits));
575   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
576                        &vconn)) {
577     ctx[n] = (mtev_connection_ctx_t *)vconn;
578     if(argc == 0 ||
579        !strcmp(ctx[n]->remote_str, argv[0]) ||
580        (ctx[n]->config && mtev_hash_retr_str(ctx[n]->config, "cn", 2, &ecn) &&
581         !strcmp(ecn, argv[0]))) {
582       mtev_connection_ctx_ref(ctx[n]);
583       n++;
584     }
585   }
586   pthread_mutex_unlock(&noits_lock);
587   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
588   for(i=0; i<n; i++) {
589     nc_print_noit_conn_brief(ncct, ctx[i]);
590     mtev_connection_ctx_deref(ctx[i]);
591   }
592   free(ctx);
593   return 0;
594 }
595
596 static void
597 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
598                        mtev_connection_ctx_t *nctx) {
599   struct timeval last, session_duration, diff;
600   u_int64_t session_duration_ms, last_event_ms;
601   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
602   char str[1024], *wr;
603   int len;
604   const char *cn_expected;
605   const char *feedtype = "unknown";
606
607   GET_FEEDTYPE(nctx, feedtype);
608   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
609
610   GET_EXPECTED_CN(nctx, cn_expected);
611   if(!cn_expected) return;
612
613   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
614            (long unsigned int)now->tv_sec,
615            (long unsigned int)now->tv_usec/1000UL,
616            uuid_str, cn_expected, feedtype);
617   wr = str + strlen(str);
618   len = sizeof(str) - (wr - str);
619
620   /* Now we write NAME TYPE VALUE into wr each time and push it */
621 #define push_noit_m_str(name, value) do { \
622   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
623   stratcon_datastore_push(DS_OP_INSERT, \
624                           (struct sockaddr *)&self_stratcon_ip, \
625                           self_stratcon_hostname, strdup(str), NULL); \
626   stratcon_iep_line_processor(DS_OP_INSERT, \
627                               (struct sockaddr *)&self_stratcon_ip, \
628                               self_stratcon_hostname, strdup(str), NULL); \
629 } while(0)
630 #define push_noit_m_u64(name, value) do { \
631   snprintf(wr, len, "%s\tL\t%llu\n", name, (long long unsigned int)value); \
632   stratcon_datastore_push(DS_OP_INSERT, \
633                           (struct sockaddr *)&self_stratcon_ip, \
634                           self_stratcon_hostname, strdup(str), NULL); \
635   stratcon_iep_line_processor(DS_OP_INSERT, \
636                               (struct sockaddr *)&self_stratcon_ip, \
637                               self_stratcon_hostname, strdup(str), NULL); \
638 } while(0)
639
640   last.tv_sec = jctx->header.tv_sec;
641   last.tv_usec = jctx->header.tv_usec;
642   sub_timeval(*now, last, &diff);
643   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
644   sub_timeval(*now, nctx->last_connect, &session_duration);
645   session_duration_ms = session_duration.tv_sec * 1000 +
646                         session_duration.tv_usec / 1000;
647
648   push_noit_m_str("state", nctx->remote_cn ? "connected" :
649                              (nctx->retry_event ? "disconnected" :
650                                                   "connecting"));
651   push_noit_m_u64("last_event_age_ms", last_event_ms);
652   push_noit_m_u64("session_length_ms", session_duration_ms);
653 }
654 static int
655 periodic_noit_metrics(eventer_t e, int mask, void *closure,
656                       struct timeval *now) {
657   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
658   mtev_connection_ctx_t **ctxs;
659   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
660   const char *key_id;
661   void *vconn;
662   int klen, n = 0, i;
663   char str[1024];
664   char uuid_str[1024], tmp_uuid_str[UUID_STR_LEN+1];
665   struct timeval epoch, diff;
666   u_int64_t uptime = 0;
667   char ip_str[128];
668
669   inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
670             sizeof(ip_str));
671
672   uuid_str[0] = '\0';
673   uuid_unparse_lower(self_stratcon_id, tmp_uuid_str);
674   if(stratcon_selfcheck_extended_id) {
675     strlcat(uuid_str, ip_str, sizeof(uuid_str)-37);
676     strlcat(uuid_str, "`selfcheck`selfcheck`", sizeof(uuid_str)-37);
677   }
678   strlcat(uuid_str, tmp_uuid_str, sizeof(uuid_str));
679
680 #define PUSH_BOTH(type, str) do { \
681   stratcon_datastore_push(type, \
682                           (struct sockaddr *)&self_stratcon_ip, \
683                           self_stratcon_hostname, str, NULL); \
684   stratcon_iep_line_processor(type, \
685                               (struct sockaddr *)&self_stratcon_ip, \
686                               self_stratcon_hostname, str, NULL); \
687 } while(0)
688
689   if(closure == NULL) {
690     /* Only do this the first time we get called */
691     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
692              (long unsigned int)now->tv_sec,
693              (long unsigned int)now->tv_usec/1000UL, uuid_str, ip_str,
694              self_stratcon_hostname);
695     PUSH_BOTH(DS_OP_INSERT, strdup(str));
696   }
697
698   pthread_mutex_lock(&noits_lock);
699   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
700   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
701                        &vconn)) {
702     ctxs[n] = (mtev_connection_ctx_t *)vconn;
703     mtev_connection_ctx_ref(ctxs[n]);
704     n++;
705   }
706   pthread_mutex_unlock(&noits_lock);
707
708   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok %d noits\n",
709            (long unsigned int)now->tv_sec,
710            (long unsigned int)now->tv_usec/1000UL, uuid_str, n);
711   PUSH_BOTH(DS_OP_INSERT, strdup(str));
712
713   if(eventer_get_epoch(&epoch) != 0)
714     memcpy(&epoch, now, sizeof(epoch));
715   sub_timeval(*now, epoch, &diff);
716   uptime = diff.tv_sec;
717   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\tuptime\tL\t%llu\n",
718            (long unsigned int)now->tv_sec,
719            (long unsigned int)now->tv_usec/1000UL,
720            uuid_str, (long long unsigned int)uptime);
721   PUSH_BOTH(DS_OP_INSERT, strdup(str));
722
723   for(i=0; i<n; i++) {
724     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
725     mtev_connection_ctx_deref(ctxs[i]);
726   }
727   free(ctxs);
728   PUSH_BOTH(DS_OP_CHKPT, NULL);
729
730   add_timeval(e->whence, whence, &whence);
731   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
732   return 0;
733 }
734
735 static int
736 rest_show_noits(mtev_http_rest_closure_t *restc,
737                 int npats, char **pats) {
738   xmlDocPtr doc;
739   xmlNodePtr root;
740   mtev_hash_table seen = MTEV_HASH_EMPTY;
741   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
742   char path[256];
743   const char *key_id;
744   const char *type = NULL, *want_cn = NULL;
745   int klen, n = 0, i, di, cnt;
746   void *vconn;
747   mtev_connection_ctx_t **ctxs;
748   mtev_conf_section_t *noit_configs;
749   struct timeval now, diff, last;
750   xmlNodePtr node;
751   mtev_http_request *req = mtev_http_session_request(restc->http_ctx);
752
753   mtev_http_process_querystring(req);
754   type = mtev_http_request_querystring(req, "type");
755   want_cn = mtev_http_request_querystring(req, "cn");
756
757   gettimeofday(&now, NULL);
758
759   pthread_mutex_lock(&noits_lock);
760   ctxs = malloc(sizeof(*ctxs) * mtev_hash_size(&noits));
761   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
762                        &vconn)) {
763     ctxs[n] = (mtev_connection_ctx_t *)vconn;
764     mtev_connection_ctx_ref(ctxs[n]);
765     n++;
766   }
767   pthread_mutex_unlock(&noits_lock);
768   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
769
770   doc = xmlNewDoc((xmlChar *)"1.0");
771   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
772   xmlDocSetRootElement(doc, root);
773
774   for(i=0; i<n; i++) {
775     char buff[256];
776     const char *feedtype = "unknown", *state = "unknown";
777     mtev_connection_ctx_t *ctx = ctxs[i];
778     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
779
780     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
781
782     /* If the user requested a specific type and we're not it, skip. */
783     if(type && strcmp(feedtype, type)) {
784         mtev_connection_ctx_deref(ctx);
785         continue;
786     }
787     /* If the user wants a specific CN... limit to that. */
788     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn))) {
789         mtev_connection_ctx_deref(ctx);
790         continue;
791     }
792
793     node = xmlNewNode(NULL, (xmlChar *)"noit");
794     snprintf(buff, sizeof(buff), "%llu.%06d",
795              (long long unsigned)ctx->last_connect.tv_sec,
796              (int)ctx->last_connect.tv_usec);
797     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
798     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
799                (xmlChar *)"connected" :
800                (ctx->retry_event ? (xmlChar *)"disconnected" :
801                                     (xmlChar *)"connecting"));
802     if(ctx->e) {
803       char buff[128];
804       const char *addrstr = NULL;
805       struct sockaddr_in6 addr6;
806       socklen_t len = sizeof(addr6);
807       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
808         unsigned short port = 0;
809         if(addr6.sin6_family == AF_INET) {
810           addrstr = inet_ntop(addr6.sin6_family,
811                               &((struct sockaddr_in *)&addr6)->sin_addr,
812                               buff, sizeof(buff));
813           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
814           port = ntohs(port);
815         }
816         else if(addr6.sin6_family == AF_INET6) {
817           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
818                               buff, sizeof(buff));
819           port = ntohs(addr6.sin6_port);
820         }
821         if(addrstr != NULL) {
822           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
823                    ":%u", port);
824           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
825         }
826       }
827     }
828     mtev_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
829                       0, free, NULL);
830     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
831     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
832     if(ctx->retry_event) {
833       sub_timeval(ctx->retry_event->whence, now, &diff);
834       snprintf(buff, sizeof(buff), "%llu.%06d",
835                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
836       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
837     }
838     else if(ctx->remote_cn) {
839       if(ctx->remote_cn)
840         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
841  
842       switch(jctx->state) {
843         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
844         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
845         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
846         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
847         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
848         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
849         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
850       }
851       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
852       snprintf(buff, sizeof(buff), "%08x:%08x",
853                jctx->header.chkpt.log, jctx->header.chkpt.marker);
854       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
855       snprintf(buff, sizeof(buff), "%llu",
856                (unsigned long long)jctx->total_events);
857       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
858       snprintf(buff, sizeof(buff), "%llu",
859                (unsigned long long)jctx->total_bytes_read);
860       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
861  
862       sub_timeval(now, ctx->last_connect, &diff);
863       snprintf(buff, sizeof(buff), "%lld.%06d",
864                (long long)diff.tv_sec, (int)diff.tv_usec);
865       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
866  
867       if(jctx->header.tv_sec) {
868         last.tv_sec = jctx->header.tv_sec;
869         last.tv_usec = jctx->header.tv_usec;
870         snprintf(buff, sizeof(buff), "%llu.%06d",
871                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
872         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
873         sub_timeval(now, last, &diff);
874         snprintf(buff, sizeof(buff), "%lld.%06d",
875                  (long long)diff.tv_sec, (int)diff.tv_usec);
876         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
877       }
878     }
879
880     xmlAddChild(root, node);
881     mtev_connection_ctx_deref(ctx);
882   }
883   free(ctxs);
884
885   if(!type || !strcmp(type, "configured")) {
886     snprintf(path, sizeof(path), "//noits//noit");
887     noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
888     for(di=0; di<cnt; di++) {
889       char address[64], port_str[32], remote_str[98];
890       char expected_cn_buff[256], *expected_cn = NULL;
891       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
892                                  expected_cn_buff, sizeof(expected_cn_buff)))
893         expected_cn = expected_cn_buff;
894       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
895       if(mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
896                                  address, sizeof(address))) {
897         void *v;
898         if(!mtev_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
899                                    port_str, sizeof(port_str)))
900           strlcpy(port_str, "43191", sizeof(port_str));
901
902         /* If the user wants a specific CN... limit to that. */
903           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
904             continue;
905
906         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
907         if(!mtev_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
908           node = xmlNewNode(NULL, (xmlChar *)"noit");
909           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
910           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
911           if(expected_cn)
912             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
913           xmlAddChild(root, node);
914         }
915       }
916     }
917     free(noit_configs);
918   }
919   mtev_hash_destroy(&seen, free, NULL);
920
921   mtev_http_response_ok(restc->http_ctx, "text/xml");
922   mtev_http_response_xml(restc->http_ctx, doc);
923   mtev_http_response_end(restc->http_ctx);
924   xmlFreeDoc(doc);
925   return 0;
926 }
927 static int
928 stratcon_add_noit(const char *target, unsigned short port,
929                   const char *cn) {
930   int cnt;
931   char path[256];
932   char port_str[6];
933   mtev_conf_section_t *noit_configs, parent;
934   xmlNodePtr newnoit, config, cnnode;
935
936   snprintf(path, sizeof(path),
937            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
938   noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
939   free(noit_configs);
940   if(cnt != 0) return -1;
941
942   parent = mtev_conf_get_section(NULL, "//noits");
943   if(!parent) return -1;
944   snprintf(port_str, sizeof(port_str), "%d", port);
945   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
946   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
947   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
948   xmlAddChild(parent, newnoit);
949   if(cn) {
950     config = xmlNewNode(NULL, (xmlChar *)"config");
951     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
952     xmlNodeAddContent(cnnode, (xmlChar *)cn);
953     xmlAddChild(config, cnnode);
954     xmlAddChild(newnoit, config);
955     pthread_mutex_lock(&noit_ip_by_cn_lock);
956     mtev_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
957                       strdup(target), free, free);
958     pthread_mutex_unlock(&noit_ip_by_cn_lock);
959   }
960   if(stratcon_datastore_get_enabled())
961     stratcon_streamer_connection(NULL, target, "noit",
962                                  stratcon_jlog_recv_handler,
963                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
964                                  NULL,
965                                  jlog_streamer_ctx_free);
966   if(stratcon_iep_get_enabled())
967     stratcon_streamer_connection(NULL, target, "noit",
968                                  stratcon_jlog_recv_handler,
969                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
970                                  NULL,
971                                  jlog_streamer_ctx_free);
972   return 1;
973 }
974 static int
975 stratcon_remove_noit(const char *target, unsigned short port) {
976   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
977   const char *key_id;
978   int klen, n = -1, i, cnt = 0;
979   void *vconn;
980   mtev_connection_ctx_t **ctx;
981   mtev_conf_section_t *noit_configs;
982   char path[256];
983   char remote_str[256];
984
985   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
986
987   snprintf(path, sizeof(path),
988            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
989   noit_configs = mtev_conf_get_sections(NULL, path, &cnt);
990   for(i=0; i<cnt; i++) {
991     char expected_cn[256];
992     if(mtev_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
993                                expected_cn, sizeof(expected_cn))) {
994       pthread_mutex_lock(&noit_ip_by_cn_lock);
995       mtev_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
996                        free, free);
997       pthread_mutex_unlock(&noit_ip_by_cn_lock);
998     }
999     CONF_REMOVE(noit_configs[i]);
1000     xmlUnlinkNode(noit_configs[i]);
1001     xmlFreeNode(noit_configs[i]);
1002     n = 0;
1003   }
1004   free(noit_configs);
1005
1006   pthread_mutex_lock(&noits_lock);
1007   ctx = malloc(sizeof(*ctx) * mtev_hash_size(&noits));
1008   while(mtev_hash_next(&noits, &iter, &key_id, &klen,
1009                        &vconn)) {
1010     if(!strcmp(((mtev_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1011       ctx[n] = (mtev_connection_ctx_t *)vconn;
1012       mtev_connection_ctx_ref(ctx[n]);
1013       n++;
1014     }
1015   }
1016   pthread_mutex_unlock(&noits_lock);
1017   for(i=0; i<n; i++) {
1018     mtev_connection_ctx_dealloc(ctx[i]); /* once for the record */
1019     mtev_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1020   }
1021   free(ctx);
1022   return n;
1023 }
1024 static int
1025 rest_set_noit(mtev_http_rest_closure_t *restc,
1026               int npats, char **pats) {
1027   const char *cn = NULL;
1028   mtev_http_session_ctx *ctx = restc->http_ctx;
1029   mtev_http_request *req = mtev_http_session_request(ctx);
1030   unsigned short port = 43191;
1031   if(npats < 1 || npats > 2)
1032     mtev_http_response_server_error(ctx, "text/xml");
1033   if(npats == 2) port = atoi(pats[1]);
1034   mtev_http_process_querystring(req);
1035   cn = mtev_http_request_querystring(req, "cn");
1036   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1037     mtev_http_response_ok(ctx, "text/xml");
1038   else
1039     mtev_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1040   if(mtev_conf_write_file(NULL) != 0)
1041     mtevL(noit_error, "local config write failed\n");
1042   mtev_conf_mark_changed();
1043   mtev_http_response_end(ctx);
1044   return 0;
1045 }
1046 static int
1047 rest_delete_noit(mtev_http_rest_closure_t *restc,
1048                  int npats, char **pats) {
1049   mtev_http_session_ctx *ctx = restc->http_ctx;
1050   unsigned short port = 43191;
1051   if(npats < 1 || npats > 2)
1052     mtev_http_response_server_error(ctx, "text/xml");
1053   if(npats == 2) port = atoi(pats[1]);
1054   if(stratcon_remove_noit(pats[0], port) >= 0)
1055     mtev_http_response_ok(ctx, "text/xml");
1056   else
1057     mtev_http_response_not_found(ctx, "text/xml");
1058   if(mtev_conf_write_file(NULL) != 0)
1059     mtevL(noit_error, "local config write failed\n");
1060   mtev_conf_mark_changed();
1061   mtev_http_response_end(ctx);
1062   return 0;
1063 }
1064 static int
1065 stratcon_console_conf_noits(mtev_console_closure_t ncct,
1066                             int argc, char **argv,
1067                             mtev_console_state_t *dstate,
1068                             void *closure) {
1069   char *cp, target[128];
1070   unsigned short port = 43191;
1071   int adding = (int)(vpsized_int)closure;
1072   if(argc != 1)
1073     return -1;
1074
1075   cp = strchr(argv[0], ':');
1076   if(cp) {
1077     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1078     port = atoi(cp+1);
1079   }
1080   else strlcpy(target, argv[0], sizeof(target));
1081   if(adding) {
1082     if(stratcon_add_noit(target, port, NULL) >= 0) {
1083       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1084     }
1085     else {
1086       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1087     }
1088   }
1089   else {
1090     if(stratcon_remove_noit(target, port) >= 0) {
1091       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1092     }
1093     else {
1094       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1095     }
1096   }
1097   return 0;
1098 }
1099
1100 static void
1101 register_console_streamer_commands() {
1102   mtev_console_state_t *tl;
1103   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1104
1105   tl = mtev_console_state_initial();
1106   showcmd = mtev_console_state_get_cmd(tl, "show");
1107   assert(showcmd && showcmd->dstate);
1108   confcmd = mtev_console_state_get_cmd(tl, "configure");
1109   conftcmd = mtev_console_state_get_cmd(confcmd->dstate, "terminal");
1110   conftnocmd = mtev_console_state_get_cmd(conftcmd->dstate, "no");
1111
1112   mtev_console_state_add_cmd(conftcmd->dstate,
1113     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1114   mtev_console_state_add_cmd(conftnocmd->dstate,
1115     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1116
1117   mtev_console_state_add_cmd(showcmd->dstate,
1118     NCSCMD("noit", stratcon_console_show_noits,
1119            stratcon_console_noit_opts, NULL, (void *)1));
1120   mtev_console_state_add_cmd(showcmd->dstate,
1121     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1122 }
1123
1124 int
1125 stratcon_streamer_connection(const char *toplevel, const char *destination,
1126                              const char *type,
1127                              eventer_func_t handler,
1128                              void *(*handler_alloc)(void), void *handler_ctx,
1129                              void (*handler_free)(void *)) {
1130   return mtev_connections_from_config(&noits, &noits_lock,
1131                                       toplevel, destination, type,
1132                                       handler, handler_alloc, handler_ctx,
1133                                       handler_free);
1134 }
1135
1136 void
1137 stratcon_jlog_streamer_init(const char *toplevel) {
1138   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1139   struct in_addr remote;
1140   char uuid_str[UUID_STR_LEN + 1];
1141
1142   pthread_mutex_init(&noits_lock, NULL);
1143   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1144   eventer_name_callback("stratcon_jlog_recv_handler",
1145                         stratcon_jlog_recv_handler);
1146   register_console_streamer_commands();
1147   stratcon_jlog_streamer_reload(toplevel);
1148   stratcon_streamer_connection(toplevel, "", "noit", NULL, NULL, NULL, NULL);
1149   assert(mtev_http_rest_register_auth(
1150     "GET", "/noits/", "^show$", rest_show_noits,
1151              mtev_http_rest_client_cert_auth
1152   ) == 0);
1153   assert(mtev_http_rest_register_auth(
1154     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1155              mtev_http_rest_client_cert_auth
1156   ) == 0);
1157   assert(mtev_http_rest_register_auth(
1158     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1159              mtev_http_rest_client_cert_auth
1160   ) == 0);
1161   assert(mtev_http_rest_register_auth(
1162     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1163              mtev_http_rest_client_cert_auth
1164   ) == 0);
1165   assert(mtev_http_rest_register_auth(
1166     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1167              mtev_http_rest_client_cert_auth
1168   ) == 0);
1169
1170   uuid_clear(self_stratcon_id);
1171
1172   if(mtev_conf_get_stringbuf(NULL, "/stratcon/@id",
1173                              uuid_str, sizeof(uuid_str)) &&
1174      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1175     int period;
1176     mtev_conf_get_boolean(NULL, "/stratcon/@extended_id",
1177                           &stratcon_selfcheck_extended_id);
1178     /* If a UUID was provided for stratcon itself, we will report metrics
1179      * on a large variety of things (including all noits).
1180      */
1181     if(mtev_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1182        period > 0) {
1183       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1184       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1185     }
1186     self_stratcon_ip.sin_family = AF_INET;
1187     remote.s_addr = 0xffffffff;
1188     mtev_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1189     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1190     eventer_add_in(periodic_noit_metrics, NULL, whence);
1191   }
1192 }
1193
Note: See TracBrowser for help on using the browser.