root/src/stratcon_jlog_streamer.c

Revision 7b386ef4c8bb241dc157e198b028d156bf082dbc, 44.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 months ago)

Basic support for reverse tunneling of TCP connections over a noit SSL connection.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "dtrace_probes.h"
35 #include "eventer/eventer.h"
36 #include "noit_conf.h"
37 #include "utils/noit_hash.h"
38 #include "utils/noit_log.h"
39 #include "utils/noit_getip.h"
40 #include "noit_jlog_listener.h"
41 #include "noit_rest.h"
42 #include "stratcon_datastore.h"
43 #include "stratcon_jlog_streamer.h"
44 #include "stratcon_iep.h"
45
46 #include <unistd.h>
47 #include <assert.h>
48 #include <errno.h>
49 #include <sys/types.h>
50 #include <sys/socket.h>
51 #ifdef HAVE_SYS_FILIO_H
52 #include <sys/filio.h>
53 #endif
54 #include <netinet/in.h>
55 #include <sys/un.h>
56 #include <arpa/inet.h>
57
58 pthread_mutex_t noits_lock;
59 noit_hash_table noits = NOIT_HASH_EMPTY;
60 pthread_mutex_t noit_ip_by_cn_lock;
61 noit_hash_table noit_ip_by_cn = NOIT_HASH_EMPTY;
62 static uuid_t self_stratcon_id;
63 static char self_stratcon_hostname[256] = "\0";
64 static struct sockaddr_in self_stratcon_ip;
65 static noit_boolean stratcon_selfcheck_extended_id = noit_true;
66
67 static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };
68
69 static const char *feed_type_to_str(int jlog_feed_cmd) {
70   switch(jlog_feed_cmd) {
71     case NOIT_JLOG_DATA_FEED: return "durable/storage";
72     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
73   }
74   return "unknown";
75 }
76
/*
 * GET_EXPECTED_CN(nctx, cn)
 *   Sets cn to the value stored under the "cn" key of nctx->config, or to
 *   NULL when there is no config or no such key.
 *   Arguments are parenthesized so that arbitrary expressions may be passed,
 *   and the temporary uses a macro-private name to avoid capturing a caller
 *   variable.
 */
#define GET_EXPECTED_CN(nctx, cn) do { \
  void *_expected_cn_v = NULL; \
  (cn) = NULL; \
  if((nctx)->config && \
     noit_hash_retrieve((nctx)->config, "cn", 2, &_expected_cn_v)) { \
     (cn) = _expected_cn_v; \
  } \
} while(0)
/*
 * GET_FEEDTYPE(nctx, feedtype)
 *   Sets feedtype to "storage" or "iep" depending on which push function the
 *   connection's streamer context uses; anything else (including a missing
 *   streamer context) yields "unknown".
 */
