Changeset 4a653ea2ad1a5c2d4fa77037753a169291289a11

Show
Ignore:
Timestamp:
07/07/11 16:33:42 (3 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1310056422 -0400
git-parent:

[485f3e7e53ef813c36bb393170d6c44997614545]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1310056422 -0400
Message:

make the postgres_ingestor work with the B messages

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/modules/postgres_ingestor.c

    rd10f13b r4a653ea  
    4444#include "noit_conf.h" 
    4545#include "noit_check.h" 
     46#include "noit_check_log_helpers.h" 
    4647#include "noit_rest.h" 
    4748#include <unistd.h> 
     
    5758#include <errno.h> 
    5859#include "postgres_ingestor.xmlh" 
     60#include "bundle.pb-c.h" 
    5961 
    6062#define DECL_STMT(codename,confname) \ 
     
    10371039  return info ? info->dsn : NULL; 
    10381040} 
     1041static void 
     1042expand_b_record(ds_line_detail **head, ds_line_detail **last, 
     1043                const char *line, int len) { 
     1044  char **outrows; 
     1045  int i, cnt; 
     1046  ds_line_detail *next; 
     1047 
     1048  cnt = noit_check_log_b_to_sm(line, len, &outrows); 
     1049  for(i=0;i<cnt;i++) { 
     1050    if(outrows[i] == NULL) continue; 
     1051    next = calloc(sizeof(*next), 1); 
     1052    next->data = outrows[i]; 
     1053    if(!*head) *head = next; 
     1054    if(*last) (*last)->next = next; 
     1055    *last = next; 
     1056  } 
     1057  if(outrows) free(outrows); 
     1058} 
    10391059static ds_line_detail * 
    10401060build_insert_batch(pg_interim_journal_t *ij) { 
     
    10701090    while(lcp < (buff + len) && 
    10711091          NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) { 
    1072       next = calloc(1, sizeof(*next)); 
    1073       next->data = malloc(cp - lcp + 1); 
    1074       memcpy(next->data, lcp, cp - lcp); 
    1075       next->data[cp - lcp] = '\0'; 
    1076       if(!head) head = next; 
    1077       if(last) last->next = next; 
    1078       last = next; 
     1092      if(lcp[0] == 'B' && lcp[1] != '\0' && lcp[2] == '\t') { 
     1093      /* Bundle records are special and need to be expanded into 
     1094       * traditional records here 
     1095       */ 
     1096        noit_compression_type_t ctype = NOIT_COMPRESS_NONE; 
     1097        switch(lcp[1]) { 
     1098          case '1': /* version 1 */ 
     1099            ctype = NOIT_COMPRESS_ZLIB; /*no break fall through */ 
     1100          case '2': /* version 2 */ 
     1101              expand_b_record(&head, &last, lcp, cp - lcp); 
     1102            break; 
     1103          default: 
     1104            noitL(noit_error, "unknown bundle version %c\n", lcp[1]); 
     1105        } 
     1106      } 
     1107      else { 
     1108        next = calloc(1, sizeof(*next)); 
     1109        next->data = malloc(cp - lcp + 1); 
     1110        memcpy(next->data, lcp, cp - lcp); 
     1111        next->data[cp - lcp] = '\0'; 
     1112        if(!head) head = next; 
     1113        if(last) last->next = next; 
     1114        last = next; 
     1115      } 
    10791116      lcp = cp + 1; 
    10801117    } 
  • src/noit_check_log.c

    r37e31fd r4a653ea  
    424424 
    425425int 
    426 noit_stats_snprint_metric_value(char *b, int l, metric_t *m) { 
    427   int rv; 
    428   if(!m->metric_value.s) { /* they are all null */ 
    429     rv = snprintf(b, l, "[[null]]"); 
    430   } 
    431   else { 
    432     switch(m->metric_type) { 
    433       case METRIC_INT32: 
    434         rv = snprintf(b, l, "%d", *(m->metric_value.i)); break; 
    435       case METRIC_UINT32: 
    436         rv = snprintf(b, l, "%u", *(m->metric_value.I)); break; 
    437       case METRIC_INT64: 
    438         rv = snprintf(b, l, "%lld", (long long int)*(m->metric_value.l)); break; 
    439       case METRIC_UINT64: 
    440         rv = snprintf(b, l, "%llu", 
    441                       (long long unsigned int)*(m->metric_value.L)); break; 
    442       case METRIC_DOUBLE: 
    443         rv = snprintf(b, l, "%.12e", *(m->metric_value.n)); break; 
    444       case METRIC_STRING: 
    445         rv = snprintf(b, l, "%s", m->metric_value.s); break; 
    446       default: 
    447         return -1; 
    448     } 
    449   } 
    450   return rv; 
    451 } 
    452 int 
    453426noit_stats_snprint_metric(char *b, int l, metric_t *m) { 
    454427  int rv, nl; 
     
    460433  return rv + nl; 
    461434} 
     435 
  • src/noit_check_log_helpers.c

    r37e31fd r4a653ea  
    3232 
    3333#include "noit_defines.h" 
     34#include "noit_check.h" 
    3435#include "noit_check_log_helpers.h" 
     36#include <stdio.h> 
    3537#include <zlib.h> 
    3638#include "utils/noit_b64.h" 
     39#include "utils/noit_str.h" 
    3740#include "utils/noit_log.h" 
     41#include "bundle.pb-c.h" 
    3842 
    3943int 
     
    134138  return 0; 
    135139} 
     140 
     141int 
     142noit_stats_snprint_metric_value(char *b, int l, metric_t *m) { 
     143  int rv; 
     144  if(!m->metric_value.s) { /* they are all null */ 
     145    rv = snprintf(b, l, "[[null]]"); 
     146  } 
     147  else { 
     148    switch(m->metric_type) { 
     149      case METRIC_INT32: 
     150        rv = snprintf(b, l, "%d", *(m->metric_value.i)); break; 
     151      case METRIC_UINT32: 
     152        rv = snprintf(b, l, "%u", *(m->metric_value.I)); break; 
     153      case METRIC_INT64: 
     154        rv = snprintf(b, l, "%lld", (long long int)*(m->metric_value.l)); break; 
     155      case METRIC_UINT64: 
     156        rv = snprintf(b, l, "%llu", 
     157                      (long long unsigned int)*(m->metric_value.L)); break;      case METRIC_DOUBLE: 
     158        rv = snprintf(b, l, "%.12e", *(m->metric_value.n)); break; 
     159      case METRIC_STRING: 
     160        rv = snprintf(b, l, "%s", m->metric_value.s); break; 
     161      default: 
     162        return -1; 
     163    } 
     164  } 
     165  return rv; 
     166} 
     167 
     168int 
     169noit_check_log_b_to_sm(const char *line, int len, char ***out) { 
     170  Bundle *bundle = NULL; 
     171  noit_compression_type_t ctype; 
     172  unsigned int ulen; 
     173  int i, size, cnt = 0; 
     174  const char *cp1, *cp2, *rest, *error_str = NULL; 
     175  char *timestamp, *uuid_str, *target, *module, *name, *ulen_str; 
     176  unsigned char *raw_protobuf = NULL; 
     177 
     178  *out = NULL; 
     179  if(len < 3) return 0; 
     180  if(line[0] != 'B' || line[2] != '\t') return 0; 
     181  switch(line[1]) { 
     182    case '1': ctype = NOIT_COMPRESS_ZLIB; break; 
     183    case '2': ctype = NOIT_COMPRESS_NONE; break; 
     184    default: return 0; 
     185  } 
     186 
     187  /* All good, and we're off to the races */ 
     188  line += 3; len -= 3; 
     189  cp1 = line; 
     190#define SET_FIELD_FROM_BUNDLE(tgt) do { \ 
     191  if(*cp1 == '\0') { error_str = "short line @ " #tgt; goto bad_line; } \ 
     192  cp2 = strnstrn("\t", 1, cp1, len - (cp1 - line)); \ 
     193  if(cp2 == NULL) { error_str = "no tab after " #tgt; goto bad_line; } \ 
     194  tgt = (char *)alloca(cp2 - cp1 + 1); \ 
     195  if(!tgt) { error_str = "alloca failed for " #tgt; goto bad_line; } \ 
     196  memcpy(tgt, cp1, cp2 - cp1); \ 
     197  tgt[cp2 - cp1] = '\0'; \ 
     198  cp1 = cp2 + 1; \ 
     199} while(0) 
     200  SET_FIELD_FROM_BUNDLE(timestamp); 
     201  SET_FIELD_FROM_BUNDLE(uuid_str); 
     202  SET_FIELD_FROM_BUNDLE(target); 
     203  SET_FIELD_FROM_BUNDLE(module); 
     204  SET_FIELD_FROM_BUNDLE(name); 
     205  SET_FIELD_FROM_BUNDLE(ulen_str); 
     206  rest = cp1; 
     207 
     208  ulen = strtoul(ulen_str, NULL, 10); 
     209  raw_protobuf = malloc(ulen); 
     210  if(!raw_protobuf) { 
     211    noitL(noit_error, "bundle decode: memory exhausted\n"); 
     212    goto bad_line; 
     213  } 
     214  if(noit_check_log_bundle_decompress_b64(ctype, 
     215                                          rest, len - (rest - line), 
     216                                          (char *)raw_protobuf, 
     217                                          ulen)) { 
     218    noitL(noit_error, "bundle decode: failed to decompress\n"); 
     219    goto bad_line; 
     220  } 
     221  /* decode the protobuf */ 
     222  bundle = bundle__unpack(&protobuf_c_system_allocator, ulen, raw_protobuf); 
     223  if(!bundle) { 
     224    noitL(noit_error, "bundle decode: protobuf invalid\n"); 
     225    goto bad_line; 
     226  } 
     227  noitL(noit_error, "ZOMG -> data (%lld metrics)\n", 
     228        (long long int)bundle->n_metrics); 
     229  cnt = bundle->n_metrics + 1; 
     230  *out = calloc(sizeof(**out), cnt); 
     231  if(!*out) { error_str = "memory exhaustion"; goto bad_line; } 
     232  /* build out status line */ 
     233  size = 2 /* S\t */ + strlen(timestamp) + 1 /* \t */ + strlen(uuid_str) + 
     234         5 /* \tG\tA\t */ + 11 /* max(strlen(duration)) */ + 
     235         1 /* \t */ + 
     236         (bundle->status ? strlen(bundle->status) : 8 /* [[null]] */) + 
     237         1 /* \0 */; 
     238  **out = malloc(size); 
     239  snprintf(**out, size, "S\t%s\t%s\t%c\t%c\t%d\t%s", 
     240           timestamp, uuid_str, bundle->state, bundle->available, 
     241           bundle->duration, bundle->status ? bundle->status : "[[null]]"); 
     242  /* build our metric lines */ 
     243  for(i=1; i<cnt; i++) { 
     244    Metric *metric = bundle->metrics[i-1]; 
     245    metric_t m; 
     246    char scratch[64], *value_str;; 
     247    int value_size = 0; 
     248 
     249    m.metric_name = metric->name; 
     250    m.metric_type = metric->metrictype; 
     251    m.metric_value.vp = NULL; 
     252    scratch[0] = '\0'; 
     253    value_str = scratch; 
     254    switch(m.metric_type) { 
     255      case METRIC_INT32: 
     256        m.metric_value.i = &metric->valuei32; 
     257        noit_stats_snprint_metric_value(scratch, 64, &m); 
     258        value_size = strlen(scratch); 
     259        break; 
     260      case METRIC_UINT32: 
     261        m.metric_value.I = &metric->valueui32; 
     262        noit_stats_snprint_metric_value(scratch, 64, &m); 
     263        value_size = strlen(scratch); 
     264        break; 
     265      case METRIC_INT64: 
     266        m.metric_value.l = &metric->valuei64; 
     267        noit_stats_snprint_metric_value(scratch, 64, &m); 
     268        value_size = strlen(scratch); 
     269        break; 
     270      case METRIC_UINT64: 
     271        m.metric_value.L = &metric->valueui64; 
     272        noit_stats_snprint_metric_value(scratch, 64, &m); 
     273        value_size = strlen(scratch); 
     274        break; 
     275      case METRIC_DOUBLE: 
     276        m.metric_value.n = &metric->valuedbl; 
     277        noit_stats_snprint_metric_value(scratch, 64, &m); 
     278        value_size = strlen(scratch); 
     279        break; 
     280      case METRIC_STRING: 
     281        m.metric_value.s = metric->valuestr ? metric->valuestr : "[[null]]"; 
     282        value_str = metric->valuestr ? metric->valuestr : "[[null]]"; 
     283        value_size = strlen(value_str); 
     284        break; 
     285      default: 
     286        break; 
     287    } 
     288    if(value_size == 0) continue; /* WTF, bad metric_type? */ 
     289 
     290    size = 2 /* M\t */ + strlen(timestamp) + 1 /* \t */ + 
     291           strlen(uuid_str) + 1 /* \t */ + strlen(metric->name) + 
     292           3 /* \t<type>\t */ + value_size + 1 /* \0 */; 
     293    (*out)[i] = malloc(size); 
     294    snprintf((*out)[i], size, "M\t%s\t%s\t%s\t%c\t%s", 
     295             timestamp, uuid_str, metric->name, m.metric_type, value_str); 
     296  } 
     297  goto good_line; 
     298 
     299 bad_line: 
     300  if(*out) { 
     301    int i; 
     302    for(i=0; i<cnt; i++) if((*out)[i]) free((*out)[i]); 
     303    free(*out); 
     304    *out = NULL; 
     305  } 
     306  if(error_str) noitL(noit_error, "bundle: bad line due to %s\n", error_str); 
     307 good_line: 
     308  if(bundle) bundle__free_unpacked(bundle, &protobuf_c_system_allocator); 
     309  if(raw_protobuf) free(raw_protobuf); 
     310  return cnt; 
     311} 
  • src/noit_check_log_helpers.h

    r37e31fd r4a653ea  
    5353                                     unsigned int len_out); 
    5454 
     55int 
     56noit_check_log_b_to_sm(const char *line, int len, char ***out); 
     57 
    5558#endif