Changeset 9799031c4d6cbb100f1ccbc5c773a07e3d988542 for src
- Timestamp:
- 09/07/11 18:43:47 (2 years ago)
- git-parent:
- Files:
-
- src/modules/handoff_ingestor.c (modified) (2 diffs)
- src/modules/postgres_ingestor.c (modified) (5 diffs)
- src/stratcon_datastore.c (modified) (5 diffs)
- src/stratcon_datastore.h (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
src/modules/handoff_ingestor.c
r5a8b07a r9799031 175 175 } 176 176 177 static void177 static int 178 178 stratcon_ingest_launch_file_ingestion(const char *path, 179 179 const char *remote_str, 180 180 const char *remote_cn, 181 181 const char *id_str) { 182 char msg[PATH_MAX + 7]; /*file:\r\n*/ 183 noitL(noit_error, " handoff -> %s\n", path); 182 char msg[PATH_MAX + 7], hfile[PATH_MAX]; /*file:\r\n*/ 183 if(strcmp(path + strlen(path) - 2, ".h")) { 184 snprintf(hfile, sizeof(hfile), "%s.h", path); 185 if(link(path, hfile) < 0) { 186 noitL(noit_error, "cannot link journal: %s\n", strerror(errno)); 187 return -1; 188 } 189 } 190 else 191 strlcpy(hfile, path, sizeof(hfile)); 192 193 noitL(noit_error, " handoff -> %s\n", hfile); 184 194 if(the_one_and_only) { 185 195 noit_http_session_ctx *ctx = the_one_and_only; 186 snprintf(msg, sizeof(msg), "file:%s\r\n", path);196 snprintf(msg, sizeof(msg), "file:%s\r\n", hfile); 187 197 if(noit_http_response_append(ctx,msg,strlen(msg)) == noit_false || 188 198 noit_http_response_flush(ctx, noit_false) == noit_false) { … … 191 201 } 192 202 } 203 return 0; 193 204 } 194 205 src/modules/postgres_ingestor.c
r7f6c1bc r9799031 1413 1413 } 1414 1414 1415 static void1415 static int 1416 1416 stratcon_ingest_launch_file_ingestion(const char *path, 1417 1417 const char *remote_str, … … 1419 1419 const char *id_str) { 1420 1420 pg_interim_journal_t *ij; 1421 char pgfile[PATH_MAX]; 1421 1422 eventer_t ingest; 1422 1423 1424 if(strcmp(path + strlen(path) - 3, ".pg")) { 1425 snprintf(pgfile, sizeof(pgfile), "%s.pg", path); 1426 if(link(path, pgfile) < 0) { 1427 noitL(noit_error, "cannot link journal: %s\n", strerror(errno)); 1428 free(ij); 1429 return -1; 1430 } 1431 } 1432 else 1433 strlcpy(pgfile, path, sizeof(pgfile)); 1423 1434 ij = calloc(1, sizeof(*ij)); 1424 ij->fd = open(p ath, O_RDONLY);1435 ij->fd = open(pgfile, O_RDONLY); 1425 1436 if(ij->fd < 0) { 1426 1437 noitL(noit_error, "cannot open journal '%s': %s\n", 1427 p ath, strerror(errno));1438 pgfile, strerror(errno)); 1428 1439 free(ij); 1429 return ;1440 return -1; 1430 1441 } 1431 1442 close(ij->fd); 1432 1443 ij->fd = -1; 1433 ij->filename = strdup(p ath);1444 ij->filename = strdup(pgfile); 1434 1445 ij->remote_str = strdup(remote_str); 1435 1446 ij->remote_cn = strdup(remote_cn); … … 1443 1454 ingest->closure = ij; 1444 1455 eventer_add_asynch(ij->cpool->jobq, ingest); 1445 } 1446 static void 1447 stratcon_ingest_sweep_journals_int(char *first, char *second, char *third) { 1448 char path[PATH_MAX]; 1449 DIR *root; 1450 struct dirent *de, *entry; 1451 int i = 0, cnt = 0; 1452 char **entries; 1453 int size = 0; 1454 1455 snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath, 1456 first ? "/" : "", first ? first : "", 1457 second ? "/" : "", second ? second : "", 1458 third ? "/" : "", third ? third : ""); 1459 #ifdef _PC_NAME_MAX 1460 size = pathconf(path, _PC_NAME_MAX); 1461 #endif 1462 size = MAX(size, PATH_MAX + 128); 1463 de = alloca(size); 1464 root = opendir(path); 1465 if(!root) return; 1466 while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++; 1467 closedir(root); 1468 root = opendir(path); 1469 if(!root) return; 1470 entries = malloc(sizeof(*entries) * cnt); 1471 while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) { 1472 if(i < cnt) { 1473 entries[i++] = strdup(entry->d_name); 1474 } 1475 } 1476 closedir(root); 1477 cnt = i; /* could have changed, directories are fickle */ 1478 qsort(entries, i, sizeof(*entries), 1479 (int (*)(const void *, const void *))strcasecmp); 1480 for(i=0; i<cnt; i++) { 1481 if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue; 1482 noitL(ds_deb, "Processing L%d entry '%s'\n", 1483 third ? 4 : second ? 3 : first ? 2 : 1, entries[i]); 1484 if(!first) 1485 stratcon_ingest_sweep_journals_int(entries[i], NULL, NULL); 1486 else if(!second) 1487 stratcon_ingest_sweep_journals_int(first, entries[i], NULL); 1488 else if(!third) 1489 stratcon_ingest_sweep_journals_int(first, second, entries[i]); 1490 else if(strlen(entries[i]) == 16) { 1491 char fullpath[PATH_MAX]; 1492 snprintf(fullpath, sizeof(fullpath), "%s/%s/%s/%s/%s", basejpath, 1493 first,second,third,entries[i]); 1494 stratcon_ingest_launch_file_ingestion(fullpath,first,second,third); 1495 } 1496 } 1497 for(i=0; i<cnt; i++) 1498 free(entries[i]); 1499 free(entries); 1500 } 1501 static void 1502 stratcon_ingest_sweep_journals() { 1503 stratcon_ingest_sweep_journals_int(NULL,NULL,NULL); 1456 return 0; 1504 1457 } 1505 1458 … … 1680 1633 return 0; 1681 1634 } 1635 static int is_postgres_ingestor_file(const char *file) { 1636 return (strlen(file) == 19 && !strcmp(file + 16, ".pg")); 1637 } 1682 1638 static int postgres_ingestor_init(noit_module_generic_t *self) { 1683 1639 pthread_mutex_init(&ds_conns_lock, NULL); … … 1696 1652 stratcon_ingest_all_check_info(); 1697 1653 stratcon_ingest_all_storagenode_info(); 1698 stratcon_ingest_sweep_journals(); 1654 stratcon_ingest_sweep_journals(is_postgres_ingestor_file, 1655 stratcon_ingest_launch_file_ingestion); 1699 1656 return stratcon_datastore_set_ingestor(&postgres_ingestor_api); 1700 1657 } src/stratcon_datastore.c
r4b21a98 r9799031 62 62 63 63 static ingestor_api_t *ingestor = NULL; 64 typedef struct ingest_chain_t { 65 ingestor_api_t *ingestor; 66 struct ingest_chain_t *next; 67 } ingest_chain_t; 68 69 static ingest_chain_t *ingestor_chain; 70 64 71 static int ds_system_enabled = 1; 65 72 int stratcon_datastore_get_enabled() { return ds_system_enabled; } 66 73 void stratcon_datastore_set_enabled(int n) { ds_system_enabled = n; } 67 74 int stratcon_datastore_set_ingestor(ingestor_api_t *ni) { 68 if(ingestor) return -1; 69 ingestor = ni; 75 ingest_chain_t *p, *i = calloc(1, sizeof(*i)); 76 i->ingestor = ni; 77 if(!ingestor_chain) ingestor_chain = i; 78 else { 79 for(p = ingestor_chain; p->next; p = p->next); 80 p->next = i; 81 } 82 ingestor = ingestor_chain->ingestor; 70 83 return 0; 71 84 } … … 92 105 if(ij->fqdn) free(ij->fqdn); 93 106 free(ij); 107 } 108 109 static void 110 stratcon_ingest_sweep_journals_int(char *first, char *second, char *third, 111 int (*test)(const char *), 112 int (*ingest)(const char *fullpath, 113 const char *remote_str, 114 const char *remote_cn, 115 const char *id_str)) { 116 char path[PATH_MAX]; 117 DIR *root; 118 struct dirent *de, *entry; 119 int i = 0, cnt = 0; 120 char **entries; 121 int size = 0; 122 123 snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath, 124 first ? "/" : "", first ? first : "", 125 second ? "/" : "", second ? second : "", 126 third ? "/" : "", third ? third : ""); 127 #ifdef _PC_NAME_MAX 128 size = pathconf(path, _PC_NAME_MAX); 129 #endif 130 size = MAX(size, PATH_MAX + 128); 131 de = alloca(size); 132 root = opendir(path); 133 if(!root) return; 134 while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++; 135 closedir(root); 136 root = opendir(path); 137 if(!root) return; 138 entries = malloc(sizeof(*entries) * cnt); 139 while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) { 140 if(i < cnt) { 141 entries[i++] = strdup(entry->d_name); 142 } 143 } 144 closedir(root); 145 cnt = i; /* could have changed, directories are fickle */ 146 qsort(entries, i, sizeof(*entries), 147 (int (*)(const void *, const void *))strcasecmp); 148 for(i=0; i<cnt; i++) { 149 if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue; 150 noitL(ds_deb, "Processing L%d entry '%s'\n", 151 third ? 4 : second ? 3 : first ? 2 : 1, entries[i]); 152 if(!first) 153 stratcon_ingest_sweep_journals_int(entries[i], NULL, NULL, test, ingest); 154 else if(!second) 155 stratcon_ingest_sweep_journals_int(first, entries[i], NULL, test, ingest); 156 else if(!third) 157 stratcon_ingest_sweep_journals_int(first, second, entries[i], test, ingest); 158 else if(test(entries[i])) { 159 char fullpath[PATH_MAX]; 160 snprintf(fullpath, sizeof(fullpath), "%s/%s/%s/%s/%s", basejpath, 161 first,second,third,entries[i]); 162 ingest(fullpath,first,second,third); 163 } 164 } 165 for(i=0; i<cnt; i++) 166 free(entries[i]); 167 free(entries); 168 } 169 void 170 stratcon_ingest_sweep_journals(int (*test)(const char *), 171 int (*ingest)(const char *fullpath, 172 const char *remote_str, 173 const char *remote_cn, 174 const char *id_str)) { 175 stratcon_ingest_sweep_journals_int(NULL,NULL,NULL, test, ingest); 176 } 177 178 static int 179 stratcon_ingest(const char *fullpath, const char *remote_str, 180 const char *remote_cn, const char *id_str) { 181 ingest_chain_t *ic; 182 int err = 0; 183 for(ic = ingestor_chain; ic; ic = ic->next) 184 if(ic->ingestor->launch_file_ingestion(fullpath, remote_str, 185 remote_cn, id_str)) 186 err = -1; 187 if(err == 0) { 188 unlink(fullpath); 189 } 190 return err; 94 191 } 95 192 static int … … 139 236 ij->fd = -1; 140 237 snprintf(id_str, sizeof(id_str), "%d", ij->storagenode_id); 141 ingestor->launch_file_ingestion(ij->filename, ij->remote_str,142 ij->remote_cn, id_str);238 stratcon_ingest(ij->filename, ij->remote_str, 239 ij->remote_cn, id_str); 143 240 } 144 241 noit_hash_destroy(syncset->ws, free, interim_journal_free); … … 368 465 } 369 466 370 void 371 stratcon_datastore_init() { 467 static int is_raw_ingestion_file(const char *file) { 468 return (strlen(file) == 16); 469 } 470 471 void 472 stratcon_datastore_core_init() { 372 473 ds_err = noit_log_stream_find("error/datastore"); 373 474 ds_deb = noit_log_stream_find("debug/datastore"); … … 381 482 exit(-1); 382 483 } 484 } 485 void 486 stratcon_datastore_init() { 487 stratcon_datastore_core_init(); 488 489 stratcon_ingest_sweep_journals(is_raw_ingestion_file, 490 stratcon_ingest); 383 491 384 492 assert(noit_http_rest_register_auth( src/stratcon_datastore.h
r5f816fc r9799031 43 43 44 44 typedef struct { 45 void(*launch_file_ingestion)(const char *file, const char *ip,46 const char *cn, const char *store);45 int (*launch_file_ingestion)(const char *file, const char *ip, 46 const char *cn, const char *store); 47 47 void (*iep_check_preload)(); 48 48 int (*storage_node_lookup)(const char *uuid_str, const char *remote_cn, … … 83 83 84 84 API_EXPORT(void) 85 stratcon_datastore_core_init(); 86 87 API_EXPORT(void) 85 88 stratcon_datastore_init(); 86 89 … … 98 101 stratcon_datastore_set_enabled(int); 99 102 103 API_EXPORT(void) 104 stratcon_ingest_sweep_journals(int (*test)(const char *), 105 int (*ingest)(const char *fullpath, 106 const char *remote_str, 107 const char *remote_cn, 108 const char *id_str)); 100 109 #endif