#define GET_FEEDTYPE(nctx, feedtype) do { \
  jlog_streamer_ctx_t *_jctx = (nctx)->consumer_ctx; \
  (feedtype) = "unknown"; \
  if(_jctx == NULL) \
    break; \
  if(_jctx->push == stratcon_datastore_push) \
    (feedtype) = "storage"; \
  else if(_jctx->push == stratcon_iep_line_processor) \
    (feedtype) = "iep"; \
} while(0)
93
94 static int
95 remote_str_sort(const void *a, const void *b) {
96   int rv;
97   noit_connection_ctx_t * const *actx = a;
98   noit_connection_ctx_t * const *bctx = b;
99   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
100   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
101   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
102   if(rv) return rv;
103   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
104            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
105 }
106 static void
107 nc_print_noit_conn_brief(noit_console_closure_t ncct,
108                           noit_connection_ctx_t *ctx) {
109   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
110   struct timeval now, diff, session_duration;
111   char cmdbuf[4096];
112   const char *feedtype = "unknown";
113   const char *lasttime = "never";
114   if(ctx->last_connect.tv_sec != 0) {
115     time_t r = ctx->last_connect.tv_sec;
116     struct tm tbuf, *tm;
117     tm = gmtime_r(&r, &tbuf);
118     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
119     lasttime = cmdbuf;
120   }
121   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
122             ctx->remote_cn ? "connected" :
123                              (ctx->retry_event ? "disconnected" :
124                                                    "connecting"), lasttime);
125   if(ctx->e) {
126     char buff[128];
127     const char *addrstr = NULL;
128     struct sockaddr_in6 addr6;
129     socklen_t len = sizeof(addr6);
130     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
131       unsigned short port = 0;
132       if(addr6.sin6_family == AF_INET) {
133         addrstr = inet_ntop(addr6.sin6_family,
134                             &((struct sockaddr_in *)&addr6)->sin_addr,
135                             buff, sizeof(buff));
136         memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
137         port = ntohs(port);
138       }
139       else if(addr6.sin6_family == AF_INET6) {
140         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
141                             buff, sizeof(buff));
142         port = ntohs(addr6.sin6_port);
143       }
144       if(addrstr != NULL)
145         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
146       else
147         nc_printf(ncct, "\tLocal address not interpretable\n");
148     }
149     else {
150       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
151                 ctx->e->fd, strerror(errno));
152     }
153   }
154   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
155   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
156   gettimeofday(&now, NULL);
157   if(ctx->timeout_event) {
158     sub_timeval(ctx->timeout_event->whence, now, &diff);
159     nc_printf(ncct, "\tTimeout scheduled for %lld.%06us\n",
160               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
161   }
162   if(ctx->retry_event) {
163     sub_timeval(ctx->retry_event->whence, now, &diff);
164     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
165               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
166   }
167   else if(ctx->remote_cn) {
168     nc_printf(ncct, "\tRemote CN: '%s'\n",
169               ctx->remote_cn ? ctx->remote_cn : "???");
170     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
171       struct timeval last;
172       double session_duration_seconds;
173       const char *state = "unknown";
174
175       switch(jctx->state) {
176         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
177         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
178         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
179         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
180         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
181         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
182         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
183       }
184       last.tv_sec = jctx->header.tv_sec;
185       last.tv_usec = jctx->header.tv_usec;
186       sub_timeval(now, last, &diff);
187       sub_timeval(now, ctx->last_connect, &session_duration);
188       session_duration_seconds = session_duration.tv_sec +
189                                  (double)session_duration.tv_usec/1000000.0;
190       nc_printf(ncct, "\tState: %s\n"
191                       "\tNext checkpoint: [%08x:%08x]\n"
192                       "\tLast event: %lld.%06us ago\n"
193                       "\tEvents this session: %llu (%0.2f/s)\n"
194                       "\tOctets this session: %llu (%0.2f/s)\n",
195                 state,
196                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
197                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
198                 jctx->total_events,
199                 (double)jctx->total_events/session_duration_seconds,
200                 jctx->total_bytes_read,
201                 (double)jctx->total_bytes_read/session_duration_seconds);
202     }
203     else {
204       nc_printf(ncct, "\tUnknown type.\n");
205     }
206   }
207 }
208
209 jlog_streamer_ctx_t *
210 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
211   jlog_streamer_ctx_t *ctx;
212   ctx = stratcon_jlog_streamer_ctx_alloc();
213   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
214   ctx->push = stratcon_datastore_push;
215   return ctx;
216 }
217 jlog_streamer_ctx_t *
218 stratcon_jlog_streamer_ctx_alloc(void) {
219   jlog_streamer_ctx_t *ctx;
220   ctx = calloc(1, sizeof(*ctx));
221   return ctx;
222 }
223
224 void
225 jlog_streamer_ctx_free(void *cl) {
226   jlog_streamer_ctx_t *ctx = cl;
227   if(ctx->buffer) free(ctx->buffer);
228   free(ctx);
229 }
230
231 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
232 static int
233 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
234   int len, mask;
235   while(ctx->bytes_read < ctx->bytes_expected) {
236     len = Eread(ctx->buffer + ctx->bytes_read,
237                 ctx->bytes_expected - ctx->bytes_read);
238     if(len < 0) {
239       *newmask = mask;
240       return -1;
241     }
242     /* if we get 0 inside SSL, and there was a real error, we
243      * will actually get a -1 here.
244      * if(len == 0) return ctx->bytes_read;
245      */
246     ctx->total_bytes_read += len;
247     ctx->bytes_read += len;
248   }
249   assert(ctx->bytes_read == ctx->bytes_expected);
250   return ctx->bytes_read;
251 }
/*
 * FULLREAD(e, ctx, size)
 *   Read exactly `size` bytes from event `e` into a freshly allocated,
 *   NUL-terminated ctx->buffer.  The read is resumable: when the read
 *   reports EAGAIN the macro returns from the ENCLOSING FUNCTION with the
 *   mask to wait on, and a later invocation continues where it left off
 *   (ctx->bytes_expected is non-zero while a read is in flight).
 *
 *   Requirements on the expansion site: a variable `nctx` (used for log
 *   context) and a label `socket_error` (jumped to on allocation failure,
 *   read error or short read) must be in scope.
 *
 *   `size` is evaluated exactly once.  A size whose +1 (for the NUL
 *   terminator) would wrap to zero is rejected as an allocation failure
 *   instead of writing past a zero-length allocation.
 */
