Show
Ignore:
Timestamp:
09/07/11 18:43:47 (3 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1315421027 -0400
git-parent:

[c6d83bbfb23e3a87bbd661cc601af89380951a49]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1315421027 -0400
Message:

support multiple ingestors

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/modules/handoff_ingestor.c

    r5a8b07a r9799031  
    175175} 
    176176 
    177 static void 
     177static int 
    178178stratcon_ingest_launch_file_ingestion(const char *path, 
    179179                                      const char *remote_str, 
    180180                                      const char *remote_cn, 
    181181                                      const char *id_str) { 
    182   char msg[PATH_MAX + 7]; /*file:\r\n*/ 
    183   noitL(noit_error, " handoff -> %s\n", path); 
     182  char msg[PATH_MAX + 7], hfile[PATH_MAX]; /*file:\r\n*/ 
     183  if(strcmp(path + strlen(path) - 2, ".h")) { 
     184    snprintf(hfile, sizeof(hfile), "%s.h", path); 
     185    if(link(path, hfile) < 0) { 
     186      noitL(noit_error, "cannot link journal: %s\n", strerror(errno)); 
     187      return -1; 
     188    } 
     189  } 
     190  else 
     191    strlcpy(hfile, path, sizeof(hfile)); 
     192 
     193  noitL(noit_error, " handoff -> %s\n", hfile); 
    184194  if(the_one_and_only) { 
    185195    noit_http_session_ctx *ctx = the_one_and_only; 
    186     snprintf(msg, sizeof(msg), "file:%s\r\n", path); 
     196    snprintf(msg, sizeof(msg), "file:%s\r\n", hfile); 
    187197    if(noit_http_response_append(ctx,msg,strlen(msg)) == noit_false || 
    188198       noit_http_response_flush(ctx, noit_false) == noit_false) { 
     
    191201    } 
    192202  } 
     203  return 0; 
    193204} 
    194205 
  • src/modules/postgres_ingestor.c

    r7f6c1bc r9799031  
    14131413} 
    14141414 
    1415 static void 
     1415static int 
    14161416stratcon_ingest_launch_file_ingestion(const char *path, 
    14171417                                      const char *remote_str, 
     
    14191419                                      const char *id_str) { 
    14201420  pg_interim_journal_t *ij; 
     1421  char pgfile[PATH_MAX]; 
    14211422  eventer_t ingest; 
    14221423 
     1424  if(strcmp(path + strlen(path) - 3, ".pg")) { 
     1425    snprintf(pgfile, sizeof(pgfile), "%s.pg", path); 
     1426    if(link(path, pgfile) < 0) { 
     1427      noitL(noit_error, "cannot link journal: %s\n", strerror(errno)); 
     1428      free(ij); 
     1429      return -1; 
     1430    } 
     1431  } 
     1432  else 
     1433    strlcpy(pgfile, path, sizeof(pgfile)); 
    14231434  ij = calloc(1, sizeof(*ij)); 
    1424   ij->fd = open(path, O_RDONLY); 
     1435  ij->fd = open(pgfile, O_RDONLY); 
    14251436  if(ij->fd < 0) { 
    14261437    noitL(noit_error, "cannot open journal '%s': %s\n", 
    1427           path, strerror(errno)); 
     1438          pgfile, strerror(errno)); 
    14281439    free(ij); 
    1429     return
     1440    return -1
    14301441  } 
    14311442  close(ij->fd); 
    14321443  ij->fd = -1; 
    1433   ij->filename = strdup(path); 
     1444  ij->filename = strdup(pgfile); 
    14341445  ij->remote_str = strdup(remote_str); 
    14351446  ij->remote_cn = strdup(remote_cn); 
     
    14431454  ingest->closure = ij; 
    14441455  eventer_add_asynch(ij->cpool->jobq, ingest); 
    1445 
    1446 static void 
    1447 stratcon_ingest_sweep_journals_int(char *first, char *second, char *third) { 
    1448   char path[PATH_MAX]; 
    1449   DIR *root; 
    1450   struct dirent *de, *entry; 
    1451   int i = 0, cnt = 0; 
    1452   char **entries; 
    1453   int size = 0; 
    1454  
    1455   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath, 
    1456            first ? "/" : "", first ? first : "", 
    1457            second ? "/" : "", second ? second : "", 
    1458            third ? "/" : "", third ? third : ""); 
    1459 #ifdef _PC_NAME_MAX 
    1460   size = pathconf(path, _PC_NAME_MAX); 
    1461 #endif 
    1462   size = MAX(size, PATH_MAX + 128); 
    1463   de = alloca(size); 
    1464   root = opendir(path); 
    1465   if(!root) return; 
    1466   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++; 
    1467   closedir(root); 
    1468   root = opendir(path); 
    1469   if(!root) return; 
    1470   entries = malloc(sizeof(*entries) * cnt); 
    1471   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) { 
    1472     if(i < cnt) { 
    1473       entries[i++] = strdup(entry->d_name); 
    1474     } 
    1475   } 
    1476   closedir(root); 
    1477   cnt = i; /* could have changed, directories are fickle */ 
    1478   qsort(entries, i, sizeof(*entries), 
    1479         (int (*)(const void *, const void *))strcasecmp); 
    1480   for(i=0; i<cnt; i++) { 
    1481     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue; 
    1482     noitL(ds_deb, "Processing L%d entry '%s'\n", 
    1483           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]); 
    1484     if(!first) 
    1485       stratcon_ingest_sweep_journals_int(entries[i], NULL, NULL); 
    1486     else if(!second) 
    1487       stratcon_ingest_sweep_journals_int(first, entries[i], NULL); 
    1488     else if(!third) 
    1489       stratcon_ingest_sweep_journals_int(first, second, entries[i]); 
    1490     else if(strlen(entries[i]) == 16) { 
    1491       char fullpath[PATH_MAX]; 
    1492       snprintf(fullpath, sizeof(fullpath), "%s/%s/%s/%s/%s", basejpath, 
    1493                first,second,third,entries[i]); 
    1494       stratcon_ingest_launch_file_ingestion(fullpath,first,second,third); 
    1495     } 
    1496   } 
    1497   for(i=0; i<cnt; i++) 
    1498     free(entries[i]); 
    1499   free(entries); 
    1500 
    1501 static void 
    1502 stratcon_ingest_sweep_journals() { 
    1503   stratcon_ingest_sweep_journals_int(NULL,NULL,NULL); 
     1456  return 0; 
    15041457} 
    15051458 
     
    16801633  return 0; 
    16811634} 
     1635static int is_postgres_ingestor_file(const char *file) { 
     1636  return (strlen(file) == 19 && !strcmp(file + 16, ".pg")); 
     1637} 
    16821638static int postgres_ingestor_init(noit_module_generic_t *self) { 
    16831639  pthread_mutex_init(&ds_conns_lock, NULL); 
     
    16961652  stratcon_ingest_all_check_info(); 
    16971653  stratcon_ingest_all_storagenode_info(); 
    1698   stratcon_ingest_sweep_journals(); 
     1654  stratcon_ingest_sweep_journals(is_postgres_ingestor_file, 
     1655                                 stratcon_ingest_launch_file_ingestion); 
    16991656  return stratcon_datastore_set_ingestor(&postgres_ingestor_api); 
    17001657} 
  • src/stratcon_datastore.c

    r4b21a98 r9799031  
    6262 
    6363static ingestor_api_t *ingestor = NULL; 
     64typedef struct ingest_chain_t { 
     65  ingestor_api_t *ingestor; 
     66  struct ingest_chain_t *next; 
     67} ingest_chain_t; 
     68 
     69static ingest_chain_t *ingestor_chain; 
     70 
    6471static int ds_system_enabled = 1; 
    6572int stratcon_datastore_get_enabled() { return ds_system_enabled; } 
    6673void stratcon_datastore_set_enabled(int n) { ds_system_enabled = n; } 
    6774int stratcon_datastore_set_ingestor(ingestor_api_t *ni) { 
    68   if(ingestor) return -1; 
    69   ingestor = ni; 
     75  ingest_chain_t *p, *i = calloc(1, sizeof(*i)); 
     76  i->ingestor = ni; 
     77  if(!ingestor_chain) ingestor_chain = i; 
     78  else { 
     79    for(p = ingestor_chain; p->next; p = p->next); 
     80    p->next = i; 
     81  } 
     82  ingestor = ingestor_chain->ingestor; 
    7083  return 0; 
    7184} 
     
    92105  if(ij->fqdn) free(ij->fqdn); 
    93106  free(ij); 
     107} 
     108 
     109static void 
     110stratcon_ingest_sweep_journals_int(char *first, char *second, char *third, 
     111                                   int (*test)(const char *), 
     112                                   int (*ingest)(const char *fullpath, 
     113                                                 const char *remote_str, 
     114                                                 const char *remote_cn, 
     115                                                 const char *id_str)) { 
     116  char path[PATH_MAX]; 
     117  DIR *root; 
     118  struct dirent *de, *entry; 
     119  int i = 0, cnt = 0; 
     120  char **entries; 
     121  int size = 0; 
     122 
     123  snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath, 
     124           first ? "/" : "", first ? first : "", 
     125           second ? "/" : "", second ? second : "", 
     126           third ? "/" : "", third ? third : ""); 
     127#ifdef _PC_NAME_MAX 
     128  size = pathconf(path, _PC_NAME_MAX); 
     129#endif 
     130  size = MAX(size, PATH_MAX + 128); 
     131  de = alloca(size); 
     132  root = opendir(path); 
     133  if(!root) return; 
     134  while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++; 
     135  closedir(root); 
     136  root = opendir(path); 
     137  if(!root) return; 
     138  entries = malloc(sizeof(*entries) * cnt); 
     139  while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) { 
     140    if(i < cnt) { 
     141      entries[i++] = strdup(entry->d_name); 
     142    } 
     143  } 
     144  closedir(root); 
     145  cnt = i; /* could have changed, directories are fickle */ 
     146  qsort(entries, i, sizeof(*entries), 
     147        (int (*)(const void *, const void *))strcasecmp); 
     148  for(i=0; i<cnt; i++) { 
     149    if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue; 
     150    noitL(ds_deb, "Processing L%d entry '%s'\n", 
     151          third ? 4 : second ? 3 : first ? 2 : 1, entries[i]); 
     152    if(!first) 
     153      stratcon_ingest_sweep_journals_int(entries[i], NULL, NULL, test, ingest); 
     154    else if(!second) 
     155      stratcon_ingest_sweep_journals_int(first, entries[i], NULL, test, ingest); 
     156    else if(!third) 
     157      stratcon_ingest_sweep_journals_int(first, second, entries[i], test, ingest); 
     158    else if(test(entries[i])) { 
     159      char fullpath[PATH_MAX]; 
     160      snprintf(fullpath, sizeof(fullpath), "%s/%s/%s/%s/%s", basejpath, 
     161               first,second,third,entries[i]); 
     162      ingest(fullpath,first,second,third); 
     163    } 
     164  } 
     165  for(i=0; i<cnt; i++) 
     166    free(entries[i]); 
     167  free(entries); 
     168} 
     169void 
     170stratcon_ingest_sweep_journals(int (*test)(const char *), 
     171                               int (*ingest)(const char *fullpath, 
     172                                             const char *remote_str, 
     173                                             const char *remote_cn, 
     174                                             const char *id_str)) { 
     175  stratcon_ingest_sweep_journals_int(NULL,NULL,NULL, test, ingest); 
     176} 
     177 
     178static int 
     179stratcon_ingest(const char *fullpath, const char *remote_str, 
     180                const char *remote_cn, const char *id_str) { 
     181  ingest_chain_t *ic; 
     182  int err = 0; 
     183  for(ic = ingestor_chain; ic; ic = ic->next) 
     184    if(ic->ingestor->launch_file_ingestion(fullpath, remote_str, 
     185                                           remote_cn, id_str)) 
     186      err = -1; 
     187  if(err == 0) { 
     188    unlink(fullpath); 
     189  } 
     190  return err; 
    94191} 
    95192static int 
     
    139236    ij->fd = -1; 
    140237    snprintf(id_str, sizeof(id_str), "%d", ij->storagenode_id); 
    141     ingestor->launch_file_ingestion(ij->filename, ij->remote_str, 
    142                                     ij->remote_cn, id_str); 
     238    stratcon_ingest(ij->filename, ij->remote_str, 
     239                    ij->remote_cn, id_str); 
    143240  } 
    144241  noit_hash_destroy(syncset->ws, free, interim_journal_free); 
     
    368465} 
    369466 
    370 void 
    371 stratcon_datastore_init() { 
     467static int is_raw_ingestion_file(const char *file) { 
     468  return (strlen(file) == 16); 
     469
     470 
     471void 
     472stratcon_datastore_core_init() { 
    372473  ds_err = noit_log_stream_find("error/datastore"); 
    373474  ds_deb = noit_log_stream_find("debug/datastore"); 
     
    381482    exit(-1); 
    382483  } 
     484} 
     485void 
     486stratcon_datastore_init() { 
     487  stratcon_datastore_core_init(); 
     488 
     489  stratcon_ingest_sweep_journals(is_raw_ingestion_file, 
     490                                 stratcon_ingest); 
    383491 
    384492  assert(noit_http_rest_register_auth( 
  • src/stratcon_datastore.h

    r5f816fc r9799031  
    4343 
    4444typedef struct { 
    45   void (*launch_file_ingestion)(const char *file, const char *ip, 
    46                                 const char *cn, const char *store); 
     45  int (*launch_file_ingestion)(const char *file, const char *ip, 
     46                               const char *cn, const char *store); 
    4747  void (*iep_check_preload)(); 
    4848  int (*storage_node_lookup)(const char *uuid_str, const char *remote_cn, 
     
    8383 
    8484API_EXPORT(void) 
     85  stratcon_datastore_core_init(); 
     86 
     87API_EXPORT(void) 
    8588  stratcon_datastore_init(); 
    8689 
     
    98101  stratcon_datastore_set_enabled(int); 
    99102 
     103API_EXPORT(void) 
     104  stratcon_ingest_sweep_journals(int (*test)(const char *), 
     105                                 int (*ingest)(const char *fullpath, 
     106                                               const char *remote_str, 
     107                                               const char *remote_cn, 
     108                                               const char *id_str)); 
    100109#endif