Show
Ignore:
Timestamp:
07/07/11 16:33:42 (3 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1310056422 -0400
git-parent:

[485f3e7e53ef813c36bb393170d6c44997614545]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1310056422 -0400
Message:

make the postgres_ingestor work with the B messages

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/noit_check_log_helpers.c

    r37e31fd r4a653ea  
    3232 
    3333#include "noit_defines.h" 
     34#include "noit_check.h" 
    3435#include "noit_check_log_helpers.h" 
     36#include <stdio.h> 
    3537#include <zlib.h> 
    3638#include "utils/noit_b64.h" 
     39#include "utils/noit_str.h" 
    3740#include "utils/noit_log.h" 
     41#include "bundle.pb-c.h" 
    3842 
    3943int 
     
    134138  return 0; 
    135139} 
     140 
     141int 
     142noit_stats_snprint_metric_value(char *b, int l, metric_t *m) { 
     143  int rv; 
     144  if(!m->metric_value.s) { /* they are all null */ 
     145    rv = snprintf(b, l, "[[null]]"); 
     146  } 
     147  else { 
     148    switch(m->metric_type) { 
     149      case METRIC_INT32: 
     150        rv = snprintf(b, l, "%d", *(m->metric_value.i)); break; 
     151      case METRIC_UINT32: 
     152        rv = snprintf(b, l, "%u", *(m->metric_value.I)); break; 
     153      case METRIC_INT64: 
     154        rv = snprintf(b, l, "%lld", (long long int)*(m->metric_value.l)); break; 
     155      case METRIC_UINT64: 
     156        rv = snprintf(b, l, "%llu", 
     157                      (long long unsigned int)*(m->metric_value.L)); break;      case METRIC_DOUBLE: 
     158        rv = snprintf(b, l, "%.12e", *(m->metric_value.n)); break; 
     159      case METRIC_STRING: 
     160        rv = snprintf(b, l, "%s", m->metric_value.s); break; 
     161      default: 
     162        return -1; 
     163    } 
     164  } 
     165  return rv; 
     166} 
     167 
     168int 
     169noit_check_log_b_to_sm(const char *line, int len, char ***out) { 
     170  Bundle *bundle = NULL; 
     171  noit_compression_type_t ctype; 
     172  unsigned int ulen; 
     173  int i, size, cnt = 0; 
     174  const char *cp1, *cp2, *rest, *error_str = NULL; 
     175  char *timestamp, *uuid_str, *target, *module, *name, *ulen_str; 
     176  unsigned char *raw_protobuf = NULL; 
     177 
     178  *out = NULL; 
     179  if(len < 3) return 0; 
     180  if(line[0] != 'B' || line[2] != '\t') return 0; 
     181  switch(line[1]) { 
     182    case '1': ctype = NOIT_COMPRESS_ZLIB; break; 
     183    case '2': ctype = NOIT_COMPRESS_NONE; break; 
     184    default: return 0; 
     185  } 
     186 
     187  /* All good, and we're off to the races */ 
     188  line += 3; len -= 3; 
     189  cp1 = line; 
     190#define SET_FIELD_FROM_BUNDLE(tgt) do { \ 
     191  if(*cp1 == '\0') { error_str = "short line @ " #tgt; goto bad_line; } \ 
     192  cp2 = strnstrn("\t", 1, cp1, len - (cp1 - line)); \ 
     193  if(cp2 == NULL) { error_str = "no tab after " #tgt; goto bad_line; } \ 
     194  tgt = (char *)alloca(cp2 - cp1 + 1); \ 
     195  if(!tgt) { error_str = "alloca failed for " #tgt; goto bad_line; } \ 
     196  memcpy(tgt, cp1, cp2 - cp1); \ 
     197  tgt[cp2 - cp1] = '\0'; \ 
     198  cp1 = cp2 + 1; \ 
     199} while(0) 
     200  SET_FIELD_FROM_BUNDLE(timestamp); 
     201  SET_FIELD_FROM_BUNDLE(uuid_str); 
     202  SET_FIELD_FROM_BUNDLE(target); 
     203  SET_FIELD_FROM_BUNDLE(module); 
     204  SET_FIELD_FROM_BUNDLE(name); 
     205  SET_FIELD_FROM_BUNDLE(ulen_str); 
     206  rest = cp1; 
     207 
     208  ulen = strtoul(ulen_str, NULL, 10); 
     209  raw_protobuf = malloc(ulen); 
     210  if(!raw_protobuf) { 
     211    noitL(noit_error, "bundle decode: memory exhausted\n"); 
     212    goto bad_line; 
     213  } 
     214  if(noit_check_log_bundle_decompress_b64(ctype, 
     215                                          rest, len - (rest - line), 
     216                                          (char *)raw_protobuf, 
     217                                          ulen)) { 
     218    noitL(noit_error, "bundle decode: failed to decompress\n"); 
     219    goto bad_line; 
     220  } 
     221  /* decode the protobuf */ 
     222  bundle = bundle__unpack(&protobuf_c_system_allocator, ulen, raw_protobuf); 
     223  if(!bundle) { 
     224    noitL(noit_error, "bundle decode: protobuf invalid\n"); 
     225    goto bad_line; 
     226  } 
     227  noitL(noit_error, "ZOMG -> data (%lld metrics)\n", 
     228        (long long int)bundle->n_metrics); 
     229  cnt = bundle->n_metrics + 1; 
     230  *out = calloc(sizeof(**out), cnt); 
     231  if(!*out) { error_str = "memory exhaustion"; goto bad_line; } 
     232  /* build out status line */ 
     233  size = 2 /* S\t */ + strlen(timestamp) + 1 /* \t */ + strlen(uuid_str) + 
     234         5 /* \tG\tA\t */ + 11 /* max(strlen(duration)) */ + 
     235         1 /* \t */ + 
     236         (bundle->status ? strlen(bundle->status) : 8 /* [[null]] */) + 
     237         1 /* \0 */; 
     238  **out = malloc(size); 
     239  snprintf(**out, size, "S\t%s\t%s\t%c\t%c\t%d\t%s", 
     240           timestamp, uuid_str, bundle->state, bundle->available, 
     241           bundle->duration, bundle->status ? bundle->status : "[[null]]"); 
     242  /* build our metric lines */ 
     243  for(i=1; i<cnt; i++) { 
     244    Metric *metric = bundle->metrics[i-1]; 
     245    metric_t m; 
     246    char scratch[64], *value_str;; 
     247    int value_size = 0; 
     248 
     249    m.metric_name = metric->name; 
     250    m.metric_type = metric->metrictype; 
     251    m.metric_value.vp = NULL; 
     252    scratch[0] = '\0'; 
     253    value_str = scratch; 
     254    switch(m.metric_type) { 
     255      case METRIC_INT32: 
     256        m.metric_value.i = &metric->valuei32; 
     257        noit_stats_snprint_metric_value(scratch, 64, &m); 
     258        value_size = strlen(scratch); 
     259        break; 
     260      case METRIC_UINT32: 
     261        m.metric_value.I = &metric->valueui32; 
     262        noit_stats_snprint_metric_value(scratch, 64, &m); 
     263        value_size = strlen(scratch); 
     264        break; 
     265      case METRIC_INT64: 
     266        m.metric_value.l = &metric->valuei64; 
     267        noit_stats_snprint_metric_value(scratch, 64, &m); 
     268        value_size = strlen(scratch); 
     269        break; 
     270      case METRIC_UINT64: 
     271        m.metric_value.L = &metric->valueui64; 
     272        noit_stats_snprint_metric_value(scratch, 64, &m); 
     273        value_size = strlen(scratch); 
     274        break; 
     275      case METRIC_DOUBLE: 
     276        m.metric_value.n = &metric->valuedbl; 
     277        noit_stats_snprint_metric_value(scratch, 64, &m); 
     278        value_size = strlen(scratch); 
     279        break; 
     280      case METRIC_STRING: 
     281        m.metric_value.s = metric->valuestr ? metric->valuestr : "[[null]]"; 
     282        value_str = metric->valuestr ? metric->valuestr : "[[null]]"; 
     283        value_size = strlen(value_str); 
     284        break; 
     285      default: 
     286        break; 
     287    } 
     288    if(value_size == 0) continue; /* WTF, bad metric_type? */ 
     289 
     290    size = 2 /* M\t */ + strlen(timestamp) + 1 /* \t */ + 
     291           strlen(uuid_str) + 1 /* \t */ + strlen(metric->name) + 
     292           3 /* \t<type>\t */ + value_size + 1 /* \0 */; 
     293    (*out)[i] = malloc(size); 
     294    snprintf((*out)[i], size, "M\t%s\t%s\t%s\t%c\t%s", 
     295             timestamp, uuid_str, metric->name, m.metric_type, value_str); 
     296  } 
     297  goto good_line; 
     298 
     299 bad_line: 
     300  if(*out) { 
     301    int i; 
     302    for(i=0; i<cnt; i++) if((*out)[i]) free((*out)[i]); 
     303    free(*out); 
     304    *out = NULL; 
     305  } 
     306  if(error_str) noitL(noit_error, "bundle: bad line due to %s\n", error_str); 
     307 good_line: 
     308  if(bundle) bundle__free_unpacked(bundle, &protobuf_c_system_allocator); 
     309  if(raw_protobuf) free(raw_protobuf); 
     310  return cnt; 
     311}