#define FULLREAD(e,ctx,size) do { \
  int mask, len; \
  const size_t fullread_size_ = (size_t)(size); \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = fullread_size_; \
    if((ctx)->buffer) free((ctx)->buffer); \
    (ctx)->buffer = (fullread_size_ + 1 != 0) ? malloc(fullread_size_ + 1) : NULL; \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)fullread_size_ + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[fullread_size_] = '\0'; \
  } \
  len = __read_on_ctx((e), (ctx), &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "[%s] [%s] SSL read error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)", \
          nctx->remote_cn ? nctx->remote_cn : "(null)", \
          strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if((size_t)len != fullread_size_) { \
    noitL(noit_error, "[%s] [%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          nctx->remote_str ? nctx->remote_str : "(null)", \
          nctx->remote_cn ? nctx->remote_cn : "(null)", \
          (ctx)->state, len, (long unsigned int)fullread_size_); \
    goto socket_error; \
  } \
} while(0)
282
283 int
284 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
285                            struct timeval *now) {
286   noit_connection_ctx_t *nctx = closure;
287   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
288   jlog_streamer_ctx_t dummy;
289   int len;
290   jlog_id n_chkpt;
291   const char *cn_expected, *feedtype;
292   GET_EXPECTED_CN(nctx, cn_expected);
293   GET_FEEDTYPE(nctx, feedtype);
294
295   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
296     if(write(e->fd, e, 0) == -1)
297       noitL(noit_error, "[%s] [%s] socket error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)",
298             nctx->remote_cn ? nctx->remote_cn : "(null)", strerror(errno));
299  socket_error:
300     ctx->state = JLOG_STREAMER_WANT_INITIATE;
301     ctx->count = 0;
302     ctx->needs_chkpt = 0;
303     ctx->bytes_read = 0;
304     ctx->bytes_expected = 0;
305     if(ctx->buffer) free(ctx->buffer);
306     ctx->buffer = NULL;
307     nctx->schedule_reattempt(nctx, now);
308     nctx->close(nctx, e);
309     return 0;
310   }
311
312   noit_connection_update_timeout(nctx);
313   while(1) {
314     switch(ctx->state) {
315       case JLOG_STREAMER_WANT_INITIATE:
316         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
317                               sizeof(ctx->jlog_feed_cmd),
318                               &mask, e);
319         if(len < 0) {
320           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
321           goto socket_error;
322         }
323         if(len != sizeof(ctx->jlog_feed_cmd)) {
324           noitL(noit_error, "[%s] [%s] short write [%d/%d] on initiating stream.\n",
325                 nctx->remote_str ? nctx->remote_str : "(null)", nctx->remote_cn ? nctx->remote_cn : "(null)",
326                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
327           goto socket_error;
328         }
329         ctx->state = JLOG_STREAMER_WANT_COUNT;
330         break;
331
332       case JLOG_STREAMER_WANT_ERROR:
333         FULLREAD(e, ctx, 0 - ctx->count);
334         noitL(noit_error, "[%s] [%s] %.*s\n", nctx->remote_str ? nctx->remote_str : "(null)",
335               nctx->remote_cn ? nctx->remote_cn : "(null)", 0 - ctx->count, ctx->buffer);
336         free(ctx->buffer); ctx->buffer = NULL;
337         goto socket_error;
338         break;
339
340       case JLOG_STREAMER_WANT_COUNT:
341         FULLREAD(e, ctx, sizeof(u_int32_t));
342         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
343         ctx->count = ntohl(dummy.count);
344         ctx->needs_chkpt = 0;
345         free(ctx->buffer); ctx->buffer = NULL;
346         STRATCON_STREAM_COUNT(e->fd, (char *)feedtype,
347                                    nctx->remote_str, (char *)cn_expected,
348                                    ctx->count);
349         if(ctx->count < 0)
350           ctx->state = JLOG_STREAMER_WANT_ERROR;
351         else
352           ctx->state = JLOG_STREAMER_WANT_HEADER;
353         break;
354
355       case JLOG_STREAMER_WANT_HEADER:
356         if(ctx->count == 0) {
357           ctx->state = JLOG_STREAMER_WANT_COUNT;
358           break;
359         }
360         FULLREAD(e, ctx, sizeof(ctx->header));
361         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
362         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
363         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
364         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
365         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
366         ctx->header.message_len = ntohl(dummy.header.message_len);
367         STRATCON_STREAM_HEADER(e->fd, (char *)feedtype,
368                                     nctx->remote_str, (char *)cn_expected,
369                                     ctx->header.chkpt.log, ctx->header.chkpt.marker,
370                                     ctx->header.tv_sec, ctx->header.tv_usec,
371                                     ctx->header.message_len);
372         free(ctx->buffer); ctx->buffer = NULL;
373         ctx->state = JLOG_STREAMER_WANT_BODY;
374         break;
375
376       case JLOG_STREAMER_WANT_BODY:
377         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
378         STRATCON_STREAM_BODY(e->fd, (char *)feedtype,
379                                   nctx->remote_str, (char *)cn_expected,
380                                   ctx->header.chkpt.log, ctx->header.chkpt.marker,
381                                   ctx->header.tv_sec, ctx->header.tv_usec,
382                                   ctx->buffer);
383         if(ctx->header.message_len > 0) {
384           ctx->needs_chkpt = 1;
385           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
386                     ctx->buffer, NULL);
387         }
388         else if(ctx->buffer)
389           free(ctx->buffer);
390         /* Don't free the buffer, it's used by the datastore process. */
391         ctx->buffer = NULL;
392         ctx->count--;
393         ctx->total_events++;
394         if(ctx->count == 0 && ctx->needs_chkpt) {
395           eventer_t completion_e;
396           eventer_remove_fd(e->fd);
397           completion_e = eventer_alloc();
398           memcpy(completion_e, e, sizeof(*e));
399           nctx->e = completion_e;
400           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
401           ctx->state = JLOG_STREAMER_IS_ASYNC;
402           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
403                     NULL, completion_e);
404           noitL(noit_debug, "Pushing %s batch async [%s] [%s]: [%u/%u]\n",
405                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
406                 nctx->remote_str ? nctx->remote_str : "(null)",
407                 nctx->remote_cn ? nctx->remote_cn : "(null)",
408                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
409           noit_connection_disable_timeout(nctx);
410           return 0;
411         }
412         else if(ctx->count == 0)
413           ctx->state = JLOG_STREAMER_WANT_CHKPT;
414         else
415           ctx->state = JLOG_STREAMER_WANT_HEADER;
416         break;
417
418       case JLOG_STREAMER_IS_ASYNC:
419         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
420       case JLOG_STREAMER_WANT_CHKPT:
421         noitL(noit_debug, "Pushing %s checkpoint [%s] [%s]: [%u/%u]\n",
422               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
423               nctx->remote_str ? nctx->remote_str : "(null)",
424               nctx->remote_cn ? nctx->remote_cn : "(null)",
425               ctx->header.chkpt.log, ctx->header.chkpt.marker);
426         n_chkpt.log = htonl(ctx->header.chkpt.log);
427         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
428
429         /* screw short writes.  I'd rather die than not write my data! */
430         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
431                               &mask, e);
432         if(len < 0) {
433           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
434           goto socket_error;
435         }
436         if(len != sizeof(jlog_id)) {
437           noitL(noit_error, "[%s] [%s] short write on checkpointing stream.\n",
438             nctx->remote_str ? nctx->remote_str : "(null)",
439             nctx->remote_cn ? nctx->remote_cn : "(null)");
440           goto socket_error;
441         }
442         STRATCON_STREAM_CHECKPOINT(e->fd, (char *)feedtype,
443                                         nctx->remote_str, (char *)cn_expected,
444                                         ctx->header.chkpt.log, ctx->header.chkpt.marker);
445         ctx->state = JLOG_STREAMER_WANT_COUNT;
446         break;
447     }
448   }
449   /* never get here */
450 }
451
452
453 int
454 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
455   int rv = -1;
456   void *vip;
457   pthread_mutex_lock(&noit_ip_by_cn_lock);
458   if(noit_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
459     int new_len;
460     char *new_ip = (char *)vip;
461     new_len = strlen(new_ip);
462     strlcpy(ip, new_ip, len);
463     if(new_len >= len) rv = new_len+1;
464     else rv = 0;
465   }
466   pthread_mutex_unlock(&noit_ip_by_cn_lock);
467   return rv;
468 }
469 void
470 stratcon_jlog_streamer_recache_noit() {
471   int di, cnt;
472   noit_conf_section_t *noit_configs;
473   noit_configs = noit_conf_get_sections(NULL, "//noits//noit", &cnt);
474   pthread_mutex_lock(&noit_ip_by_cn_lock);
475   noit_hash_delete_all(&noit_ip_by_cn, free, free);
476   for(di=0; di<cnt; di++) {
477     char address[64];
478     if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
479                                  address, sizeof(address))) {
480       char expected_cn[256];
481       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
482                                  expected_cn, sizeof(expected_cn)))
483         noit_hash_store(&noit_ip_by_cn,
484                         strdup(expected_cn), strlen(expected_cn),
485                         strdup(address));
486     }
487   }
488   free(noit_configs);
489   pthread_mutex_unlock(&noit_ip_by_cn_lock);
490 }
491 void
492 stratcon_jlog_streamer_reload(const char *toplevel) {
493   /* flush and repopulate the cn cache */
494   stratcon_jlog_streamer_recache_noit();
495   if(!stratcon_datastore_get_enabled()) return;
496   stratcon_streamer_connection(toplevel, NULL, "noit",
497                                stratcon_jlog_recv_handler,
498                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
499                                NULL,
500                                jlog_streamer_ctx_free);
501 }
502
503 char *
504 stratcon_console_noit_opts(noit_console_closure_t ncct,
505                            noit_console_state_stack_t *stack,
506                            noit_console_state_t *dstate,
507                            int argc, char **argv, int idx) {
508   if(argc == 1) {
509     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
510     const char *key_id;
511     int klen, i = 0;
512     void *vconn, *vcn;
513     noit_connection_ctx_t *ctx;
514     noit_hash_table dedup = NOIT_HASH_EMPTY;
515
516     pthread_mutex_lock(&noits_lock);
517     while(noit_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
518       ctx = (noit_connection_ctx_t *)vconn;
519       vcn = NULL;
520       if(ctx->config && noit_hash_retrieve(ctx->config, "cn", 2, &vcn) &&
521          !noit_hash_store(&dedup, vcn, strlen(vcn), NULL)) {
522         if(!strncmp(vcn, argv[0], strlen(argv[0]))) {
523           if(idx == i) {
524             pthread_mutex_unlock(&noits_lock);
525             noit_hash_destroy(&dedup, NULL, NULL);
526             return strdup(vcn);
527           }
528           i++;
529         }
530       }
531       if(ctx->remote_str &&
532          !noit_hash_store(&dedup, ctx->remote_str, strlen(ctx->remote_str), NULL)) {
533         if(!strncmp(ctx->remote_str, argv[0], strlen(argv[0]))) {
534           if(idx == i) {
535             pthread_mutex_unlock(&noits_lock);
536             noit_hash_destroy(&dedup, NULL, NULL);
537             return strdup(ctx->remote_str);
538           }
539           i++;
540         }
541       }
542     }
543     pthread_mutex_unlock(&noits_lock);
544     noit_hash_destroy(&dedup, NULL, NULL);
545   }
546   if(argc == 2)
547     return noit_console_opt_delegate(ncct, stack, dstate, argc-1, argv+1, idx);
548   return NULL;
549 }
550 static int
551 stratcon_console_show_noits(noit_console_closure_t ncct,
552                             int argc, char **argv,
553                             noit_console_state_t *dstate,
554                             void *closure) {
555   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
556   const char *key_id, *ecn;
557   int klen, n = 0, i;
558   void *vconn;
559   noit_connection_ctx_t **ctx;
560
561   if(closure != (void *)0 && argc == 0) {
562     nc_printf(ncct, "takes an argument\n");
563     return 0;
564   }
565   if(closure == (void *)0 && argc > 0) {
566     nc_printf(ncct, "takes no arguments\n");
567     return 0;
568   }
569   pthread_mutex_lock(&noits_lock);
570   ctx = malloc(sizeof(*ctx) * noits.size);
571   while(noit_hash_next(&noits, &iter, &key_id, &klen,
572                        &vconn)) {
573     ctx[n] = (noit_connection_ctx_t *)vconn;
574     if(argc == 0 ||
575        !strcmp(ctx[n]->remote_str, argv[0]) ||
576        (ctx[n]->config && noit_hash_retr_str(ctx[n]->config, "cn", 2, &ecn) &&
577         !strcmp(ecn, argv[0]))) {
578       noit_connection_ctx_ref(ctx[n]);
579       n++;
580     }
581   }
582   pthread_mutex_unlock(&noits_lock);
583   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
584   for(i=0; i<n; i++) {
585     nc_print_noit_conn_brief(ncct, ctx[i]);
586     noit_connection_ctx_deref(ctx[i]);
587   }
588   free(ctx);
589   return 0;
590 }
591
592 static void
593 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
594                        noit_connection_ctx_t *nctx) {
595   struct timeval last, session_duration, diff;
596   u_int64_t session_duration_ms, last_event_ms;
597   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
598   char str[1024], *wr;
599   int len;
600   const char *cn_expected;
601   const char *feedtype = "unknown";
602
603   GET_FEEDTYPE(nctx, feedtype);
604   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
605
606   GET_EXPECTED_CN(nctx, cn_expected);
607   if(!cn_expected) return;
608
609   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
610            (long unsigned int)now->tv_sec,
611            (long unsigned int)now->tv_usec/1000UL,
612            uuid_str, cn_expected, feedtype);
613   wr = str + strlen(str);
614   len = sizeof(str) - (wr - str);
615
616   /* Now we write NAME TYPE VALUE into wr each time and push it */
617 #define push_noit_m_str(name, value) do { \
618   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
619   stratcon_datastore_push(DS_OP_INSERT, \
620                           (struct sockaddr *)&self_stratcon_ip, \
621                           self_stratcon_hostname, strdup(str), NULL); \
622   stratcon_iep_line_processor(DS_OP_INSERT, \
623                               (struct sockaddr *)&self_stratcon_ip, \
624                               self_stratcon_hostname, strdup(str), NULL); \
625 } while(0)
626 #define push_noit_m_u64(name, value) do { \
627   snprintf(wr, len, "%s\tL\t%llu\n", name, (long long unsigned int)value); \
628   stratcon_datastore_push(DS_OP_INSERT, \
629                           (struct sockaddr *)&self_stratcon_ip, \
630                           self_stratcon_hostname, strdup(str), NULL); \
631   stratcon_iep_line_processor(DS_OP_INSERT, \
632                               (struct sockaddr *)&self_stratcon_ip, \
633                               self_stratcon_hostname, strdup(str), NULL); \
634 } while(0)
635
636   last.tv_sec = jctx->header.tv_sec;
637   last.tv_usec = jctx->header.tv_usec;
638   sub_timeval(*now, last, &diff);
639   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
640   sub_timeval(*now, nctx->last_connect, &session_duration);
641   session_duration_ms = session_duration.tv_sec * 1000 +
642                         session_duration.tv_usec / 1000;
643
644   push_noit_m_str("state", nctx->remote_cn ? "connected" :
645                              (nctx->retry_event ? "disconnected" :
646                                                   "connecting"));
647   push_noit_m_u64("last_event_age_ms", last_event_ms);
648   push_noit_m_u64("session_length_ms", session_duration_ms);
649 }
650 static int
651 periodic_noit_metrics(eventer_t e, int mask, void *closure,
652                       struct timeval *now) {
653   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
654   noit_connection_ctx_t **ctxs;
655   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
656   const char *key_id;
657   void *vconn;
658   int klen, n = 0, i;
659   char str[1024];
660   char uuid_str[1024], tmp_uuid_str[UUID_STR_LEN+1];
661   struct timeval epoch, diff;
662   u_int64_t uptime = 0;
663   char ip_str[128];
664
665   inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
666             sizeof(ip_str));
667
668   uuid_str[0] = '\0';
669   uuid_unparse_lower(self_stratcon_id, tmp_uuid_str);
670   if(stratcon_selfcheck_extended_id) {
671     strlcat(uuid_str, ip_str, sizeof(uuid_str)-37);
672     strlcat(uuid_str, "`selfcheck`selfcheck`", sizeof(uuid_str)-37);
673   }
674   strlcat(uuid_str, tmp_uuid_str, sizeof(uuid_str));
675
676 #define PUSH_BOTH(type, str) do { \
677   stratcon_datastore_push(type, \
678                           (struct sockaddr *)&self_stratcon_ip, \
679                           self_stratcon_hostname, str, NULL); \
680   stratcon_iep_line_processor(type, \
681                               (struct sockaddr *)&self_stratcon_ip, \
682                               self_stratcon_hostname, str, NULL); \
683 } while(0)
684
685   if(closure == NULL) {
686     /* Only do this the first time we get called */
687     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
688              (long unsigned int)now->tv_sec,
689              (long unsigned int)now->tv_usec/1000UL, uuid_str, ip_str,
690              self_stratcon_hostname);
691     PUSH_BOTH(DS_OP_INSERT, strdup(str));
692   }
693
694   pthread_mutex_lock(&noits_lock);
695   ctxs = malloc(sizeof(*ctxs) * noits.size);
696   while(noit_hash_next(&noits, &iter, &key_id, &klen,
697                        &vconn)) {
698     ctxs[n] = (noit_connection_ctx_t *)vconn;
699     noit_connection_ctx_ref(ctxs[n]);
700     n++;
701   }
702   pthread_mutex_unlock(&noits_lock);
703
704   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok %d noits\n",
705            (long unsigned int)now->tv_sec,
706            (long unsigned int)now->tv_usec/1000UL, uuid_str, n);
707   PUSH_BOTH(DS_OP_INSERT, strdup(str));
708
709   if(eventer_get_epoch(&epoch) != 0)
710     memcpy(&epoch, now, sizeof(epoch));
711   sub_timeval(*now, epoch, &diff);
712   uptime = diff.tv_sec;
713   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\tuptime\tL\t%llu\n",
714            (long unsigned int)now->tv_sec,
715            (long unsigned int)now->tv_usec/1000UL,
716            uuid_str, (long long unsigned int)uptime);
717   PUSH_BOTH(DS_OP_INSERT, strdup(str));
718
719   for(i=0; i<n; i++) {
720     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
721     noit_connection_ctx_deref(ctxs[i]);
722   }
723   free(ctxs);
724   PUSH_BOTH(DS_OP_CHKPT, NULL);
725
726   add_timeval(e->whence, whence, &whence);
727   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
728   return 0;
729 }
730
731 static int
732 rest_show_noits(noit_http_rest_closure_t *restc,
733                 int npats, char **pats) {
734   xmlDocPtr doc;
735   xmlNodePtr root;
736   noit_hash_table seen = NOIT_HASH_EMPTY;
737   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
738   char path[256];
739   const char *key_id;
740   const char *type = NULL, *want_cn = NULL;
741   int klen, n = 0, i, di, cnt;
742   void *vconn;
743   noit_connection_ctx_t **ctxs;
744   noit_conf_section_t *noit_configs;
745   struct timeval now, diff, last;
746   xmlNodePtr node;
747   noit_http_request *req = noit_http_session_request(restc->http_ctx);
748
749   noit_http_process_querystring(req);
750   type = noit_http_request_querystring(req, "type");
751   want_cn = noit_http_request_querystring(req, "cn");
752
753   gettimeofday(&now, NULL);
754
755   pthread_mutex_lock(&noits_lock);
756   ctxs = malloc(sizeof(*ctxs) * noits.size);
757   while(noit_hash_next(&noits, &iter, &key_id, &klen,
758                        &vconn)) {
759     ctxs[n] = (noit_connection_ctx_t *)vconn;
760     noit_connection_ctx_ref(ctxs[n]);
761     n++;
762   }
763   pthread_mutex_unlock(&noits_lock);
764   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
765
766   doc = xmlNewDoc((xmlChar *)"1.0");
767   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
768   xmlDocSetRootElement(doc, root);
769
770   for(i=0; i<n; i++) {
771     char buff[256];
772     const char *feedtype = "unknown", *state = "unknown";
773     noit_connection_ctx_t *ctx = ctxs[i];
774     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
775
776     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
777
778     /* If the user requested a specific type and we're not it, skip. */
779     if(type && strcmp(feedtype, type)) {
780         noit_connection_ctx_deref(ctx);
781         continue;
782     }
783     /* If the user wants a specific CN... limit to that. */
784     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn))) {
785         noit_connection_ctx_deref(ctx);
786         continue;
787     }
788
789     node = xmlNewNode(NULL, (xmlChar *)"noit");
790     snprintf(buff, sizeof(buff), "%llu.%06d",
791              (long long unsigned)ctx->last_connect.tv_sec,
792              (int)ctx->last_connect.tv_usec);
793     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
794     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
795                (xmlChar *)"connected" :
796                (ctx->retry_event ? (xmlChar *)"disconnected" :
797                                     (xmlChar *)"connecting"));
798     if(ctx->e) {
799       char buff[128];
800       const char *addrstr = NULL;
801       struct sockaddr_in6 addr6;
802       socklen_t len = sizeof(addr6);
803       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
804         unsigned short port = 0;
805         if(addr6.sin6_family == AF_INET) {
806           addrstr = inet_ntop(addr6.sin6_family,
807                               &((struct sockaddr_in *)&addr6)->sin_addr,
808                               buff, sizeof(buff));
809           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
810           port = ntohs(port);
811         }
812         else if(addr6.sin6_family == AF_INET6) {
813           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
814                               buff, sizeof(buff));
815           port = ntohs(addr6.sin6_port);
816         }
817         if(addrstr != NULL) {
818           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
819                    ":%u", port);
820           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
821         }
822       }
823     }
824     noit_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
825                       0, free, NULL);
826     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
827     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
828     if(ctx->retry_event) {
829       sub_timeval(ctx->retry_event->whence, now, &diff);
830       snprintf(buff, sizeof(buff), "%llu.%06d",
831                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
832       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
833     }
834     else if(ctx->remote_cn) {
835       if(ctx->remote_cn)
836         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
837  
838       switch(jctx->state) {
839         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
840         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
841         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
842         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
843         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
844         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
845         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
846       }
847       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
848       snprintf(buff, sizeof(buff), "%08x:%08x",
849                jctx->header.chkpt.log, jctx->header.chkpt.marker);
850       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
851       snprintf(buff, sizeof(buff), "%llu",
852                (unsigned long long)jctx->total_events);
853       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
854       snprintf(buff, sizeof(buff), "%llu",
855                (unsigned long long)jctx->total_bytes_read);
856       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
857  
858       sub_timeval(now, ctx->last_connect, &diff);
859       snprintf(buff, sizeof(buff), "%lld.%06d",
860                (long long)diff.tv_sec, (int)diff.tv_usec);
861       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
862  
863       if(jctx->header.tv_sec) {
864         last.tv_sec = jctx->header.tv_sec;
865         last.tv_usec = jctx->header.tv_usec;
866         snprintf(buff, sizeof(buff), "%llu.%06d",
867                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
868         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
869         sub_timeval(now, last, &diff);
870         snprintf(buff, sizeof(buff), "%lld.%06d",
871                  (long long)diff.tv_sec, (int)diff.tv_usec);
872         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
873       }
874     }
875
876     xmlAddChild(root, node);
877     noit_connection_ctx_deref(ctx);
878   }
879   free(ctxs);
880
881   if(!type || !strcmp(type, "configured")) {
882     snprintf(path, sizeof(path), "//noits//noit");
883     noit_configs = noit_conf_get_sections(NULL, path, &cnt);
884     for(di=0; di<cnt; di++) {
885       char address[64], port_str[32], remote_str[98];
886       char expected_cn_buff[256], *expected_cn = NULL;
887       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
888                                  expected_cn_buff, sizeof(expected_cn_buff)))
889         expected_cn = expected_cn_buff;
890       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
891       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
892                                  address, sizeof(address))) {
893         void *v;
894         if(!noit_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
895                                    port_str, sizeof(port_str)))
896           strlcpy(port_str, "43191", sizeof(port_str));
897
898         /* If the user wants a specific CN... limit to that. */
899           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
900             continue;
901
902         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
903         if(!noit_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
904           node = xmlNewNode(NULL, (xmlChar *)"noit");
905           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
906           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
907           if(expected_cn)
908             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
909           xmlAddChild(root, node);
910         }
911       }
912     }
913     free(noit_configs);
914   }
915   noit_hash_destroy(&seen, free, NULL);
916
917   noit_http_response_ok(restc->http_ctx, "text/xml");
918   noit_http_response_xml(restc->http_ctx, doc);
919   noit_http_response_end(restc->http_ctx);
920   xmlFreeDoc(doc);
921   return 0;
922 }
923 static int
924 stratcon_add_noit(const char *target, unsigned short port,
925                   const char *cn) {
926   int cnt;
927   char path[256];
928   char port_str[6];
929   noit_conf_section_t *noit_configs, parent;
930   xmlNodePtr newnoit, config, cnnode;
931
932   snprintf(path, sizeof(path),
933            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
934   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
935   free(noit_configs);
936   if(cnt != 0) return -1;
937
938   parent = noit_conf_get_section(NULL, "//noits");
939   if(!parent) return -1;
940   snprintf(port_str, sizeof(port_str), "%d", port);
941   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
942   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
943   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
944   xmlAddChild(parent, newnoit);
945   if(cn) {
946     config = xmlNewNode(NULL, (xmlChar *)"config");
947     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
948     xmlNodeAddContent(cnnode, (xmlChar *)cn);
949     xmlAddChild(config, cnnode);
950     xmlAddChild(newnoit, config);
951     pthread_mutex_lock(&noit_ip_by_cn_lock);
952     noit_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
953                       strdup(target), free, free);
954     pthread_mutex_unlock(&noit_ip_by_cn_lock);
955   }
956   if(stratcon_datastore_get_enabled())
957     stratcon_streamer_connection(NULL, target, "noit",
958                                  stratcon_jlog_recv_handler,
959                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
960                                  NULL,
961                                  jlog_streamer_ctx_free);
962   if(stratcon_iep_get_enabled())
963     stratcon_streamer_connection(NULL, target, "noit",
964                                  stratcon_jlog_recv_handler,
965                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
966                                  NULL,
967                                  jlog_streamer_ctx_free);
968   return 1;
969 }
970 static int
971 stratcon_remove_noit(const char *target, unsigned short port) {
972   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
973   const char *key_id;
974   int klen, n = -1, i, cnt = 0;
975   void *vconn;
976   noit_connection_ctx_t **ctx;
977   noit_conf_section_t *noit_configs;
978   char path[256];
979   char remote_str[256];
980
981   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
982
983   snprintf(path, sizeof(path),
984            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
985   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
986   for(i=0; i<cnt; i++) {
987     char expected_cn[256];
988     if(noit_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
989                                expected_cn, sizeof(expected_cn))) {
990       pthread_mutex_lock(&noit_ip_by_cn_lock);
991       noit_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
992                        free, free);
993       pthread_mutex_unlock(&noit_ip_by_cn_lock);
994     }
995     CONF_REMOVE(noit_configs[i]);
996     xmlUnlinkNode(noit_configs[i]);
997     xmlFreeNode(noit_configs[i]);
998     n = 0;
999   }
1000   free(noit_configs);
1001
1002   pthread_mutex_lock(&noits_lock);
1003   ctx = malloc(sizeof(*ctx) * noits.size);
1004   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1005                        &vconn)) {
1006     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1007       ctx[n] = (noit_connection_ctx_t *)vconn;
1008       noit_connection_ctx_ref(ctx[n]);
1009       n++;
1010     }
1011   }
1012   pthread_mutex_unlock(&noits_lock);
1013   for(i=0; i<n; i++) {
1014     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
1015     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1016   }
1017   free(ctx);
1018   return n;
1019 }
1020 static int
1021 rest_set_noit(noit_http_rest_closure_t *restc,
1022               int npats, char **pats) {
1023   const char *cn = NULL;
1024   noit_http_session_ctx *ctx = restc->http_ctx;
1025   noit_http_request *req = noit_http_session_request(ctx);
1026   unsigned short port = 43191;
1027   if(npats < 1 || npats > 2)
1028     noit_http_response_server_error(ctx, "text/xml");
1029   if(npats == 2) port = atoi(pats[1]);
1030   noit_http_process_querystring(req);
1031   cn = noit_http_request_querystring(req, "cn");
1032   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1033     noit_http_response_ok(ctx, "text/xml");
1034   else
1035     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1036   if(noit_conf_write_file(NULL) != 0)
1037     noitL(noit_error, "local config write failed\n");
1038   noit_conf_mark_changed();
1039   noit_http_response_end(ctx);
1040   return 0;
1041 }
1042 static int
1043 rest_delete_noit(noit_http_rest_closure_t *restc,
1044                  int npats, char **pats) {
1045   noit_http_session_ctx *ctx = restc->http_ctx;
1046   unsigned short port = 43191;
1047   if(npats < 1 || npats > 2)
1048     noit_http_response_server_error(ctx, "text/xml");
1049   if(npats == 2) port = atoi(pats[1]);
1050   if(stratcon_remove_noit(pats[0], port) >= 0)
1051     noit_http_response_ok(ctx, "text/xml");
1052   else
1053     noit_http_response_not_found(ctx, "text/xml");
1054   if(noit_conf_write_file(NULL) != 0)
1055     noitL(noit_error, "local config write failed\n");
1056   noit_conf_mark_changed();
1057   noit_http_response_end(ctx);
1058   return 0;
1059 }
1060 static int
1061 stratcon_console_conf_noits(noit_console_closure_t ncct,
1062                             int argc, char **argv,
1063                             noit_console_state_t *dstate,
1064                             void *closure) {
1065   char *cp, target[128];
1066   unsigned short port = 43191;
1067   int adding = (int)(vpsized_int)closure;
1068   if(argc != 1)
1069     return -1;
1070
1071   cp = strchr(argv[0], ':');
1072   if(cp) {
1073     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1074     port = atoi(cp+1);
1075   }
1076   else strlcpy(target, argv[0], sizeof(target));
1077   if(adding) {
1078     if(stratcon_add_noit(target, port, NULL) >= 0) {
1079       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1080     }
1081     else {
1082       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1083     }
1084   }
1085   else {
1086     if(stratcon_remove_noit(target, port) >= 0) {
1087       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1088     }
1089     else {
1090       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1091     }
1092   }
1093   return 0;
1094 }
1095
1096 static void
1097 register_console_streamer_commands() {
1098   noit_console_state_t *tl;
1099   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1100
1101   tl = noit_console_state_initial();
1102   showcmd = noit_console_state_get_cmd(tl, "show");
1103   assert(showcmd && showcmd->dstate);
1104   confcmd = noit_console_state_get_cmd(tl, "configure");
1105   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1106   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1107
1108   noit_console_state_add_cmd(conftcmd->dstate,
1109     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1110   noit_console_state_add_cmd(conftnocmd->dstate,
1111     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1112
1113   noit_console_state_add_cmd(showcmd->dstate,
1114     NCSCMD("noit", stratcon_console_show_noits,
1115            stratcon_console_noit_opts, NULL, (void *)1));
1116   noit_console_state_add_cmd(showcmd->dstate,
1117     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1118 }
1119
1120 int
1121 stratcon_streamer_connection(const char *toplevel, const char *destination,
1122                              const char *type,
1123                              eventer_func_t handler,
1124                              void *(*handler_alloc)(void), void *handler_ctx,
1125                              void (*handler_free)(void *)) {
1126   return noit_connections_from_config(&noits, &noits_lock,
1127                                       toplevel, destination, type,
1128                                       handler, handler_alloc, handler_ctx,
1129                                       handler_free);
1130 }
1131
1132 void
1133 stratcon_jlog_streamer_init(const char *toplevel) {
1134   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1135   struct in_addr remote;
1136   char uuid_str[UUID_STR_LEN + 1];
1137
1138   pthread_mutex_init(&noits_lock, NULL);
1139   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1140   eventer_name_callback("stratcon_jlog_recv_handler",
1141                         stratcon_jlog_recv_handler);
1142   register_console_streamer_commands();
1143   stratcon_jlog_streamer_reload(toplevel);
1144   stratcon_streamer_connection(toplevel, "", "noit", NULL, NULL, NULL, NULL);
1145   assert(noit_http_rest_register_auth(
1146     "GET", "/noits/", "^show$", rest_show_noits,
1147              noit_http_rest_client_cert_auth
1148   ) == 0);
1149   assert(noit_http_rest_register_auth(
1150     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1151              noit_http_rest_client_cert_auth
1152   ) == 0);
1153   assert(noit_http_rest_register_auth(
1154     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1155              noit_http_rest_client_cert_auth
1156   ) == 0);
1157   assert(noit_http_rest_register_auth(
1158     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1159              noit_http_rest_client_cert_auth
1160   ) == 0);
1161   assert(noit_http_rest_register_auth(
1162     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1163              noit_http_rest_client_cert_auth
1164   ) == 0);
1165
1166   uuid_clear(self_stratcon_id);
1167
1168   if(noit_conf_get_stringbuf(NULL, "/stratcon/@id",
1169                              uuid_str, sizeof(uuid_str)) &&
1170      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1171     int period;
1172     noit_conf_get_boolean(NULL, "/stratcon/@extended_id",
1173                           &stratcon_selfcheck_extended_id);
1174     /* If a UUID was provided for stratcon itself, we will report metrics
1175      * on a large variety of things (including all noits).
1176      */
1177     if(noit_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1178        period > 0) {
1179       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1180       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1181     }
1182     self_stratcon_ip.sin_family = AF_INET;
1183     remote.s_addr = 0xffffffff;
1184     noit_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1185     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1186     eventer_add_in(periodic_noit_metrics, NULL, whence);
1187   }
1188 }
1189
Note: See TracBrowser for help on using the browser.