Changeset 5360a1ee7f4ade3bd6299e566204ee1820aeffa3
- Timestamp:
- 10/19/09 01:22:39 (4 years ago)
- git-parent:
- Files:
-
- src/stratcon.conf.in (modified) (4 diffs)
- src/stratcon_datastore.c (modified) (24 diffs)
- src/stratcon_datastore.h (modified) (1 diff)
- src/stratcon_iep.c (modified) (7 diffs)
- src/stratcon_iep.h (modified) (1 diff)
- src/stratcon_jlog_streamer.c (modified) (2 diffs)
- src/stratcon_jlog_streamer.h (modified) (1 diff)
- src/stratcon_realtime_http.c (modified) (2 diffs)
- src/stratcond.c (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
src/stratcon.conf.in
r8504a3b r5360a1e 9 9 <log name="error/iep"/> 10 10 <log name="error/eventer" disabled="true"/> 11 <log name="error/datastore" disabled="true"/> 11 12 <log name="debug/eventer" disabled="true"/> 12 13 </console_output> … … 31 32 </noits> 32 33 33 <iep disabled=" false"> <!-- false the default -->34 <iep disabled="true"> <!-- false the default --> 34 35 <start directory="%iepdbdir%" 35 36 command="%iepbindir%/run-iep.sh" /> … … 62 63 63 64 <database> 65 <journal> 66 <path>/var/log/stratcon.persist</path> 67 </journal> 64 68 <dbconfig> 65 69 <host>localhost</host> … … 69 73 </dbconfig> 70 74 <statements> 75 <!-- These are optional and used for stuff like setting search paths --> 76 <!-- 77 <metanodepostconnect><![CDATA[ 78 SELECT do_some_magic(); 79 ]]></metanodepostconnect> 80 <storagepostconnect><![CDATA[ 81 SELECT do_some_magic($1,$2); 82 ]]></storagepostconnect> 83 --> 71 84 <allchecks><![CDATA[ 72 85 SELECT remote_address, id, target, module, name 73 FROM stratcon.mv_loading_dock_check_s86 FROM noit.get_checks() 74 87 ]]></allchecks> 75 88 <findcheck><![CDATA[ 76 SELECT remote_address, id 77 FROM stratcon.mv_loading_dock_check_s 78 WHERE sid = $1 89 SELECT remote_address, id, target, module, name 90 FROM noit.get_check($1,$2) 79 91 ]]></findcheck> 92 <allstoragenodes><![CDATA[ 93 SELECT storage_node_id, fqdn, dsn 94 FROM stratcon.storage_node 95 ]]></allstoragenodes> 96 <findstoragenode><![CDATA[ 97 SELECT fqdn, dsn 98 FROM stratcon.storage_node 99 WHERE storage_node_id = $1 100 ]]></findstoragenode> 101 <mapallchecks><![CDATA[ 102 SELECT id, sid, storage_node_id, fqdn, dsn 103 FROM stratcon.map_uuid_to_sid LEFT JOIN stratcon.storage_node USING (storage_node_id) 104 ]]></mapallchecks> 105 <mapchecktostoragenode><![CDATA[ 106 SELECT o_storage_node_id as storage_node_id, o_sid as sid, o_fqdn as fqdn, o_dsn as dsn from stratcon.map_uuid_to_sid($1,$2) 107 ]]></mapchecktostoragenode> 80 108 <check><![CDATA[ 81 109 INSERT INTO stratcon.loading_dock_check_s src/stratcon_datastore.c
r4201a42 r5360a1e 1 1 /* 2 * Copyright (c) 2007 , OmniTI Computer Consulting, Inc.2 * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc. 3 3 * All rights reserved. 4 4 * … … 35 35 #include "utils/noit_log.h" 36 36 #include "utils/noit_b64.h" 37 #include "utils/noit_str.h" 38 #include "utils/noit_mkdir.h" 37 39 #include "stratcon_datastore.h" 38 40 #include "stratcon_realtime_http.h" … … 41 43 #include "noit_check.h" 42 44 #include <unistd.h> 45 #include <fcntl.h> 43 46 #include <netinet/in.h> 44 47 #include <sys/un.h> 48 #include <dirent.h> 45 49 #include <arpa/inet.h> 50 #include <sys/mman.h> 46 51 #include <libpq-fe.h> 47 52 #include <zlib.h> 48 53 #include <assert.h> 49 50 static char *check_loadall = NULL; 51 static const char *check_loadall_conf = "/stratcon/database/statements/allchecks"; 52 static char *check_find = NULL; 53 static const char *check_find_conf = "/stratcon/database/statements/findcheck"; 54 static char *check_insert = NULL; 55 static const char *check_insert_conf = "/stratcon/database/statements/check"; 56 static char *status_insert = NULL; 57 static const char *status_insert_conf = "/stratcon/database/statements/status"; 58 static char *metric_insert_numeric = NULL; 59 static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric"; 60 static char *metric_insert_text = NULL; 61 static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text"; 62 static char *config_insert = NULL; 63 static const char *config_insert_conf = "/stratcon/database/statements/config"; 54 #include <errno.h> 55 56 #define DECL_STMT(codename,confname) \ 57 static char *codename = NULL; \ 58 static const char *codename##_conf = "/stratcon/database/statements/" #confname 59 60 DECL_STMT(storage_post_connect, storagepostconnect); 61 DECL_STMT(metanode_post_connect, metanodepostconnect); 62 DECL_STMT(find_storage, findstoragenode); 63 DECL_STMT(all_storage, allstoragenodes); 64 DECL_STMT(check_map, mapchecktostoragenode); 65 DECL_STMT(check_mapall, mapallchecks); 66 DECL_STMT(check_loadall, allchecks); 67 DECL_STMT(check_find, findcheck); 68 DECL_STMT(check_insert, check); 69 DECL_STMT(status_insert, status); 70 DECL_STMT(metric_insert_numeric, metric_numeric); 71 DECL_STMT(metric_insert_text, metric_text); 72 DECL_STMT(config_insert, config); 73 74 static noit_log_stream_t ds_err = NULL; 75 static noit_log_stream_t ingest_err = NULL; 64 76 65 77 static struct datastore_onlooker_list { 66 void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *, void *); 78 void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *, 79 const char *, void *); 67 80 struct datastore_onlooker_list *next; 68 81 } *onlookers = NULL; … … 73 86 goto bad_row; \ 74 87 } while(0) 88 89 struct conn_q; 90 91 typedef struct { 92 char *queue_name; /* the key fqdn+remote_sn */ 93 eventer_jobq_t *jobq; 94 struct conn_q *head; 95 pthread_mutex_t lock; 96 pthread_cond_t cv; 97 int ttl; 98 int in_pool; 99 int outstanding; 100 int max_allocated; 101 int max_in_pool; 102 } conn_pool; 103 typedef struct conn_q { 104 time_t last_use; 105 char *dsn; /* Pg connect string */ 106 char *remote_str; /* the IP of the noit*/ 107 char *remote_cn; /* the Cert CN of the noit */ 108 char *fqdn; /* the fqdn of the storage node */ 109 conn_pool *pool; 110 struct conn_q *next; 111 /* Postgres specific stuff */ 112 PGconn *dbh; 113 } conn_q; 114 75 115 76 116 #define MAX_PARAMS 8 … … 89 129 POSTGRES_PARTS 90 130 } ds_single_detail; 91 typedef struct ds_job_detail{131 typedef struct { 92 132 /* Postgres specific stuff */ 93 133 POSTGRES_PARTS 94 95 char *data; /* The raw string, NULL means the stream is done -- commit. */96 134 struct realtime_tracker *rt; 97 135 conn_q *cq; /* connection on which to perform this job */ 136 eventer_t completion_event; /* This event should be registered if non NULL */ 137 } ds_rt_detail; 138 typedef struct ds_line_detail { 139 /* Postgres specific stuff */ 140 POSTGRES_PARTS 141 char *data; 98 142 int problematic; 99 eventer_t completion_event; /* This event should be registered if non NULL */ 100 struct ds_job_detail *next; 101 } ds_job_detail; 143 struct ds_line_detail *next; 144 } ds_line_detail; 102 145 103 146 typedef struct { 104 struct sockaddr *remote; 105 eventer_jobq_t *jobq; 106 /* Postgres specific stuff */ 107 PGconn *dbh; 108 ds_job_detail *head; 109 ds_job_detail *tail; 110 } conn_q; 147 noit_hash_table *ws; 148 eventer_t completion; 149 } syncset_t; 150 151 typedef struct { 152 char *remote_str; 153 char *remote_cn; 154 char *fqdn; 155 int storagenode_id; 156 int fd; 157 char *filename; 158 conn_pool *cpool; 159 } interim_journal_t; 111 160 112 161 static int stratcon_database_connect(conn_q *cq); … … 120 169 } 121 170 122 static void 123 __append(conn_q *q, ds_job_detail *d) { 124 d->next = NULL; 125 if(!q->head) q->head = q->tail = d; 126 else { 127 q->tail->next = d; 128 q->tail = d; 129 } 130 } 131 static void 132 __remove_until(conn_q *q, ds_job_detail *d) { 133 ds_job_detail *next; 134 while(q->head && q->head != d) { 135 next = q->head; 136 q->head = q->head->next; 137 free_params((ds_single_detail *)next); 138 if(next->data) free(next->data); 139 free(next); 140 } 141 if(!q->head) q->tail = NULL; 142 } 143 171 char *basejpath = NULL; 172 pthread_mutex_t ds_conns_lock; 144 173 noit_hash_table ds_conns; 145 146 conn_q * 147 __get_conn_q_for_remote(struct sockaddr *remote) { 148 void *vcq; 149 conn_q *cq; 150 char queue_name[128] = "datastore_"; 151 static const char __zeros[4] = { 0 }; 152 int len = 0; 174 noit_hash_table working_sets; 175 176 /* the fqdn cache needs to be thread safe */ 177 typedef struct { 178 char *uuid_str; 179 int storagenode_id; 180 int sid; 181 } uuid_info; 182 typedef struct { 183 int storagenode_id; 184 char *fqdn; 185 char *dsn; 186 } storagenode_info; 187 noit_hash_table uuid_to_info_cache; 188 pthread_mutex_t fqdn_to_info_cache_lock; 189 noit_hash_table fqdn_to_info_cache; 190 191 int 192 convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) { 193 char name[128] = ""; 194 buff[0] = '\0'; 153 195 if(remote) { 196 int len = 0; 154 197 switch(remote->sa_family) { 155 198 case AF_INET: 156 199 len = sizeof(struct sockaddr_in); 157 200 inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr, 158 queue_name + strlen("datastore_"), len);201 name, len); 159 202 break; 160 203 case AF_INET6: 161 204 len = sizeof(struct sockaddr_in6); 162 205 inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr, 163 queue_name + strlen("datastore_"), len);206 name, len); 164 207 break; 165 208 case AF_UNIX: 166 209 len = SUN_LEN(((struct sockaddr_un *)remote)); 167 snprintf( queue_name, sizeof(queue_name), "datastore_%s", ((struct sockaddr_un *)remote)->sun_path);210 snprintf(name, sizeof(name), "%s", ((struct sockaddr_un *)remote)->sun_path); 168 211 break; 169 default: return NULL; 170 } 212 default: return 0; 213 } 214 } 215 strlcpy(buff, name, blen); 216 return strlen(buff); 217 } 218 219 /* Thread-safe connection pools */ 220 221 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */ 222 static void 223 release_conn_q_forceable(conn_q *cq, int forcefree) { 224 int putback = 0; 225 cq->last_use = time(NULL); 226 pthread_mutex_lock(&cq->pool->lock); 227 cq->pool->outstanding--; 228 if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) { 229 putback = 1; 230 cq->next = cq->pool->head; 231 cq->pool->head = cq; 232 cq->pool->in_pool++; 233 } 234 pthread_mutex_unlock(&cq->pool->lock); 235 noitL(noit_debug, "[%p] release %s [%s]\n", (void *)pthread_self(), 236 putback ? "to pool" : "and destroy", cq->pool->queue_name); 237 pthread_cond_signal(&cq->pool->cv); 238 if(putback) return; 239 240 /* Not put back, release it */ 241 if(cq->dbh) PQfinish(cq->dbh); 242 if(cq->remote_str) free(cq->remote_str); 243 if(cq->remote_cn) free(cq->remote_cn); 244 if(cq->fqdn) free(cq->fqdn); 245 if(cq->dsn) free(cq->dsn); 246 free(cq); 247 } 248 static void 249 ttl_purge_conn_pool(conn_pool *pool) { 250 int old_cnt, new_cnt; 251 time_t now = time(NULL); 252 conn_q *cq, *prev = NULL, *iter; 253 /* because we always replace on the head and update the last_use time when 254 doing so, we know they are ordered LRU on the end. So, once we hit an 255 old one, we know all the others are old too. 256 */ 257 if(!pool->head) return; /* hack short circuit for no locks */ 258 pthread_mutex_lock(&pool->lock); 259 old_cnt = pool->in_pool; 260 cq = pool->head; 261 while(cq) { 262 if(cq->last_use + cq->pool->ttl < now) { 263 if(prev) prev->next = NULL; 264 else pool->head = NULL; 265 break; 266 } 267 prev = cq; 268 cq = cq->next; 269 } 270 /* Now pool->head is a chain of unexpired and cq is a chain of expired */ 271 /* Fix accounting */ 272 for(iter=cq; iter; iter=iter->next) pool->in_pool--; 273 new_cnt = pool->in_pool; 274 pthread_mutex_unlock(&pool->lock); 275 276 /* Force release these without holding the lock */ 277 while(cq) { 278 prev = cq; 279 cq = cq->next; 280 release_conn_q_forceable(cq, 1); 281 } 282 if(old_cnt != new_cnt) 283 noitL(noit_debug, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt, 284 pool->queue_name); 285 } 286 static void 287 release_conn_q(conn_q *cq) { 288 ttl_purge_conn_pool(cq->pool); 289 release_conn_q_forceable(cq, 0); 290 } 291 static conn_pool * 292 get_conn_pool_for_remote(const char *remote_str, 293 const char *remote_cn, const char *fqdn) { 294 void *vcpool; 295 conn_pool *cpool = NULL; 296 char queue_name[256] = "datastore_"; 297 snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s", 298 (remote_str && *remote_str) ? remote_str : "0.0.0.0", 299 fqdn ? fqdn : "default", 300 remote_cn ? remote_cn : "default"); 301 pthread_mutex_lock(&ds_conns_lock); 302 if(noit_hash_retrieve(&ds_conns, (const char *)queue_name, 303 strlen(queue_name), &vcpool)) 304 cpool = vcpool; 305 pthread_mutex_unlock(&ds_conns_lock); 306 if(!cpool) { 307 vcpool = cpool = calloc(1, sizeof(*cpool)); 308 cpool->queue_name = strdup(queue_name); 309 pthread_mutex_init(&cpool->lock, NULL); 310 pthread_cond_init(&cpool->cv, NULL); 311 cpool->in_pool = 0; 312 cpool->outstanding = 0; 313 cpool->max_in_pool = 1; 314 cpool->max_allocated = 1; 315 pthread_mutex_lock(&ds_conns_lock); 316 if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name), 317 cpool)) { 318 noit_hash_retrieve(&ds_conns, (const char *)queue_name, 319 strlen(queue_name), &vcpool); 320 } 321 pthread_mutex_unlock(&ds_conns_lock); 322 if(vcpool != cpool) { 323 /* someone beat us to it */ 324 free(cpool->queue_name); 325 pthread_mutex_destroy(&cpool->lock); 326 pthread_cond_destroy(&cpool->cv); 327 free(cpool); 328 } 329 else { 330 int i; 331 /* Our job to setup the pool */ 332 cpool->jobq = calloc(1, sizeof(*cpool->jobq)); 333 eventer_jobq_init(cpool->jobq, queue_name); 334 cpool->jobq->backq = eventer_default_backq(); 335 /* Add one thread */ 336 for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++) 337 eventer_jobq_increase_concurrency(cpool->jobq); 338 } 339 cpool = vcpool; 340 } 341 return cpool; 342 } 343 static conn_q * 344 get_conn_q_for_remote(const char *remote_str, 345 const char *remote_cn, const char *fqdn, 346 const char *dsn) { 347 conn_pool *cpool; 348 conn_q *cq; 349 cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn); 350 again: 351 noitL(noit_debug, "[%p] requesting [%s]\n", (void *)pthread_self(), 352 cpool->queue_name); 353 pthread_mutex_lock(&cpool->lock); 354 if(cpool->head) { 355 assert(cpool->in_pool > 0); 356 cq = cpool->head; 357 cpool->head = cq->next; 358 cpool->in_pool--; 359 cpool->outstanding++; 360 cq->next = NULL; 361 pthread_mutex_unlock(&cpool->lock); 362 return cq; 363 } 364 if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) { 365 noitL(noit_debug, "[%p] over-subscribed, waiting [%s]\n", 366 (void *)pthread_self(), cpool->queue_name); 367 pthread_cond_wait(&cpool->cv, &cpool->lock); 368 goto again; 171 369 } 172 370 else { 173 /* This is a dummy connection */ 174 remote = (struct sockaddr *)__zeros; 175 snprintf(queue_name, sizeof(queue_name), "datastore_default"); 176 len = 4; 177 } 178 if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, &vcq)) 179 return vcq; 371 cpool->outstanding++; 372 pthread_mutex_unlock(&cpool->lock); 373 } 374 180 375 cq = calloc(1, sizeof(*cq)); 181 cq->remote = malloc(len); 182 memcpy(cq->remote, remote, len); 183 cq->jobq = calloc(1, sizeof(*cq->jobq)); 184 eventer_jobq_init(cq->jobq, queue_name); 185 cq->jobq->backq = eventer_default_backq(); 186 /* Add one thread */ 187 eventer_jobq_increase_concurrency(cq->jobq); 188 noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq); 376 cq->pool = cpool; 377 cq->remote_str = remote_str ? strdup(remote_str) : NULL; 378 cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL; 379 cq->fqdn = fqdn ? strdup(fqdn) : NULL; 380 cq->dsn = dsn ? strdup(dsn) : NULL; 189 381 return cq; 382 } 383 static conn_q * 384 get_conn_q_for_metanode() { 385 return get_conn_q_for_remote(NULL,NULL,NULL,NULL); 190 386 } 191 387 … … 243 439 if(d->rv != PGRES_COMMAND_OK && \ 244 440 d->rv != PGRES_TUPLES_OK) { \ 245 noitL( noit_error, "stratcon datasource bad (%d): %s\n'%s'\n", \441 noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s'\n", \ 246 442 d->rv, PQresultErrorMessage(d->res), cmd); \ 247 443 PQclear(d->res); \ … … 262 458 if(d->rv != PGRES_COMMAND_OK && \ 263 459 d->rv != PGRES_TUPLES_OK) { \ 264 noitL( noit_error, "stratcon datasource bad (%d): %s\n'%s' time: %llu\n", \460 noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s' time: %llu\n", \ 265 461 d->rv, PQresultErrorMessage(d->res), cmdbuf, \ 266 462 (long long unsigned)whence); \ … … 274 470 struct timeval *now) { 275 471 conn_q *cq = closure; 276 ds_ job_detail *d;472 ds_single_detail *d; 277 473 int i, row_count = 0, good = 0; 278 474 char buff[1024]; … … 315 511 316 512 /* stratcon_iep_line_processor takes an allocated operand and frees it */ 317 stratcon_iep_line_processor(DS_OP_INSERT, sin, strdup(buff));513 stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL); 318 514 good++; 319 515 } 320 516 noitL(noit_error, "Staged %d/%d remembered checks into IEP\n", good, row_count); 321 517 bad_row: 322 PQclear(d->res);518 free_params((ds_single_detail *)d); 323 519 free(d); 520 release_conn_q(cq); 324 521 return 0; 325 522 } … … 328 525 eventer_t e; 329 526 conn_q *cq; 330 cq = __get_conn_q_for_remote(NULL);527 cq = get_conn_q_for_metanode(); 331 528 332 529 e = eventer_alloc(); … … 334 531 e->callback = stratcon_datastore_asynch_drive_iep; 335 532 e->closure = cq; 336 eventer_add_asynch(cq-> jobq, e);533 eventer_add_asynch(cq->pool->jobq, e); 337 534 } 338 535 execute_outcome_t 339 stratcon_datastore_find(conn_q *cq, ds_ job_detail *d) {536 stratcon_datastore_find(conn_q *cq, ds_rt_detail *d) { 340 537 char *val; 341 538 int row_count; 342 343 if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid); 539 struct realtime_tracker *node; 540 344 541 GET_QUERY(check_find); 345 PG_EXEC(check_find); 346 row_count = PQntuples(d->res); 347 if(row_count != 1) goto bad_row; 348 349 /* Get the check uuid */ 350 PG_GET_STR_COL(val, 0, "id"); 351 if(!val) goto bad_row; 352 if(uuid_parse(val, d->rt->checkid)) goto bad_row; 353 354 /* Get the remote_address (which noit owns this) */ 355 PG_GET_STR_COL(val, 0, "remote_address"); 356 if(!val) goto bad_row; 357 d->rt->noit = strdup(val); 358 359 PQclear(d->res); 542 for(node = d->rt; node; node = node->next) { 543 DECLARE_PARAM_INT(node->sid); 544 PG_EXEC(check_find); 545 row_count = PQntuples(d->res); 546 if(row_count != 1) { 547 PQclear(d->res); 548 goto bad_row; 549 } 550 551 /* Get the check uuid */ 552 PG_GET_STR_COL(val, 0, "id"); 553 if(!val) { 554 PQclear(d->res); 555 goto bad_row; 556 } 557 if(uuid_parse(val, node->checkid)) { 558 PQclear(d->res); 559 goto bad_row; 560 } 561 562 /* Get the remote_address (which noit owns this) */ 563 PG_GET_STR_COL(val, 0, "remote_address"); 564 if(!val) { 565 PQclear(d->res); 566 goto bad_row; 567 } 568 node->noit = strdup(val); 569 570 bad_row: 571 free_params((ds_single_detail *)d); 572 d->nparams = 0; 573 } 360 574 return DS_EXEC_SUCCESS; 361 bad_row:362 return DS_EXEC_ROW_FAILED;363 575 } 364 576 execute_outcome_t 365 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {577 stratcon_datastore_execute(conn_q *cq, const char *r, ds_line_detail *d) { 366 578 int type, len; 367 579 char *final_buff; 368 uLong final_len, actual_final_len; ;580 uLong final_len, actual_final_len; 369 581 char *token; 582 char raddr_blank[1] = ""; 583 const char *raddr; 370 584 371 585 type = d->data[0]; 586 raddr = r ? r : raddr_blank; 372 587 373 588 /* Parse the log line, but only if we haven't already */ 374 589 if(!d->nparams) { 375 char raddr[128];376 590 char *scp, *ecp; 377 591 378 /* setup our remote address */379 raddr[0] = '\0';380 switch(r->sa_family) {381 case AF_INET:382 inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr),383 raddr, sizeof(raddr));384 break;385 case AF_INET6:386 inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr),387 raddr, sizeof(raddr));388 break;389 default:390 noitL(noit_error, "remote address of family %d\n", r->sa_family);391 }392 393 592 scp = d->data; 394 593 #define PROCESS_NEXT_FIELD(t,l) do { \ … … 547 746 } 548 747 static int 748 stratcon_database_post_connect(conn_q *cq) { 749 int rv = 0; 750 ds_single_detail _d = { 0 }, *d = &_d; 751 if(cq->remote_str) { 752 char *remote_str, *remote_cn; 753 /* This is the silly way we get null's in through our declare_param_str */ 754 remote_str = cq->remote_str ? cq->remote_str : "[[null]]"; 755 remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]"; 756 /* This is a storage node, it gets the storage node post_connect */ 757 GET_QUERY(storage_post_connect); 758 rv = -1; /* now we're serious */ 759 DECLARE_PARAM_STR(remote_str, strlen(remote_str)); 760 DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); 761 PG_EXEC(storage_post_connect); 762 PQclear(d->res); 763 rv = 0; 764 } 765 else { 766 /* Metanode post_connect */ 767 GET_QUERY(metanode_post_connect); 768 rv = -1; /* now we're serious */ 769 PG_EXEC(metanode_post_connect); 770 PQclear(d->res); 771 rv = 0; 772 } 773 bad_row: 774 free_params(d); 775 if(rv == -1) { 776 /* Post-connect intentions are serious and fatal */ 777 PQfinish(cq->dbh); 778 cq->dbh = NULL; 779 } 780 return rv; 781 } 782 static int 549 783 stratcon_database_connect(conn_q *cq) { 550 char dsn[512];784 char *dsn, dsn_meta[512]; 551 785 noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 552 786 const char *k, *v; … … 554 788 noit_hash_table *t; 555 789 556 dsn[0] = '\0'; 557 t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); 558 while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { 559 if(dsn[0]) strlcat(dsn, " ", sizeof(dsn)); 560 strlcat(dsn, k, sizeof(dsn)); 561 strlcat(dsn, "=", sizeof(dsn)); 562 strlcat(dsn, v, sizeof(dsn)); 563 } 564 noit_hash_destroy(t, free, free); 565 free(t); 790 dsn_meta[0] = '\0'; 791 if(!cq->dsn) { 792 t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); 793 while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { 794 if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta)); 795 strlcat(dsn_meta, k, sizeof(dsn_meta)); 796 strlcat(dsn_meta, "=", sizeof(dsn_meta)); 797 strlcat(dsn_meta, v, sizeof(dsn_meta)); 798 } 799 noit_hash_destroy(t, free, free); 800 free(t); 801 dsn = dsn_meta; 802 } 803 else dsn = cq->dsn; 566 804 567 805 if(cq->dbh) { 568 806 if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 569 807 PQreset(cq->dbh); 808 if(stratcon_database_post_connect(cq)) return -1; 570 809 if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 571 810 noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", … … 576 815 cq->dbh = PQconnectdb(dsn); 577 816 if(!cq->dbh) return -1; 817 if(stratcon_database_post_connect(cq)) return -1; 578 818 if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 579 819 noitL(noit_error, "Error connection to database: '%s'\nError: %s\n", … … 625 865 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure, 626 866 struct timeval *now) { 627 conn_q *cq= closure;628 ds_job_detail *current, *next;867 ds_rt_detail *dsjd = closure; 868 conn_q *cq = dsjd->cq; 629 869 if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 630 870 631 if(!cq->head) return 0;632 633 871 stratcon_database_connect(cq); 634 635 current = cq->head; 636 while(current) { 637 if(current->rt) { 638 next = current->next; 639 stratcon_datastore_find(cq, current); 640 current = next; 641 } 642 else if(current->completion_event) { 643 next = current->next; 644 eventer_add(current->completion_event); 645 current = next; 646 __remove_until(cq, current); 647 } 648 else current = current->next; 649 } 872 assert(dsjd->rt); 873 stratcon_datastore_find(cq, dsjd); 874 if(dsjd->completion_event) 875 eventer_add(dsjd->completion_event); 876 877 free_params((ds_single_detail *)dsjd); 878 free(dsjd); 879 release_conn_q(cq); 650 880 return 0; 881 } 882 static const char * 883 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) { 884 void *vinfo; 885 const char *dsn = NULL, *fqdn = NULL; 886 int found = 0; 887 storagenode_info *info = NULL; 888 pthread_mutex_lock(&fqdn_to_info_cache_lock); 889 if(noit_hash_retrieve(&fqdn_to_info_cache, (void *)&id, sizeof(id), 890 &vinfo)) { 891 found = 1; 892 info = vinfo; 893 } 894 pthread_mutex_unlock(&fqdn_to_info_cache_lock); 895 if(found) { 896 if(fqdn_out) *fqdn_out = info->fqdn; 897 return info->dsn; 898 } 899 900 if(!found && can_use_db) { 901 ds_single_detail *d; 902 conn_q *cq; 903 int row_count; 904 /* Look it up and store it */ 905 d = calloc(1, sizeof(*d)); 906 cq = get_conn_q_for_metanode(); 907 GET_QUERY(find_storage); 908 DECLARE_PARAM_INT(id); 909 PG_EXEC(find_storage); 910 row_count = PQntuples(d->res); 911 if(row_count) { 912 PG_GET_STR_COL(dsn, 0, "dsn"); 913 PG_GET_STR_COL(fqdn, 0, "fqdn"); 914 } 915 PQclear(d->res); 916 bad_row: 917 free_params(d); 918 free(d); 919 } 920 if(fqdn) { 921 info = calloc(1, sizeof(*info)); 922 info->fqdn = strdup(fqdn); 923 if(fqdn_out) *fqdn_out = info->fqdn; 924 info->dsn = dsn ? strdup(dsn) : NULL; 925 info->storagenode_id = id; 926 pthread_mutex_lock(&fqdn_to_info_cache_lock); 927 noit_hash_store(&fqdn_to_info_cache, 928 (void *)&info->storagenode_id, sizeof(int), info); 929 pthread_mutex_unlock(&fqdn_to_info_cache_lock); 930 } 931 return info ? info->dsn : NULL; 932 } 933 static ds_line_detail * 934 build_insert_batch(interim_journal_t *ij) { 935 int rv; 936 off_t len; 937 const char *buff, *cp, *lcp; 938 struct stat st; 939 ds_line_detail *head = NULL, *last = NULL, *next = NULL; 940 941 while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR); 942 assert(rv != -1); 943 len = st.st_size; 944 buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0); 945 if(buff == (void *)-1) { 946 noitL(noit_error, "mmap(%s) => %s\n", ij->filename, strerror(errno)); 947 assert(buff != (void *)-1); 948 } 949 lcp = buff; 950 while(lcp < (buff + len) && 951 NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) { 952 next = calloc(1, sizeof(*next)); 953 next->data = malloc(cp - lcp + 1); 954 memcpy(next->data, lcp, cp - lcp); 955 next->data[cp - lcp] = '\0'; 956 if(!head) head = next; 957 if(last) last->next = next; 958 last = next; 959 lcp = cp + 1; 960 } 961 munmap((void *)buff, len); 962 return head; 963 } 964 static void 965 interim_journal_remove(interim_journal_t *ij) { 966 unlink(ij->filename); 967 if(ij->filename) free(ij->filename); 968 if(ij->remote_str) free(ij->remote_str); 969 if(ij->remote_cn) free(ij->remote_cn); 970 if(ij->fqdn) free(ij->fqdn); 651 971 } 652 972 int … … 654 974 struct timeval *now) { 655 975 int i; 656 conn_q *cq = closure; 657 ds_job_detail *current, *last_sp; 976 interim_journal_t *ij; 977 ds_line_detail *head, *current, *last_sp; 978 const char *dsn; 979 conn_q *cq; 658 980 if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 659 981 if(mask & EVENTER_ASYNCH_CLEANUP) return 0; 660 if(!cq->head) return 0; 661 982 983 ij = closure; 984 dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn); 985 cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn, 986 ij->fqdn, dsn); 987 noitL(noit_debug, "stratcon_datastore_asynch_execute[%s,%s,%s]\n", 988 ij->remote_str, ij->remote_cn, ij->fqdn); 662 989 full_monty: 663 990 /* Make sure we have a connection */ … … 670 997 } 671 998 672 current = cq->head; 999 head = build_insert_batch(ij); 1000 current = head; 673 1001 last_sp = NULL; 674 1002 if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq); … … 679 1007 680 1008 if(current->problematic) { 681 noitL( noit_error, "[%s] Failed noit line: %s", cq->jobq->queue_name, current->data);1009 noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data); 682 1010 RELEASE_SAVEPOINT("batch"); 683 1011 current = current->next; 684 1012 continue; 685 1013 } 686 rv = stratcon_datastore_execute(cq, cq->remote , current);1014 rv = stratcon_datastore_execute(cq, cq->remote_str, current); 687 1015 switch(rv) { 688 1016 case DS_EXEC_SUCCESS: … … 699 1027 } 700 1028 } 701 if(current->completion_event) { 702 if(last_sp) RELEASE_SAVEPOINT("batch"); 703 if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq); 704 eventer_add(current->completion_event); 705 current = current->next; 706 __remove_until(cq, current); 707 } 708 } 1029 } 1030 if(last_sp) RELEASE_SAVEPOINT("batch"); 1031 if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq); 1032 /* Cleanup the mess */ 1033 while(head) { 1034 ds_line_detail *tofree; 1035 tofree = head; 1036 head = head->next; 1037 if(tofree->data) free(tofree->data); 1038 free_params((ds_single_detail *)tofree); 1039 free(tofree); 1040 } 1041 interim_journal_remove(ij); 1042 release_conn_q(cq); 709 1043 return 0; 1044 } 1045 static int 1046 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure, 1047 struct timeval *now) { 1048 noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 1049 const char *k; 1050 int klen; 1051 void *vij; 1052 interim_journal_t *ij; 1053 syncset_t *syncset = closure; 1054 1055 if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 1056 if(mask & EVENTER_ASYNCH_CLEANUP) return 0; 1057 1058 noitL(noit_debug, "Syncing journal sets...\n"); 1059 while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) { 1060 eventer_t ingest; 1061 ij = vij; 1062 noitL(noit_debug, "Syncing journal set [%s,%s,%s]\n", 1063 ij->remote_str, ij->remote_cn, ij->fqdn); 1064 fsync(ij->fd); 1065 ingest = eventer_alloc(); 1066 ingest->mask = EVENTER_ASYNCH; 1067 ingest->callback = stratcon_datastore_asynch_execute; 1068 ingest->closure = ij; 1069 eventer_add_asynch(ij->cpool->jobq, ingest); 1070 } 1071 noit_hash_destroy(syncset->ws, free, NULL); 1072 free(syncset->ws); 1073 eventer_add(syncset->completion); 1074 free(syncset); 1075 return 0; 1076 } 1077 static interim_journal_t * 1078 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in, 1079 int storagenode_id, const char *fqdn_in) { 1080 void *vhash, *vij; 1081 noit_hash_table *working_set; 1082 interim_journal_t *ij; 1083 struct timeval now; 1084 char jpath[PATH_MAX]; 1085 char remote_str[128]; 1086 const char *remote_cn = remote_cn_in ? remote_cn_in : "default"; 1087 const char *fqdn = fqdn_in ? fqdn_in : "default"; 1088 1089 convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote); 1090 if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str)); 1091 1092 /* Lookup the working set */ 1093 if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) { 1094 working_set = calloc(1, sizeof(*working_set)); 1095 noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn), 1096 working_set); 1097 } 1098 else 1099 working_set = vhash; 1100 1101 /* Lookup the interim journal within the working set */ 1102 if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) { 1103 ij = calloc(1, sizeof(*ij)); 1104 gettimeofday(&now, NULL); 1105 snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x", 1106 basejpath, remote_str, remote_cn, storagenode_id, 1107 (unsigned int)now.tv_sec, (unsigned int)now.tv_usec); 1108 ij->remote_str = strdup(remote_str); 1109 ij->remote_cn = strdup(remote_cn); 1110 ij->fqdn = strdup(fqdn); 1111 ij->storagenode_id = storagenode_id; 1112 ij->filename = strdup(jpath); 1113 ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn, 1114 ij->fqdn); 1115 ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640); 1116 if(ij->fd < 0 && errno == ENOENT) { 1117 if(mkdir_for_file(ij->filename, 0750)) { 1118 noitL(noit_error, "Failed to create dir for '%s': %s\n", 1119 ij->filename, strerror(errno)); 1120 exit(-1); 1121 } 1122 ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640); 1123 } 1124 if(ij->fd < 0) { 1125 noitL(noit_error, "Failed to open interim journal '%s': %s\n", 1126 ij->filename, strerror(errno)); 1127 exit(-1); 1128 } 1129 noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij); 1130 } 1131 else 1132 ij = vij; 1133 1134 return ij; 1135 } 1136 static void 1137 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn, 1138 int *sid_out, int *storagenode_id_out, 1139 char **fqdn_out, char **dsn_out) { 1140 /* only called from the main thread -- no safety issues */ 1141 void *vuuidinfo, *vinfo; 1142 uuid_info *uuidinfo; 1143 storagenode_info *info = NULL; 1144 char *fqdn = NULL; 1145 char *dsn = NULL; 1146 int storagenode_id = 0, sid = 0; 1147 if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str), 1148 &vuuidinfo)) { 1149 int row_count; 1150 char *tmpint; 1151 ds_single_detail *d; 1152 conn_q *cq; 1153 d = calloc(1, sizeof(*d)); 1154 cq = get_conn_q_for_metanode(); 1155 if(stratcon_database_connect(cq) == 0) { 1156 /* Blocking call to service the cache miss */ 1157 GET_QUERY(check_map); 1158 DECLARE_PARAM_STR(uuid_str, strlen(uuid_str)); 1159 DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); 1160 PG_EXEC(check_map); 1161 row_count = PQntuples(d->res); 1162 if(row_count != 1) { 1163 PQclear(d->res); 1164 goto bad_row; 1165 } 1166 PG_GET_STR_COL(tmpint, 0, "sid"); 1167 sid = atoi(tmpint); 1168 PG_GET_STR_COL(tmpint, 0, "storage_node_id"); 1169 if(tmpint) storagenode_id = atoi(tmpint); 1170 PG_GET_STR_COL(fqdn, 0, "fqdn"); 1171 PG_GET_STR_COL(dsn, 0, "dsn"); 1172 PQclear(d->res); 1173 } 1174 bad_row: 1175 free_params((ds_single_detail *)d); 1176 free(d); 1177 release_conn_q(cq); 1178 /* Place in cache */ 1179 if(fqdn) fqdn = strdup(fqdn); 1180 uuidinfo = calloc(1, sizeof(*uuidinfo)); 1181 uuidinfo->sid = sid; 1182 uuidinfo->uuid_str = strdup(uuid_str); 1183 noit_hash_store(&uuid_to_info_cache, 1184 uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); 1185 /* Also, we may have just witnessed a new storage node, store it */ 1186 if(storagenode_id) { 1187 int needs_free = 0; 1188 info = calloc(1, sizeof(*info)); 1189 info->storagenode_id = storagenode_id; 1190 info->dsn = dsn ? strdup(dsn) : NULL; 1191 info->fqdn = fqdn ? strdup(fqdn) : NULL; 1192 pthread_mutex_lock(&fqdn_to_info_cache_lock); 1193 if(!noit_hash_retrieve(&fqdn_to_info_cache, 1194 (void *)&storagenode_id, sizeof(int), &vinfo)) { 1195 /* hack to save memory -- we *never* remove from these caches, 1196 so we can use the same fqdn value in the above cache for the key 1197 in the cache below -- (no strdup) */ 1198 noit_hash_store(&fqdn_to_info_cache, 1199 (void *)&info->storagenode_id, sizeof(int), info); 1200 } 1201 else needs_free = 1; 1202 pthread_mutex_unlock(&fqdn_to_info_cache_lock); 1203 if(needs_free) { 1204 if(info->dsn) free(info->dsn); 1205 if(info->fqdn) free(info->fqdn); 1206 free(info); 1207 } 1208 } 1209 } 1210 else 1211 uuidinfo = vuuidinfo; 1212 1213 if(storagenode_id) { 1214 if(uuidinfo && 1215 ((!dsn && dsn_out) || (!fqdn && fqdn_out))) { 1216 /* we don't have dsn and we actually want it */ 1217 pthread_mutex_lock(&fqdn_to_info_cache_lock); 1218 if(noit_hash_retrieve(&fqdn_to_info_cache, 1219 (void *)&storagenode_id, sizeof(int), &vinfo)) 1220 info = vinfo; 1221 pthread_mutex_unlock(&fqdn_to_info_cache_lock); 1222 } 1223 } 1224 1225 if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL); 1226 if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL); 1227 if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id; 1228 if(sid_out) *sid_out = uuidinfo->sid; 1229 } 1230 static void 1231 stratcon_datastore_journal(struct sockaddr *remote, 1232 const char *remote_cn, const char *line) { 1233 interim_journal_t *ij = NULL; 1234 char uuid_str[UUID_STR_LEN+1], *cp, *fqdn, *dsn; 1235 int storagenode_id = 0; 1236 uuid_t checkid; 1237 if(!line) return; 1238 /* if it is a UUID based thing, find the storage node */ 1239 switch(*line) { 1240 case 'C': 1241 case 'S': 1242 case 'M': 1243 if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) { 1244 strlcpy(uuid_str, cp + 1, sizeof(uuid_str)); 1245 if(!uuid_parse(uuid_str, checkid)) { 1246 storage_node_quick_lookup(uuid_str, remote_cn, NULL, 1247 &storagenode_id, &fqdn, &dsn); 1248 ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn); 1249 } 1250 } 1251 break; 1252 case 'n': 1253 ij = interim_journal_get(remote,remote_cn,0,NULL); 1254 break; 1255 default: 1256 break; 1257 } 1258 if(!ij && fqdn) { 1259 noitL(ingest_err, "%d\t%s\n", storagenode_id, line); 1260 } 1261 else { 1262 int len; 1263 len = write(ij->fd, line, strlen(line)); 1264 if(len < 0) { 1265 noitL(noit_error, "write to %s failed: %s\n", 1266 ij->filename, strerror(errno)); 1267 } 1268 } 1269 return; 1270 } 1271 static noit_hash_table * 1272 stratcon_datastore_journal_remove(struct sockaddr *remote, 1273 const char *remote_cn) { 1274 void *vhash = NULL; 1275 if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) { 1276 /* pluck it out */ 1277 noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL); 1278 } 1279 else { 1280 noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n", 1281 remote_cn); 1282 abort(); 1283 } 1284 return vhash; 710 1285 } 711 1286 void 712 1287 stratcon_datastore_push(stratcon_datastore_op_t op, 713 struct sockaddr *remote, void *operand) { 1288 struct sockaddr *remote, 1289 const char *remote_cn, void *operand, 1290 eventer_t completion) { 714 1291 conn_q *cq; 1292 syncset_t *syncset; 715 1293 eventer_t e; 716 ds_ job_detail *dsjd;1294 ds_rt_detail *rtdetail; 717 1295 struct datastore_onlooker_list *nnode; 718 1296 719 1297 for(nnode = onlookers; nnode; nnode = nnode->next) 720 nnode->dispatch(op,remote,operand); 721 722 cq = __get_conn_q_for_remote(remote); 723 dsjd = calloc(1, sizeof(*dsjd)); 1298 nnode->dispatch(op,remote,remote_cn,operand); 1299 724 1300 switch(op) { 725 case DS_OP_FIND: 726 dsjd->rt = operand; 727 __append(cq, dsjd); 1301 case DS_OP_INSERT: 1302 stratcon_datastore_journal(remote, remote_cn, (const char *)operand); 728 1303 break; 729 case DS_OP_INSERT: 730 dsjd->data = operand; 731 __append(cq, dsjd); 1304 case DS_OP_CHKPT: 1305 e = eventer_alloc(); 1306 syncset = calloc(1, sizeof(*syncset)); 1307 e->mask = EVENTER_ASYNCH; 1308 e->callback = stratcon_datastore_journal_sync; 1309 syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn); 1310 syncset->completion = completion; 1311 e->closure = syncset; 1312 eventer_add(e); 732 1313 break; 733 1314 case DS_OP_FIND_COMPLETE: 734 case DS_OP_CHKPT: 735 dsjd->completion_event = operand; 736 __append(cq,dsjd); 1315 cq = get_conn_q_for_metanode(); 1316 rtdetail = calloc(1, sizeof(*rtdetail)); 1317 rtdetail->rt = operand; 1318 rtdetail->completion_event = completion; 737 1319 e = eventer_alloc(); 738 1320 e->mask = EVENTER_ASYNCH; 739 if(op == DS_OP_FIND_COMPLETE) 740 e->callback = stratcon_datastore_asynch_lookup; 741 else if(op == DS_OP_CHKPT) 742 e->callback = stratcon_datastore_asynch_execute; 743 e->closure = cq; 744 eventer_add_asynch(cq->jobq, e); 1321 e->callback = stratcon_datastore_asynch_lookup; 1322 e->closure = rtdetail; 1323 eventer_add_asynch(cq->pool->jobq, e); 745 1324 break; 746 1325 } … … 750 1329 stratcon_datastore_saveconfig(void *unused) { 751 1330 int rv = -1; 752 conn_q _cq = { 0 }, *cq = &_cq;753 1331 char *buff; 754 1332 ds_single_detail _d = { 0 }, *d = &_d; 1333 conn_q *cq; 1334 cq = get_conn_q_for_metanode(); 755 1335 756 1336 if(stratcon_database_connect(cq) == 0) { … … 775 1355 free_params(d); 776 1356 } 777 if(cq->dbh) PQfinish(cq->dbh);1357 release_conn_q(cq); 778 1358 return rv; 779 1359 } … … 781 1361 void 782 1362 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t, 783 struct sockaddr *, void *)) { 1363 struct sockaddr *, 1364 const char *, void *)) { 784 1365 struct datastore_onlooker_list *nnode; 785 1366 nnode = calloc(1, sizeof(*nnode)); … … 789 1370 nnode->next = onlookers; 790 1371 } 1372 static void 1373 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn, 1374 char *id_str, char *file) { 1375 char path[PATH_MAX]; 1376 interim_journal_t *ij; 1377 eventer_t ingest; 1378 1379 snprintf(path, sizeof(path), "%s/%s/%s/%s/%s", 1380 basejpath, remote_str, remote_cn, id_str, file); 1381 ij = calloc(1, sizeof(*ij)); 1382 ij->fd = open(path, O_RDONLY); 1383 if(ij->fd < 0) { 1384 noitL(noit_error, "cannot open journal '%s': %s\n", 1385 path, strerror(errno)); 1386 free(ij); 1387 return; 1388 } 1389 ij->filename = strdup(path); 1390 ij->remote_str = strdup(remote_str); 1391 ij->remote_cn = strdup(remote_cn); 1392 ij->storagenode_id = atoi(id_str); 1393 ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn, 1394 ij->fqdn); 1395 1396 noitL(noit_error, "ingesting old payload: %s\n", ij->filename); 1397 ingest = eventer_alloc(); 1398 ingest->mask = EVENTER_ASYNCH; 1399 ingest->callback = stratcon_datastore_asynch_execute; 1400 ingest->closure = ij; 1401 eventer_add_asynch(ij->cpool->jobq, ingest); 1402 } 1403 static void 1404 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) { 1405 char path[PATH_MAX]; 1406 DIR *root; 1407 struct dirent de, *entry; 1408 int i = 0, cnt = 0; 1409 char **entries; 1410 1411 snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath, 1412 first ? "/" : "", first ? first : "", 1413 second ? "/" : "", second ? second : "", 1414 third ? "/" : "", third ? third : ""); 1415 root = opendir(path); 1416 if(!root) return; 1417 while(readdir_r(root, &de, &entry) == 0 && entry != NULL) cnt++; 1418 rewinddir(root); 1419 entries = malloc(sizeof(*entries) * cnt); 1420 while(readdir_r(root, &de, &entry) == 0 && entry != NULL) { 1421 if(i < cnt) { 1422 entries[i++] = strdup(entry->d_name); 1423 } 1424 } 1425 closedir(root); 1426 cnt = i; /* could have changed, directories are fickle */ 1427 qsort(entries, i, sizeof(*entries), 1428 (int (*)(const void *, const void *))strcasecmp); 1429 for(i=0; i<cnt; i++) { 1430 if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue; 1431 if(!first) 1432 stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL); 1433 else if(!second) 1434 stratcon_datastore_sweep_journals_int(first, entries[i], NULL); 1435 else if(!third) 1436 stratcon_datastore_sweep_journals_int(first, second, entries[i]); 1437 else if(strlen(entries[i]) == 16) 1438 stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]); 1439 } 1440 } 1441 static void 1442 stratcon_datastore_sweep_journals() { 1443 stratcon_datastore_sweep_journals_int(NULL,NULL,NULL); 1444 } 1445 1446 int 1447 stratcon_datastore_ingest_all_storagenode_info() { 1448 int i, cnt; 1449 ds_single_detail _d = { 0 }, *d = &_d; 1450 conn_q *cq; 1451 cq = get_conn_q_for_metanode(); 1452 1453 while(stratcon_database_connect(cq)) { 1454 noitL(noit_error, "Error connecting to database\n"); 1455 sleep(1); 1456 } 1457 1458 GET_QUERY(all_storage); 1459 PG_EXEC(all_storage); 1460 cnt = PQntuples(d->res); 1461 for(i=0; i<cnt; i++) { 1462 void *vinfo; 1463 char *tmpint, *fqdn, *dsn; 1464 int storagenode_id; 1465 PG_GET_STR_COL(tmpint, i, "storage_node_id"); 1466 storagenode_id = atoi(tmpint); 1467 PG_GET_STR_COL(fqdn, i, "fqdn"); 1468 PG_GET_STR_COL(dsn, i, "dsn"); 1469 PG_GET_STR_COL(tmpint, i, "storage_node_id"); 1470 storagenode_id = tmpint ? atoi(tmpint) : 0; 1471 1472 if(!noit_hash_retrieve(&fqdn_to_info_cache, 1473 (void *)&storagenode_id, sizeof(int), &vinfo)) { 1474 storagenode_info *info; 1475 info = calloc(1, sizeof(*info)); 1476 info->storagenode_id = storagenode_id; 1477 info->fqdn = fqdn ? strdup(fqdn) : NULL; 1478 info->dsn = dsn ? strdup(dsn) : NULL; 1479 noit_hash_store(&fqdn_to_info_cache, 1480 (void *)&info->storagenode_id, sizeof(int), info); 1481 } 1482 } 1483 PQclear(d->res); 1484 bad_row: 1485 free_params(d); 1486 1487 release_conn_q(cq); 1488 noitL(noit_error, "Loaded %d storage nodes\n", cnt); 1489 return cnt; 1490 } 1491 int 1492 stratcon_datastore_ingest_all_check_info() { 1493 int i, cnt, loaded = 0; 1494 ds_single_detail _d = { 0 }, *d = &_d; 1495 conn_q *cq; 1496 cq = get_conn_q_for_metanode(); 1497 1498 while(stratcon_database_connect(cq)) { 1499 noitL(noit_error, "Error connecting to database\n"); 1500 sleep(1); 1501 } 1502 1503 GET_QUERY(check_mapall); 1504 PG_EXEC(check_mapall); 1505 cnt = PQntuples(d->res); 1506 for(i=0; i<cnt; i++) { 1507 void *vinfo; 1508 char *tmpint, *fqdn, *dsn, *uuid_str; 1509 int sid, storagenode_id; 1510 uuid_info *uuidinfo; 1511 PG_GET_STR_COL(uuid_str, i, "id"); 1512 if(!uuid_str) continue; 1513 PG_GET_STR_COL(tmpint, i, "sid"); 1514 if(!tmpint) continue; 1515 sid = atoi(tmpint); 1516 PG_GET_STR_COL(fqdn, i, "fqdn"); 1517 PG_GET_STR_COL(dsn, i, "dsn"); 1518 PG_GET_STR_COL(tmpint, i, "storage_node_id"); 1519 storagenode_id = tmpint ? atoi(tmpint) : 0; 1520 1521 uuidinfo = calloc(1, sizeof(*uuidinfo)); 1522 uuidinfo->uuid_str = strdup(uuid_str); 1523 uuidinfo->sid = sid; 1524 noit_hash_store(&uuid_to_info_cache, 1525 uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); 1526 loaded++; 1527 if(!noit_hash_retrieve(&fqdn_to_info_cache, 1528 (void *)&storagenode_id, sizeof(int), &vinfo)) { 1529 storagenode_info *info; 1530 info = calloc(1, sizeof(*info)); 1531 info->storagenode_id = storagenode_id; 1532 info->fqdn = fqdn ? strdup(fqdn) : NULL; 1533 info->dsn = dsn ? strdup(dsn) : NULL; 1534 noit_hash_store(&fqdn_to_info_cache, 1535 (void *)&info->storagenode_id, sizeof(int), info); 1536 } 1537 } 1538 PQclear(d->res); 1539 bad_row: 1540 free_params(d); 1541 1542 release_conn_q(cq); 1543 noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded); 1544 return loaded; 1545 } 1546 void 1547 stratcon_datastore_init() { 1548 pthread_mutex_init(&ds_conns_lock, NULL); 1549 pthread_mutex_init(&fqdn_to_info_cache_lock, NULL); 1550 ds_err = noit_log_stream_find("error/datastore"); 1551 ingest_err = noit_log_stream_find("error/ingest"); 1552 if(!ds_err) ds_err = noit_error; 1553 if(!ingest_err) ingest_err = noit_error; 1554 if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path", 1555 &basejpath)) { 1556 noitL(noit_error, "/stratcon/database/journal/path is unspecified\n"); 1557 exit(-1); 1558 } 1559 stratcon_datastore_ingest_all_check_info(); 1560 stratcon_datastore_ingest_all_storagenode_info(); 1561 stratcon_datastore_sweep_journals(); 1562 } src/stratcon_datastore.h
r88a7178 r5360a1e 44 44 DS_OP_INSERT = 1, 45 45 DS_OP_CHKPT = 2, 46 DS_OP_FIND = 3, 47 DS_OP_FIND_COMPLETE = 4 46 DS_OP_FIND_COMPLETE = 3 48 47 } stratcon_datastore_op_t; 49 48 50 49 API_EXPORT(void) 51 50 stratcon_datastore_push(stratcon_datastore_op_t, 52 struct sockaddr *, void *);51 struct sockaddr *, const char *, void *, eventer_t); 53 52 54 53 API_EXPORT(void) 55 54 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t, 56 struct sockaddr *, void *)); 55 struct sockaddr *, 56 const char *, void *)); 57 58 API_EXPORT(void) 59 stratcon_datastore_init(); 57 60 58 61 API_EXPORT(int) src/stratcon_iep.c
r0335d9d r5360a1e 142 142 } 143 143 noitL(noit_error, "submitting statement: %s\n", line); 144 stratcon_iep_line_processor(DS_OP_INSERT, NULL, line);144 stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL); 145 145 stmt->marked = 1; 146 146 } … … 163 163 /* Phase 1: sweep in all the statements */ 164 164 for(i=0; i<cnt; i++) { 165 char id[UUID_STR_LEN ];165 char id[UUID_STR_LEN+1]; 166 166 char provides[256]; 167 167 char *statement; … … 202 202 /* Phase 2: load the requires graph */ 203 203 for(i=0; i<cnt; i++) { 204 char id[UUID_STR_LEN ];204 char id[UUID_STR_LEN+1]; 205 205 int rcnt, j; 206 206 char *requires; … … 280 280 noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); 281 281 for(i=0; i<cnt; i++) { 282 char id[UUID_STR_LEN ];282 char id[UUID_STR_LEN+1]; 283 283 char topic[256]; 284 284 char *query; … … 314 314 query++; 315 315 } 316 stratcon_iep_line_processor(DS_OP_INSERT, NULL, line);316 stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL); 317 317 } 318 318 free(query_configs); … … 419 419 setup_iep_connection_callback(eventer_t e, int mask, void *closure, 420 420 struct timeval *now) { 421 stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL );421 stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, NULL, NULL); 422 422 return 0; 423 423 } … … 526 526 void 527 527 stratcon_iep_line_processor(stratcon_datastore_op_t op, 528 struct sockaddr *remote, void *operand) { 528 struct sockaddr *remote, const char *remote_cn, 529 void *operand, eventer_t completion) { 529 530 int len; 530 531 char remote_str[128]; src/stratcon_iep.h
r8ad126b r5360a1e 48 48 API_EXPORT(void) 49 49 stratcon_iep_line_processor(stratcon_datastore_op_t op, 50 struct sockaddr *remote, void *operand); 50 struct sockaddr *remote, const char *remote_cn, 51 void *operand, eventer_t completion); 51 52 52 53 API_EXPORT(jlog_streamer_ctx_t *) src/stratcon_jlog_streamer.c
ra060696 r5360a1e 371 371 FULLREAD(e, ctx, (unsigned long)ctx->header.message_len); 372 372 if(ctx->header.message_len > 0) 373 ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer); 373 ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn, 374 ctx->buffer, NULL); 374 375 else if(ctx->buffer) 375 376 free(ctx->buffer); … … 385 386 completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION; 386 387 ctx->state = JLOG_STREAMER_IS_ASYNC; 387 ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e); 388 ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn, 389 NULL, completion_e); 388 390 noitL(noit_debug, "Pushing batch asynch...\n"); 389 391 return 0; src/stratcon_jlog_streamer.h
rbe29b6f r5360a1e 91 91 u_int64_t total_bytes_read; 92 92 93 void (*push)(stratcon_datastore_op_t, struct sockaddr *, void *);93 void (*push)(stratcon_datastore_op_t, struct sockaddr *, const char *, void *, eventer_t); 94 94 } jlog_streamer_ctx_t; 95 95 src/stratcon_realtime_http.c
r3c56016 r5360a1e 427 427 for(node = rc->checklist; node; node = node->next) { 428 428 noit_atomic_inc32(&ctx->ref_cnt); 429 stratcon_datastore_push(DS_OP_FIND, NULL, node);430 429 noitL(noit_error, "Resolving sid: %d\n", node->sid); 431 430 } … … 435 434 completion->closure = ctx; 436 435 gettimeofday(&completion->whence, NULL); 437 stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, completion); 436 stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, NULL, 437 rc->checklist, completion); 438 438 } 439 439 return EVENTER_EXCEPTION; src/stratcond.c
r8ad126b r5360a1e 171 171 noit_watchdog_child_eventer_heartbeat(); 172 172 173 stratcon_datastore_init(); 173 174 noit_console_init(APPNAME); 174 175 noit_console_conf_init();
