| 180 | | free_params(ds_single_detail *d) { |
|---|
| 181 | | int i; |
|---|
| 182 | | for(i=0; i<d->nparams; i++) |
|---|
| 183 | | if(d->paramAllocd[i] && d->paramValues[i]) |
|---|
| 184 | | free(d->paramValues[i]); |
|---|
| 185 | | } |
|---|
| 186 | | |
|---|
| 187 | | char *basejpath = NULL; |
|---|
| 188 | | pthread_mutex_t ds_conns_lock; |
|---|
| 189 | | noit_hash_table ds_conns; |
|---|
| 190 | | noit_hash_table working_sets; |
|---|
| 191 | | |
|---|
| 192 | | /* the fqdn cache needs to be thread safe */ |
|---|
| 193 | | typedef struct { |
|---|
| 194 | | char *uuid_str; |
|---|
| 195 | | char *remote_cn; |
|---|
| 196 | | int storagenode_id; |
|---|
| 197 | | int sid; |
|---|
| 198 | | } uuid_info; |
|---|
| 199 | | typedef struct { |
|---|
| 200 | | int storagenode_id; |
|---|
| 201 | | char *fqdn; |
|---|
| 202 | | char *dsn; |
|---|
| 203 | | } storagenode_info; |
|---|
| 204 | | noit_hash_table uuid_to_info_cache; |
|---|
| 205 | | pthread_mutex_t storagenode_to_info_cache_lock; |
|---|
| 206 | | noit_hash_table storagenode_to_info_cache; |
|---|
| 207 | | |
|---|
| 208 | | /* Thread-safe connection pools */ |
|---|
| 209 | | |
|---|
| 210 | | /* Forcefree -> 1 prevents it from going to the pool and it gets freed */ |
|---|
| 211 | | static void |
|---|
| 212 | | release_conn_q_forceable(conn_q *cq, int forcefree) { |
|---|
| 213 | | int putback = 0; |
|---|
| 214 | | cq->last_use = time(NULL); |
|---|
| 215 | | pthread_mutex_lock(&cq->pool->lock); |
|---|
| 216 | | cq->pool->outstanding--; |
|---|
| 217 | | if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) { |
|---|
| 218 | | putback = 1; |
|---|
| 219 | | cq->next = cq->pool->head; |
|---|
| 220 | | cq->pool->head = cq; |
|---|
| 221 | | cq->pool->in_pool++; |
|---|
| 222 | | } |
|---|
| 223 | | pthread_mutex_unlock(&cq->pool->lock); |
|---|
| 224 | | noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(), |
|---|
| 225 | | putback ? "to pool" : "and destroy", cq->pool->queue_name); |
|---|
| 226 | | pthread_cond_signal(&cq->pool->cv); |
|---|
| 227 | | if(putback) return; |
|---|
| 228 | | |
|---|
| 229 | | /* Not put back, release it */ |
|---|
| 230 | | if(cq->dbh) PQfinish(cq->dbh); |
|---|
| 231 | | if(cq->remote_str) free(cq->remote_str); |
|---|
| 232 | | if(cq->remote_cn) free(cq->remote_cn); |
|---|
| 233 | | if(cq->fqdn) free(cq->fqdn); |
|---|
| 234 | | if(cq->dsn) free(cq->dsn); |
|---|
| 235 | | free(cq); |
|---|
| 236 | | } |
|---|
| 237 | | static void |
|---|
| 238 | | ttl_purge_conn_pool(conn_pool *pool) { |
|---|
| 239 | | int old_cnt, new_cnt; |
|---|
| 240 | | time_t now = time(NULL); |
|---|
| 241 | | conn_q *cq, *prev = NULL, *iter; |
|---|
| 242 | | /* because we always replace on the head and update the last_use time when |
|---|
| 243 | | doing so, we know they are ordered LRU on the end. So, once we hit an |
|---|
| 244 | | old one, we know all the others are old too. |
|---|
| 245 | | */ |
|---|
| 246 | | if(!pool->head) return; /* hack short circuit for no locks */ |
|---|
| 247 | | pthread_mutex_lock(&pool->lock); |
|---|
| 248 | | old_cnt = pool->in_pool; |
|---|
| 249 | | cq = pool->head; |
|---|
| 250 | | while(cq) { |
|---|
| 251 | | if(cq->last_use + cq->pool->ttl < now) { |
|---|
| 252 | | if(prev) prev->next = NULL; |
|---|
| 253 | | else pool->head = NULL; |
|---|
| 254 | | break; |
|---|
| 255 | | } |
|---|
| 256 | | prev = cq; |
|---|
| 257 | | cq = cq->next; |
|---|
| 258 | | } |
|---|
| 259 | | /* Now pool->head is a chain of unexpired and cq is a chain of expired */ |
|---|
| 260 | | /* Fix accounting */ |
|---|
| 261 | | for(iter=cq; iter; iter=iter->next) pool->in_pool--; |
|---|
| 262 | | new_cnt = pool->in_pool; |
|---|
| 263 | | pthread_mutex_unlock(&pool->lock); |
|---|
| 264 | | |
|---|
| 265 | | /* Force release these without holding the lock */ |
|---|
| 266 | | while(cq) { |
|---|
| 267 | | cq = cq->next; |
|---|
| 268 | | release_conn_q_forceable(cq, 1); |
|---|
| 269 | | } |
|---|
| 270 | | if(old_cnt != new_cnt) |
|---|
| 271 | | noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt, |
|---|
| 272 | | pool->queue_name); |
|---|
| 273 | | } |
|---|
| 274 | | static void |
|---|
| 275 | | release_conn_q(conn_q *cq) { |
|---|
| 276 | | ttl_purge_conn_pool(cq->pool); |
|---|
| 277 | | release_conn_q_forceable(cq, 0); |
|---|
| 278 | | } |
|---|
| 279 | | static conn_pool * |
|---|
| 280 | | get_conn_pool_for_remote(const char *remote_str, |
|---|
| 281 | | const char *remote_cn, const char *fqdn) { |
|---|
| 282 | | void *vcpool; |
|---|
| 283 | | conn_pool *cpool = NULL; |
|---|
| 284 | | char queue_name[256] = "datastore_"; |
|---|
| 285 | | snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s", |
|---|
| 286 | | (remote_str && *remote_str) ? remote_str : "0.0.0.0", |
|---|
| 287 | | fqdn ? fqdn : "default", |
|---|
| 288 | | remote_cn ? remote_cn : "default"); |
|---|
| 289 | | pthread_mutex_lock(&ds_conns_lock); |
|---|
| 290 | | if(noit_hash_retrieve(&ds_conns, (const char *)queue_name, |
|---|
| 291 | | strlen(queue_name), &vcpool)) |
|---|
| 292 | | cpool = vcpool; |
|---|
| 293 | | pthread_mutex_unlock(&ds_conns_lock); |
|---|
| 294 | | if(!cpool) { |
|---|
| 295 | | vcpool = cpool = calloc(1, sizeof(*cpool)); |
|---|
| 296 | | cpool->queue_name = strdup(queue_name); |
|---|
| 297 | | pthread_mutex_init(&cpool->lock, NULL); |
|---|
| 298 | | pthread_cond_init(&cpool->cv, NULL); |
|---|
| 299 | | cpool->in_pool = 0; |
|---|
| 300 | | cpool->outstanding = 0; |
|---|
| 301 | | cpool->max_in_pool = 1; |
|---|
| 302 | | cpool->max_allocated = 1; |
|---|
| 303 | | pthread_mutex_lock(&ds_conns_lock); |
|---|
| 304 | | if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name), |
|---|
| 305 | | cpool)) { |
|---|
| 306 | | noit_hash_retrieve(&ds_conns, (const char *)queue_name, |
|---|
| 307 | | strlen(queue_name), &vcpool); |
|---|
| 308 | | } |
|---|
| 309 | | pthread_mutex_unlock(&ds_conns_lock); |
|---|
| 310 | | if(vcpool != cpool) { |
|---|
| 311 | | /* someone beat us to it */ |
|---|
| 312 | | free(cpool->queue_name); |
|---|
| 313 | | pthread_mutex_destroy(&cpool->lock); |
|---|
| 314 | | pthread_cond_destroy(&cpool->cv); |
|---|
| 315 | | free(cpool); |
|---|
| 316 | | } |
|---|
| 317 | | else { |
|---|
| 318 | | int i; |
|---|
| 319 | | /* Our job to setup the pool */ |
|---|
| 320 | | cpool->jobq = calloc(1, sizeof(*cpool->jobq)); |
|---|
| 321 | | eventer_jobq_init(cpool->jobq, queue_name); |
|---|
| 322 | | cpool->jobq->backq = eventer_default_backq(); |
|---|
| 323 | | /* Add one thread */ |
|---|
| 324 | | for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++) |
|---|
| 325 | | eventer_jobq_increase_concurrency(cpool->jobq); |
|---|
| 326 | | } |
|---|
| 327 | | cpool = vcpool; |
|---|
| 328 | | } |
|---|
| 329 | | return cpool; |
|---|
| 330 | | } |
|---|
| 331 | | static conn_q * |
|---|
| 332 | | get_conn_q_for_remote(const char *remote_str, |
|---|
| 333 | | const char *remote_cn, const char *fqdn, |
|---|
| 334 | | const char *dsn) { |
|---|
| 335 | | conn_pool *cpool; |
|---|
| 336 | | conn_q *cq; |
|---|
| 337 | | cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn); |
|---|
| 338 | | noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(), |
|---|
| 339 | | cpool->queue_name); |
|---|
| 340 | | pthread_mutex_lock(&cpool->lock); |
|---|
| 341 | | again: |
|---|
| 342 | | if(cpool->head) { |
|---|
| 343 | | assert(cpool->in_pool > 0); |
|---|
| 344 | | cq = cpool->head; |
|---|
| 345 | | cpool->head = cq->next; |
|---|
| 346 | | cpool->in_pool--; |
|---|
| 347 | | cpool->outstanding++; |
|---|
| 348 | | cq->next = NULL; |
|---|
| 349 | | pthread_mutex_unlock(&cpool->lock); |
|---|
| 350 | | return cq; |
|---|
| 351 | | } |
|---|
| 352 | | if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) { |
|---|
| 353 | | noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n", |
|---|
| 354 | | (void *)pthread_self(), cpool->queue_name); |
|---|
| 355 | | pthread_cond_wait(&cpool->cv, &cpool->lock); |
|---|
| 356 | | noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n", |
|---|
| 357 | | (void *)pthread_self(), cpool->queue_name); |
|---|
| 358 | | goto again; |
|---|
| 359 | | } |
|---|
| 360 | | else { |
|---|
| 361 | | cpool->outstanding++; |
|---|
| 362 | | pthread_mutex_unlock(&cpool->lock); |
|---|
| 363 | | } |
|---|
| 364 | | |
|---|
| 365 | | cq = calloc(1, sizeof(*cq)); |
|---|
| 366 | | cq->pool = cpool; |
|---|
| 367 | | cq->remote_str = remote_str ? strdup(remote_str) : NULL; |
|---|
| 368 | | cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL; |
|---|
| 369 | | cq->fqdn = fqdn ? strdup(fqdn) : NULL; |
|---|
| 370 | | cq->dsn = dsn ? strdup(dsn) : NULL; |
|---|
| 371 | | return cq; |
|---|
| 372 | | } |
|---|
| 373 | | static conn_q * |
|---|
| 374 | | get_conn_q_for_metanode() { |
|---|
| 375 | | return get_conn_q_for_remote(NULL,NULL,NULL,NULL); |
|---|
| 376 | | } |
|---|
| 377 | | |
|---|
| 378 | | typedef enum { |
|---|
| 379 | | DS_EXEC_SUCCESS = 0, |
|---|
| 380 | | DS_EXEC_ROW_FAILED = 1, |
|---|
| 381 | | DS_EXEC_TXN_FAILED = 2, |
|---|
| 382 | | } execute_outcome_t; |
|---|
| 383 | | |
|---|
| 384 | | #define DECLARE_PARAM_STR(str, len) do { \ |
|---|
| 385 | | d->paramValues[d->nparams] = noit__strndup(str, len); \ |
|---|
| 386 | | d->paramLengths[d->nparams] = len; \ |
|---|
| 387 | | d->paramFormats[d->nparams] = 0; \ |
|---|
| 388 | | d->paramAllocd[d->nparams] = 1; \ |
|---|
| 389 | | if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \ |
|---|
| 390 | | free(d->paramValues[d->nparams]); \ |
|---|
| 391 | | d->paramValues[d->nparams] = NULL; \ |
|---|
| 392 | | d->paramLengths[d->nparams] = 0; \ |
|---|
| 393 | | d->paramAllocd[d->nparams] = 0; \ |
|---|
| 394 | | } \ |
|---|
| 395 | | d->nparams++; \ |
|---|
| 396 | | } while(0) |
|---|
| 397 | | #define DECLARE_PARAM_INT(i) do { \ |
|---|
| 398 | | int buffer__len; \ |
|---|
| 399 | | char buffer__[32]; \ |
|---|
| 400 | | snprintf(buffer__, sizeof(buffer__), "%d", (i)); \ |
|---|
| 401 | | buffer__len = strlen(buffer__); \ |
|---|
| 402 | | DECLARE_PARAM_STR(buffer__, buffer__len); \ |
|---|
| 403 | | } while(0) |
|---|
| 404 | | |
|---|
| 405 | | #define PG_GET_STR_COL(dest, row, name) do { \ |
|---|
| 406 | | int colnum = PQfnumber(d->res, name); \ |
|---|
| 407 | | dest = NULL; \ |
|---|
| 408 | | if (colnum >= 0) \ |
|---|
| 409 | | dest = PQgetisnull(d->res, row, colnum) \ |
|---|
| 410 | | ? NULL : PQgetvalue(d->res, row, colnum); \ |
|---|
| 411 | | } while(0) |
|---|
| 412 | | |
|---|
| 413 | | #define PG_EXEC(cmd) do { \ |
|---|
| 414 | | d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \ |
|---|
| 415 | | (const char * const *)d->paramValues, \ |
|---|
| 416 | | d->paramLengths, d->paramFormats, 0); \ |
|---|
| 417 | | d->rv = PQresultStatus(d->res); \ |
|---|
| 418 | | if(d->rv != PGRES_COMMAND_OK && \ |
|---|
| 419 | | d->rv != PGRES_TUPLES_OK) { \ |
|---|
| 420 | | const char *pgerr = PQresultErrorMessage(d->res); \ |
|---|
| 421 | | const char *pgerr_end = strchr(pgerr, '\n'); \ |
|---|
| 422 | | if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \ |
|---|
| 423 | | noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \ |
|---|
| 424 | | cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \ |
|---|
| 425 | | (int)(pgerr_end - pgerr), pgerr); \ |
|---|
| 426 | | PQclear(d->res); \ |
|---|
| 427 | | goto bad_row; \ |
|---|
| 428 | | } \ |
|---|
| 429 | | } while(0) |
|---|
| 430 | | |
|---|
| 431 | | #define PG_TM_EXEC(cmd, whence) do { \ |
|---|
| 432 | | time_t __w = whence; \ |
|---|
| 433 | | char cmdbuf[4096]; \ |
|---|
| 434 | | struct tm tbuf, *tm; \ |
|---|
| 435 | | tm = gmtime_r(&__w, &tbuf); \ |
|---|
| 436 | | strftime(cmdbuf, sizeof(cmdbuf), cmd, tm); \ |
|---|
| 437 | | d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \ |
|---|
| 438 | | (const char * const *)d->paramValues, \ |
|---|
| 439 | | d->paramLengths, d->paramFormats, 0); \ |
|---|
| 440 | | d->rv = PQresultStatus(d->res); \ |
|---|
| 441 | | if(d->rv != PGRES_COMMAND_OK && \ |
|---|
| 442 | | d->rv != PGRES_TUPLES_OK) { \ |
|---|
| 443 | | const char *pgerr = PQresultErrorMessage(d->res); \ |
|---|
| 444 | | const char *pgerr_end = strchr(pgerr, '\n'); \ |
|---|
| 445 | | if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \ |
|---|
| 446 | | noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \ |
|---|
| 447 | | __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \ |
|---|
| 448 | | (long long unsigned)whence); \ |
|---|
| 449 | | PQclear(d->res); \ |
|---|
| 450 | | goto bad_row; \ |
|---|
| 451 | | } \ |
|---|
| 452 | | } while(0) |
|---|
| 453 | | |
|---|
| 454 | | static void * |
|---|
| 455 | | stratcon_datastore_check_loadall(void *vsn) { |
|---|
| 456 | | storagenode_info *sn = vsn; |
|---|
| 457 | | ds_single_detail *d; |
|---|
| 458 | | int i, row_count = 0, good = 0; |
|---|
| 459 | | char buff[1024]; |
|---|
| 460 | | conn_q *cq = NULL; |
|---|
| 461 | | |
|---|
| 462 | | d = calloc(1, sizeof(*d)); |
|---|
| 463 | | GET_QUERY(check_loadall); |
|---|
| 464 | | cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn); |
|---|
| 465 | | i = 0; |
|---|
| 466 | | while(stratcon_database_connect(cq)) { |
|---|
| 467 | | if(i++ > 4) { |
|---|
| 468 | | noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn); |
|---|
| 469 | | release_conn_q(cq); |
|---|
| 470 | | return (void *)(vpsized_int)good; |
|---|
| 471 | | } |
|---|
| 472 | | sleep(1); |
|---|
| 473 | | } |
|---|
| 474 | | PG_EXEC(check_loadall); |
|---|
| 475 | | row_count = PQntuples(d->res); |
|---|
| 476 | | |
|---|
| 477 | | for(i=0; i<row_count; i++) { |
|---|
| 478 | | int rv; |
|---|
| 479 | | int8_t family; |
|---|
| 480 | | struct sockaddr *sin; |
|---|
| 481 | | struct sockaddr_in sin4 = { .sin_family = AF_INET }; |
|---|
| 482 | | struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 }; |
|---|
| 483 | | char *remote, *id, *target, *module, *name; |
|---|
| 484 | | PG_GET_STR_COL(remote, i, "remote_address"); |
|---|
| 485 | | PG_GET_STR_COL(id, i, "id"); |
|---|
| 486 | | PG_GET_STR_COL(target, i, "target"); |
|---|
| 487 | | PG_GET_STR_COL(module, i, "module"); |
|---|
| 488 | | PG_GET_STR_COL(name, i, "name"); |
|---|
| 489 | | snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name); |
|---|
| 490 | | |
|---|
| 491 | | family = AF_INET; |
|---|
| 492 | | sin = (struct sockaddr *)&sin4; |
|---|
| 493 | | rv = inet_pton(family, remote, &sin4.sin_addr); |
|---|
| 494 | | if(rv != 1) { |
|---|
| 495 | | family = AF_INET6; |
|---|
| 496 | | sin = (struct sockaddr *)&sin6; |
|---|
| 497 | | rv = inet_pton(family, remote, &sin6.sin6_addr); |
|---|
| 498 | | if(rv != 1) { |
|---|
| 499 | | noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote); |
|---|
| 500 | | sin = NULL; |
|---|
| 501 | | } |
|---|
| 502 | | } |
|---|
| 503 | | |
|---|
| 504 | | /* stratcon_iep_line_processor takes an allocated operand and frees it */ |
|---|
| 505 | | stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL); |
|---|
| 506 | | good++; |
|---|
| 507 | | } |
|---|
| 508 | | noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n", |
|---|
| 509 | | good, row_count, sn->fqdn); |
|---|
| 510 | | bad_row: |
|---|
| 511 | | free_params((ds_single_detail *)d); |
|---|
| 512 | | free(d); |
|---|
| 513 | | if(cq) release_conn_q(cq); |
|---|
| 514 | | return (void *)(vpsized_int)good; |
|---|
| 515 | | } |
|---|
| 516 | | static int |
|---|
| 517 | | stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure, |
|---|
| 518 | | struct timeval *now) { |
|---|
| 519 | | storagenode_info self = { 0, NULL, NULL }, **sns = NULL; |
|---|
| 520 | | pthread_t *jobs = NULL; |
|---|
| 521 | | int nodes, i = 0, tcnt = 0; |
|---|
| 522 | | if(!(mask & EVENTER_ASYNCH_WORK)) return 0; |
|---|
| 523 | | if(mask & EVENTER_ASYNCH_CLEANUP) return 0; |
|---|
| 524 | | |
|---|
| 525 | | pthread_mutex_lock(&storagenode_to_info_cache_lock); |
|---|
| 526 | | nodes = storagenode_to_info_cache.size; |
|---|
| 527 | | jobs = calloc(MAX(1,nodes), sizeof(*jobs)); |
|---|
| 528 | | sns = calloc(MAX(1,nodes), sizeof(*sns)); |
|---|
| 529 | | if(nodes == 0) sns[nodes++] = &self; |
|---|
| 530 | | else { |
|---|
| 531 | | noit_hash_iter iter = NOIT_HASH_ITER_ZERO; |
|---|
| 532 | | const char *k; |
|---|
| 533 | | void *v; |
|---|
| 534 | | int klen; |
|---|
| 535 | | while(noit_hash_next(&storagenode_to_info_cache, |
|---|
| 536 | | &iter, &k, &klen, &v)) { |
|---|
| 537 | | sns[i++] = (storagenode_info *)v; |
|---|
| 538 | | } |
|---|
| 539 | | } |
|---|
| 540 | | pthread_mutex_unlock(&storagenode_to_info_cache_lock); |
|---|
| 541 | | |
|---|
| 542 | | for(i=0; i<nodes; i++) { |
|---|
| 543 | | if(pthread_create(&jobs[i], NULL, |
|---|
| 544 | | stratcon_datastore_check_loadall, sns[i]) != 0) { |
|---|
| 545 | | noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno)); |
|---|
| 546 | | } |
|---|
| 547 | | } |
|---|
| 548 | | for(i=0; i<nodes; i++) { |
|---|
| 549 | | void *good; |
|---|
| 550 | | pthread_join(jobs[i], &good); |
|---|
| 551 | | tcnt += (int)(vpsized_int)good; |
|---|
| 552 | | } |
|---|
| 553 | | noitL(noit_error, "Loaded all %d check states.\n", tcnt); |
|---|
| 554 | | return 0; |
|---|
| 555 | | } |
|---|
| 556 | | void |
|---|
| 557 | | stratcon_datastore_iep_check_preload() { |
|---|
| 558 | | eventer_t e; |
|---|
| 559 | | conn_pool *cpool; |
|---|
| 560 | | |
|---|
| 561 | | cpool = get_conn_pool_for_remote(NULL,NULL,NULL); |
|---|
| 562 | | e = eventer_alloc(); |
|---|
| 563 | | e->mask = EVENTER_ASYNCH; |
|---|
| 564 | | e->callback = stratcon_datastore_asynch_drive_iep; |
|---|
| 565 | | e->closure = NULL; |
|---|
| 566 | | eventer_add_asynch(cpool->jobq, e); |
|---|
| 567 | | } |
|---|
| 568 | | execute_outcome_t |
|---|
| 569 | | stratcon_datastore_find(ds_rt_detail *d) { |
|---|
| 570 | | conn_q *cq; |
|---|
| 571 | | char *val; |
|---|
| 572 | | int row_count; |
|---|
| 573 | | struct realtime_tracker *node; |
|---|
| 574 | | |
|---|
| 575 | | for(node = d->rt; node; node = node->next) { |
|---|
| 576 | | char uuid_str[UUID_STR_LEN+1]; |
|---|
| 577 | | const char *fqdn, *dsn, *remote_cn; |
|---|
| 578 | | char remote_ip[32]; |
|---|
| 579 | | int storagenode_id; |
|---|
| 580 | | |
|---|
| 581 | | uuid_unparse_lower(node->checkid, uuid_str); |
|---|
| 582 | | if(storage_node_quick_lookup(uuid_str, NULL, &node->sid, |
|---|
| 583 | | &storagenode_id, &remote_cn, &fqdn, &dsn)) |
|---|
| 584 | | continue; |
|---|
| 585 | | |
|---|
| 586 | | noitL(noit_debug, "stratcon_datastore_find <- (%d, %s) @ %s\n", |
|---|
| 587 | | node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)"); |
|---|
| 588 | | |
|---|
| 589 | | /* We might be able to find the IP from our config if someone has |
|---|
| 590 | | * specified the expected cn in the noit definition. |
|---|
| 591 | | */ |
|---|
| 592 | | if(stratcon_find_noit_ip_by_cn(remote_cn, |
|---|
| 593 | | remote_ip, sizeof(remote_ip)) == 0) { |
|---|
| 594 | | node->noit = strdup(remote_ip); |
|---|
| 595 | | noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit); |
|---|
| 596 | | continue; |
|---|
| 597 | | } |
|---|
| 598 | | |
|---|
| 599 | | cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn); |
|---|
| 600 | | stratcon_database_connect(cq); |
|---|
| 601 | | |
|---|
| 602 | | GET_QUERY(check_find); |
|---|
| 603 | | DECLARE_PARAM_INT(node->sid); |
|---|
| 604 | | PG_EXEC(check_find); |
|---|
| 605 | | row_count = PQntuples(d->res); |
|---|
| 606 | | if(row_count != 1) { |
|---|
| 607 | | noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid); |
|---|
| 608 | | PQclear(d->res); |
|---|
| 609 | | goto bad_row; |
|---|
| 610 | | } |
|---|
| 611 | | |
|---|
| 612 | | /* Get the remote_address (which noit owns this) */ |
|---|
| 613 | | PG_GET_STR_COL(val, 0, "remote_address"); |
|---|
| 614 | | if(!val) { |
|---|
| 615 | | noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn); |
|---|
| 616 | | PQclear(d->res); |
|---|
| 617 | | goto bad_row; |
|---|
| 618 | | } |
|---|
| 619 | | node->noit = strdup(val); |
|---|
| 620 | | noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit); |
|---|
| 621 | | bad_row: |
|---|
| 622 | | free_params((ds_single_detail *)d); |
|---|
| 623 | | d->nparams = 0; |
|---|
| 624 | | release_conn_q(cq); |
|---|
| 625 | | } |
|---|
| 626 | | return DS_EXEC_SUCCESS; |
|---|
| 627 | | } |
|---|
| 628 | | execute_outcome_t |
|---|
| 629 | | stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn, |
|---|
| 630 | | ds_line_detail *d) { |
|---|
| 631 | | int type, len, sid; |
|---|
| 632 | | char *final_buff; |
|---|
| 633 | | uLong final_len, actual_final_len; |
|---|
| 634 | | char *token; |
|---|
| 635 | | char raddr_blank[1] = ""; |
|---|
| 636 | | const char *raddr; |
|---|
| 637 | | |
|---|
| 638 | | type = d->data[0]; |
|---|
| 639 | | raddr = r ? r : raddr_blank; |
|---|
| 640 | | |
|---|
| 641 | | /* Parse the log line, but only if we haven't already */ |
|---|
| 642 | | if(!d->nparams) { |
|---|
| 643 | | char *scp, *ecp; |
|---|
| 644 | | |
|---|
| 645 | | scp = d->data; |
|---|
| 646 | | #define PROCESS_NEXT_FIELD(t,l) do { \ |
|---|
| 647 | | if(!*scp) goto bad_row; \ |
|---|
| 648 | | ecp = strchr(scp, '\t'); \ |
|---|
| 649 | | if(!ecp) goto bad_row; \ |
|---|
| 650 | | token = scp; \ |
|---|
| 651 | | len = (ecp-scp); \ |
|---|
| 652 | | scp = ecp + 1; \ |
|---|
| 653 | | } while(0) |
|---|
| 654 | | #define PROCESS_LAST_FIELD(t,l) do { \ |
|---|
| 655 | | if(!*scp) ecp = scp; \ |
|---|
| 656 | | else { \ |
|---|
| 657 | | ecp = scp + strlen(scp); /* Puts us at the '\0' */ \ |
|---|
| 658 | | if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \ |
|---|
| 659 | | } \ |
|---|
| 660 | | t = scp; \ |
|---|
| 661 | | l = (ecp-scp); \ |
|---|
| 662 | | } while(0) |
|---|
| 663 | | |
|---|
| 664 | | PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */ |
|---|
| 665 | | switch(type) { |
|---|
| 666 | | /* See noit_check_log.c for log description */ |
|---|
| 667 | | case 'n': |
|---|
| 668 | | DECLARE_PARAM_STR(raddr, strlen(raddr)); |
|---|
| 669 | | DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); |
|---|
| 670 | | DECLARE_PARAM_STR("noitd",5); /* node_type */ |
|---|
| 671 | | PROCESS_NEXT_FIELD(token,len); |
|---|
| 672 | | d->whence = (time_t)strtoul(token, NULL, 10); |
|---|
| 673 | | DECLARE_PARAM_STR(token,len); /* timestamp */ |
|---|
| 674 | | |
|---|
| 675 | | /* This is the expected uncompressed len */ |
|---|
| 676 | | PROCESS_NEXT_FIELD(token,len); |
|---|
| 677 | | final_len = atoi(token); |
|---|
| 678 | | final_buff = malloc(final_len); |
|---|
| 679 | | if(!final_buff) goto bad_row; |
|---|
| 680 | | |
|---|
| 681 | | /* The last token is b64 endoded and compressed. |
|---|
| 682 | | * we need to decode it, declare it and then free it. |
|---|
| 683 | | */ |
|---|
| 684 | | PROCESS_LAST_FIELD(token, len); |
|---|
| 685 | | /* We can in-place decode this */ |
|---|
| 686 | | len = noit_b64_decode((char *)token, len, |
|---|
| 687 | | (unsigned char *)token, len); |
|---|
| 688 | | if(len <= 0) { |
|---|
| 689 | | noitL(noit_error, "noitd config base64 decoding error.\n"); |
|---|
| 690 | | free(final_buff); |
|---|
| 691 | | goto bad_row; |
|---|
| 692 | | } |
|---|
| 693 | | actual_final_len = final_len; |
|---|
| 694 | | if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len, |
|---|
| 695 | | (unsigned char *)token, len)) { |
|---|
| 696 | | noitL(noit_error, "noitd config decompression failure.\n"); |
|---|
| 697 | | free(final_buff); |
|---|
| 698 | | goto bad_row; |
|---|
| 699 | | } |
|---|
| 700 | | if(final_len != actual_final_len) { |
|---|
| 701 | | noitL(noit_error, "noitd config decompression error.\n"); |
|---|
| 702 | | free(final_buff); |
|---|
| 703 | | goto bad_row; |
|---|
| 704 | | } |
|---|
| 705 | | DECLARE_PARAM_STR(final_buff, final_len); |
|---|
| 706 | | free(final_buff); |
|---|
| 707 | | break; |
|---|
| 708 | | case 'C': |
|---|
| 709 | | DECLARE_PARAM_STR(raddr, strlen(raddr)); |
|---|
| 710 | | PROCESS_NEXT_FIELD(token,len); |
|---|
| 711 | | DECLARE_PARAM_STR(token,len); /* timestamp */ |
|---|
| 712 | | d->whence = (time_t)strtoul(token, NULL, 10); |
|---|
| 713 | | PROCESS_NEXT_FIELD(token, len); |
|---|
| 714 | | /* uuid is last 36 bytes */ |
|---|
| 715 | | if(len > 36) { token += (len-36); len = 36; } |
|---|
| 716 | | sid = uuid_to_sid(token, remote_cn); |
|---|
| 717 | | if(sid == 0) goto bad_row; |
|---|
| 718 | | DECLARE_PARAM_INT(sid); /* sid */ |
|---|
| 719 | | DECLARE_PARAM_STR(token,len); /* uuid */ |
|---|
| 720 | | PROCESS_NEXT_FIELD(token, len); |
|---|
| 721 | | DECLARE_PARAM_STR(token,len); /* target */ |
|---|
| 722 | | PROCESS_NEXT_FIELD(token, len); |
|---|
| 723 | | DECLARE_PARAM_STR(token,len); /* module */ |
|---|
| 724 | | PROCESS_LAST_FIELD(token, len); |
|---|
| 725 | | DECLARE_PARAM_STR(token,len); /* name */ |
|---|
| 726 | | break; |
|---|
| 727 | | case 'M': |
|---|
| 728 | | PROCESS_NEXT_FIELD(token,len); |
|---|
| 729 | | DECLARE_PARAM_STR(token,len); /* timestamp */ |
|---|
| 730 | | d->whence = (time_t)strtoul(token, NULL, 10); |
|---|
| 731 | | PROCESS_NEXT_FIELD(token, len); |
|---|
| 732 | | /* uuid is last 36 bytes */ |
|---|
| 733 | | if(len > 36) { token += (len-36); len = 36; } |
|---|
| 734 | | sid = uuid_to_sid(token, remote_cn); |
|---|
| 735 | | if(sid == 0) goto bad_row; |
|---|
| 736 | | DECLARE_PARAM_INT(sid); /* sid */ |
|---|
| 737 | | PROCESS_NEXT_FIELD(token, len); |
|---|
| 738 | | DECLARE_PARAM_STR(token,len); /* name */ |
|---|
| 739 | | PROCESS_NEXT_FIELD(token,len); |
|---|
| 740 | | d->metric_type = *token; |
|---|
| 741 | | PROCESS_LAST_FIELD(token,len); |
|---|
| 742 | | DECLARE_PARAM_STR(token,len); /* value */ |
|---|
| 743 | | break; |
|---|
| 744 | | case 'S': |
|---|
| 745 | | PROCESS_NEXT_FIELD(token,len); |
|---|
| 746 | | DECLARE_PARAM_STR(token,len); /* timestamp */ |
|---|
| 747 | | d->whence = (time_t)strtoul(token, NULL, 10); |
|---|
| 748 | | PROCESS_NEXT_FIELD(token, len); |
|---|
| 749 | | /* uuid is last 36 bytes */ |
|---|
| 750 | | if(len > 36) { token += (len-36); len = 36; } |
|---|
| 751 | | sid = uuid_to_sid(token, remote_cn); |
|---|
| 752 | | if(sid == 0) goto bad_row; |
|---|
| 753 | | DECLARE_PARAM_INT(sid); /* sid */ |
|---|
| 754 | | PROCESS_NEXT_FIELD(token, len); |
|---|
| 755 | | DECLARE_PARAM_STR(token,len); /* state */ |
|---|
| 756 | | PROCESS_NEXT_FIELD(token, len); |
|---|
| 757 | | DECLARE_PARAM_STR(token,len); /* availability */ |
|---|
| 758 | | PROCESS_NEXT_FIELD(token, len); |
|---|
| 759 | | DECLARE_PARAM_STR(token,len); /* duration */ |
|---|
| 760 | | PROCESS_LAST_FIELD(token,len); |
|---|
| 761 | | DECLARE_PARAM_STR(token,len); /* status */ |
|---|
| 762 | | break; |
|---|
| 763 | | default: |
|---|
| 764 | | goto bad_row; |
|---|
| 765 | | } |
|---|
| 766 | | |
|---|
| 767 | | } |
|---|
| 768 | | |
|---|
| 769 | | /* Now execute the query */ |
|---|
| 770 | | switch(type) { |
|---|
| 771 | | case 'n': |
|---|
| 772 | | GET_QUERY(config_insert); |
|---|
| 773 | | PG_EXEC(config_insert); |
|---|
| 774 | | PQclear(d->res); |
|---|
| 775 | | break; |
|---|
| 776 | | case 'C': |
|---|
| 777 | | GET_QUERY(check_insert); |
|---|
| 778 | | PG_TM_EXEC(check_insert, d->whence); |
|---|
| 779 | | PQclear(d->res); |
|---|
| 780 | | break; |
|---|
| 781 | | case 'S': |
|---|
| 782 | | GET_QUERY(status_insert); |
|---|
| 783 | | PG_TM_EXEC(status_insert, d->whence); |
|---|
| 784 | | PQclear(d->res); |
|---|
| 785 | | break; |
|---|
| 786 | | case 'M': |
|---|
| 787 | | switch(d->metric_type) { |
|---|
| 788 | | case METRIC_INT32: |
|---|
| 789 | | case METRIC_UINT32: |
|---|
| 790 | | case METRIC_INT64: |
|---|
| 791 | | case METRIC_UINT64: |
|---|
| 792 | | case METRIC_DOUBLE: |
|---|
| 793 | | GET_QUERY(metric_insert_numeric); |
|---|
| 794 | | PG_TM_EXEC(metric_insert_numeric, d->whence); |
|---|
| 795 | | PQclear(d->res); |
|---|
| 796 | | break; |
|---|
| 797 | | case METRIC_STRING: |
|---|
| 798 | | GET_QUERY(metric_insert_text); |
|---|
| 799 | | PG_TM_EXEC(metric_insert_text, d->whence); |
|---|
| 800 | | PQclear(d->res); |
|---|
| 801 | | break; |
|---|
| 802 | | default: |
|---|
| 803 | | goto bad_row; |
|---|
| 804 | | } |
|---|
| 805 | | break; |
|---|
| 806 | | default: |
|---|
| 807 | | /* should never get here */ |
|---|
| 808 | | goto bad_row; |
|---|
| 809 | | } |
|---|
| 810 | | return DS_EXEC_SUCCESS; |
|---|
| 811 | | bad_row: |
|---|
| 812 | | return DS_EXEC_ROW_FAILED; |
|---|
| 813 | | } |
|---|
| 814 | | static int |
|---|
| 815 | | stratcon_database_post_connect(conn_q *cq) { |
|---|
| 816 | | int rv = 0; |
|---|
| 817 | | ds_single_detail _d = { 0 }, *d = &_d; |
|---|
| 818 | | if(cq->fqdn) { |
|---|
| 819 | | char *remote_str, *remote_cn; |
|---|
| 820 | | /* This is the silly way we get null's in through our declare_param_str */ |
|---|
| 821 | | remote_str = cq->remote_str ? cq->remote_str : "[[null]]"; |
|---|
| 822 | | remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]"; |
|---|
| 823 | | /* This is a storage node, it gets the storage node post_connect */ |
|---|
| 824 | | GET_QUERY(storage_post_connect); |
|---|
| 825 | | rv = -1; /* now we're serious */ |
|---|
| 826 | | DECLARE_PARAM_STR(remote_str, strlen(remote_str)); |
|---|
| 827 | | DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); |
|---|
| 828 | | PG_EXEC(storage_post_connect); |
|---|
| 829 | | PQclear(d->res); |
|---|
| 830 | | rv = 0; |
|---|
| 831 | | } |
|---|
| 832 | | else { |
|---|
| 833 | | /* Metanode post_connect */ |
|---|
| 834 | | GET_QUERY(metanode_post_connect); |
|---|
| 835 | | rv = -1; /* now we're serious */ |
|---|
| 836 | | PG_EXEC(metanode_post_connect); |
|---|
| 837 | | PQclear(d->res); |
|---|
| 838 | | rv = 0; |
|---|
| 839 | | } |
|---|
| 840 | | bad_row: |
|---|
| 841 | | free_params(d); |
|---|
| 842 | | if(rv == -1) { |
|---|
| 843 | | /* Post-connect intentions are serious and fatal */ |
|---|
| 844 | | PQfinish(cq->dbh); |
|---|
| 845 | | cq->dbh = NULL; |
|---|
| 846 | | } |
|---|
| 847 | | return rv; |
|---|
| 848 | | } |
|---|
| 849 | | static int |
|---|
| 850 | | stratcon_database_connect(conn_q *cq) { |
|---|
| 851 | | char *dsn, dsn_meta[512]; |
|---|
| 852 | | noit_hash_iter iter = NOIT_HASH_ITER_ZERO; |
|---|
| 853 | | const char *k, *v; |
|---|
| 854 | | int klen; |
|---|
| 855 | | noit_hash_table *t; |
|---|
| 856 | | |
|---|
| 857 | | dsn_meta[0] = '\0'; |
|---|
| 858 | | if(!cq->dsn) { |
|---|
| 859 | | t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); |
|---|
| 860 | | while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { |
|---|
| 861 | | if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta)); |
|---|
| 862 | | strlcat(dsn_meta, k, sizeof(dsn_meta)); |
|---|
| 863 | | strlcat(dsn_meta, "=", sizeof(dsn_meta)); |
|---|
| 864 | | strlcat(dsn_meta, v, sizeof(dsn_meta)); |
|---|
| 865 | | } |
|---|
| 866 | | noit_hash_destroy(t, free, free); |
|---|
| 867 | | free(t); |
|---|
| 868 | | dsn = dsn_meta; |
|---|
| 869 | | } |
|---|
| 870 | | else { |
|---|
| 871 | | char options[32]; |
|---|
| 872 | | strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta)); |
|---|
| 873 | | if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user", |
|---|
| 874 | | options, sizeof(options))) { |
|---|
| 875 | | strlcat(dsn_meta, " ", sizeof(dsn_meta)); |
|---|
| 876 | | strlcat(dsn_meta, "user", sizeof(dsn_meta)); |
|---|
| 877 | | strlcat(dsn_meta, "=", sizeof(dsn_meta)); |
|---|
| 878 | | strlcat(dsn_meta, options, sizeof(dsn_meta)); |
|---|
| 879 | | } |
|---|
| 880 | | if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password", |
|---|
| 881 | | options, sizeof(options))) { |
|---|
| 882 | | strlcat(dsn_meta, " ", sizeof(dsn_meta)); |
|---|
| 883 | | strlcat(dsn_meta, "password", sizeof(dsn_meta)); |
|---|
| 884 | | strlcat(dsn_meta, "=", sizeof(dsn_meta)); |
|---|
| 885 | | strlcat(dsn_meta, options, sizeof(dsn_meta)); |
|---|
| 886 | | } |
|---|
| 887 | | dsn = dsn_meta; |
|---|
| 888 | | } |
|---|
| 889 | | |
|---|
| 890 | | if(cq->dbh) { |
|---|
| 891 | | if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; |
|---|
| 892 | | PQreset(cq->dbh); |
|---|
| 893 | | if(PQstatus(cq->dbh) != CONNECTION_OK) { |
|---|
| 894 | | noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", |
|---|
| 895 | | dsn, PQerrorMessage(cq->dbh)); |
|---|
| 896 | | return -1; |
|---|
| 897 | | } |
|---|
| 898 | | if(stratcon_database_post_connect(cq)) return -1; |
|---|
| 899 | | if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; |
|---|
| 900 | | noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", |
|---|
| 901 | | dsn, PQerrorMessage(cq->dbh)); |
|---|
| 902 | | return -1; |
|---|
| 903 | | } |
|---|
| 904 | | |
|---|
| 905 | | cq->dbh = PQconnectdb(dsn); |
|---|
| 906 | | if(!cq->dbh) return -1; |
|---|
| 907 | | if(PQstatus(cq->dbh) != CONNECTION_OK) { |
|---|
| 908 | | noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", |
|---|
| 909 | | dsn, PQerrorMessage(cq->dbh)); |
|---|
| 910 | | return -1; |
|---|
| 911 | | } |
|---|
| 912 | | if(stratcon_database_post_connect(cq)) return -1; |
|---|
| 913 | | if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; |
|---|
| 914 | | noitL(noit_error, "Error connection to database: '%s'\nError: %s\n", |
|---|
| 915 | | dsn, PQerrorMessage(cq->dbh)); |
|---|
| 916 | | return -1; |
|---|
| 917 | | } |
|---|
| 918 | | static int |
|---|
| 919 | | stratcon_datastore_savepoint_op(conn_q *cq, const char *p, |
|---|
| 920 | | const char *name) { |
|---|
| 921 | | int rv = -1; |
|---|
| 922 | | PGresult *res; |
|---|
| 923 | | char cmd[128]; |
|---|
| 924 | | strlcpy(cmd, p, sizeof(cmd)); |
|---|
| 925 | | strlcat(cmd, name, sizeof(cmd)); |
|---|
| 926 | | if((res = PQexec(cq->dbh, cmd)) == NULL) return -1; |
|---|
| 927 | | if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0; |
|---|
| 928 | | PQclear(res); |
|---|
| 929 | | return rv; |
|---|
| 930 | | } |
|---|
| 931 | | static int |
|---|
| 932 | | stratcon_datastore_do(conn_q *cq, const char *cmd) { |
|---|
| 933 | | PGresult *res; |
|---|
| 934 | | int rv = -1; |
|---|
| 935 | | if((res = PQexec(cq->dbh, cmd)) == NULL) return -1; |
|---|
| 936 | | if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0; |
|---|
| 937 | | PQclear(res); |
|---|
| 938 | | return rv; |
|---|
| 939 | | } |
|---|
| 940 | | #define BUSTED(cq) do { \ |
|---|
| 941 | | PQfinish((cq)->dbh); \ |
|---|
| 942 | | (cq)->dbh = NULL; \ |
|---|
| 943 | | goto full_monty; \ |
|---|
| 944 | | } while(0) |
|---|
| 945 | | #define SAVEPOINT(name) do { \ |
|---|
| 946 | | if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \ |
|---|
| 947 | | last_sp = current; \ |
|---|
| 948 | | } while(0) |
|---|
| 949 | | #define ROLLBACK_TO_SAVEPOINT(name) do { \ |
|---|
| 950 | | if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \ |
|---|
| 951 | | BUSTED(cq); \ |
|---|
| 952 | | last_sp = NULL; \ |
|---|
| 953 | | } while(0) |
|---|
| 954 | | #define RELEASE_SAVEPOINT(name) do { \ |
|---|
| 955 | | if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \ |
|---|
| 956 | | BUSTED(cq); \ |
|---|
| 957 | | last_sp = NULL; \ |
|---|
| 958 | | } while(0) |
|---|
| 959 | | int |
|---|
| 960 | | stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure, |
|---|
| 961 | | struct timeval *now) { |
|---|
| 962 | | ds_rt_detail *dsjd = closure; |
|---|
| 963 | | if(!(mask & EVENTER_ASYNCH_WORK)) return 0; |
|---|
| 964 | | if(mask & EVENTER_ASYNCH_CLEANUP) return 0; |
|---|
| 965 | | |
|---|
| 966 | | assert(dsjd->rt); |
|---|
| 967 | | stratcon_datastore_find(dsjd); |
|---|
| 968 | | if(dsjd->completion_event) |
|---|
| 969 | | eventer_add(dsjd->completion_event); |
|---|
| 970 | | |
|---|
| 971 | | free_params((ds_single_detail *)dsjd); |
|---|
| 972 | | free(dsjd); |
|---|
| 973 | | return 0; |
|---|
| 974 | | } |
|---|
| 975 | | static const char * |
|---|
| 976 | | get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) { |
|---|
| 977 | | void *vinfo; |
|---|
| 978 | | char *dsn = NULL, *fqdn = NULL; |
|---|
| 979 | | int found = 0; |
|---|
| 980 | | storagenode_info *info = NULL; |
|---|
| 981 | | pthread_mutex_lock(&storagenode_to_info_cache_lock); |
|---|
| 982 | | if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id), |
|---|
| 983 | | &vinfo)) { |
|---|
| 984 | | found = 1; |
|---|
| 985 | | info = vinfo; |
|---|
| 986 | | } |
|---|
| 987 | | pthread_mutex_unlock(&storagenode_to_info_cache_lock); |
|---|
| 988 | | if(found) { |
|---|
| 989 | | if(fqdn_out) *fqdn_out = info->fqdn; |
|---|
| 990 | | return info->dsn; |
|---|
| 991 | | } |
|---|
| 992 | | |
|---|
| 993 | | if(!found && can_use_db) { |
|---|
| 994 | | ds_single_detail *d; |
|---|
| 995 | | conn_q *cq; |
|---|
| 996 | | int row_count; |
|---|
| 997 | | /* Look it up and store it */ |
|---|
| 998 | | d = calloc(1, sizeof(*d)); |
|---|
| 999 | | cq = get_conn_q_for_metanode(); |
|---|
| 1000 | | GET_QUERY(find_storage); |
|---|
| 1001 | | DECLARE_PARAM_INT(id); |
|---|
| 1002 | | PG_EXEC(find_storage); |
|---|
| 1003 | | row_count = PQntuples(d->res); |
|---|
| 1004 | | if(row_count) { |
|---|
| 1005 | | PG_GET_STR_COL(dsn, 0, "dsn"); |
|---|
| 1006 | | PG_GET_STR_COL(fqdn, 0, "fqdn"); |
|---|
| 1007 | | fqdn = fqdn ? strdup(fqdn) : NULL; |
|---|
| 1008 | | dsn = dsn ? strdup(dsn) : NULL; |
|---|
| 1009 | | } |
|---|
| 1010 | | PQclear(d->res); |
|---|
| 1011 | | bad_row: |
|---|
| 1012 | | free_params(d); |
|---|
| 1013 | | free(d); |
|---|
| 1014 | | release_conn_q(cq); |
|---|
| 1015 | | } |
|---|
| 1016 | | if(fqdn) { |
|---|
| 1017 | | info = calloc(1, sizeof(*info)); |
|---|
| 1018 | | info->fqdn = fqdn; |
|---|
| 1019 | | if(fqdn_out) *fqdn_out = info->fqdn; |
|---|
| 1020 | | info->dsn = dsn; |
|---|
| 1021 | | info->storagenode_id = id; |
|---|
| 1022 | | pthread_mutex_lock(&storagenode_to_info_cache_lock); |
|---|
| 1023 | | noit_hash_store(&storagenode_to_info_cache, |
|---|
| 1024 | | (void *)&info->storagenode_id, sizeof(int), info); |
|---|
| 1025 | | pthread_mutex_unlock(&storagenode_to_info_cache_lock); |
|---|
| 1026 | | } |
|---|
| 1027 | | return info ? info->dsn : NULL; |
|---|
| 1028 | | } |
|---|
| 1029 | | static ds_line_detail * |
|---|
| 1030 | | build_insert_batch(interim_journal_t *ij) { |
|---|
| 1031 | | int rv; |
|---|
| 1032 | | off_t len; |
|---|
| 1033 | | const char *buff, *cp, *lcp; |
|---|
| 1034 | | struct stat st; |
|---|
| 1035 | | ds_line_detail *head = NULL, *last = NULL, *next = NULL; |
|---|
| 1036 | | |
|---|
| 1037 | | if(ij->fd < 0) { |
|---|
| 1038 | | ij->fd = open(ij->filename, O_RDONLY); |
|---|
| 1039 | | if(ij->fd < 0) { |
|---|
| 1040 | | noitL(noit_error, "Cannot open interim journal '%s': %s\n", |
|---|
| 1041 | | ij->filename, strerror(errno)); |
|---|
| 1042 | | assert(ij->fd >= 0); |
|---|
| 1043 | | } |
|---|
| 1044 | | } |
|---|
| 1045 | | while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR); |
|---|
| 1046 | | if(rv == -1) { |
|---|
| 1047 | | noitL(noit_error, "Cannot stat interim journal '%s': %s\n", |
|---|
| 1048 | | ij->filename, strerror(errno)); |
|---|
| 1049 | | assert(rv != -1); |
|---|
| 1050 | | } |
|---|
| 1051 | | len = st.st_size; |
|---|
| 1052 | | if(len > 0) { |
|---|
| 1053 | | buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0); |
|---|
| 1054 | | if(buff == (void *)-1) { |
|---|
| 1055 | | noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd, |
|---|
| 1056 | | ij->filename, strerror(errno)); |
|---|
| 1057 | | assert(buff != (void *)-1); |
|---|
| 1058 | | } |
|---|
| 1059 | | lcp = buff; |
|---|
| 1060 | | while(lcp < (buff + len) && |
|---|
| 1061 | | NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) { |
|---|
| 1062 | | next = calloc(1, sizeof(*next)); |
|---|
| 1063 | | next->data = malloc(cp - lcp + 1); |
|---|
| 1064 | | memcpy(next->data, lcp, cp - lcp); |
|---|
| 1065 | | next->data[cp - lcp] = '\0'; |
|---|
| 1066 | | if(!head) head = next; |
|---|
| 1067 | | if(last) last->next = next; |
|---|
| 1068 | | last = next; |
|---|
| 1069 | | lcp = cp + 1; |
|---|
| 1070 | | } |
|---|
| 1071 | | munmap((void *)buff, len); |
|---|
| 1072 | | } |
|---|
| 1073 | | close(ij->fd); |
|---|
| 1074 | | return head; |
|---|
| 1075 | | } |
|---|
| 1076 | | static void |
|---|
| 1077 | | interim_journal_remove(interim_journal_t *ij) { |
|---|
| 1078 | | unlink(ij->filename); |
|---|
| | 87 | interim_journal_free(void *vij) { |
|---|
| | 88 | interim_journal_t *ij = vij; |
|---|
| 1602 | | static void |
|---|
| 1603 | | stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn, |
|---|
| 1604 | | char *id_str, char *file) { |
|---|
| 1605 | | char path[PATH_MAX]; |
|---|
| 1606 | | interim_journal_t *ij; |
|---|
| 1607 | | eventer_t ingest; |
|---|
| 1608 | | |
|---|
| 1609 | | snprintf(path, sizeof(path), "%s/%s/%s/%s/%s", |
|---|
| 1610 | | basejpath, remote_str, remote_cn, id_str, file); |
|---|
| 1611 | | ij = calloc(1, sizeof(*ij)); |
|---|
| 1612 | | ij->fd = open(path, O_RDONLY); |
|---|
| 1613 | | if(ij->fd < 0) { |
|---|
| 1614 | | noitL(noit_error, "cannot open journal '%s': %s\n", |
|---|
| 1615 | | path, strerror(errno)); |
|---|
| 1616 | | free(ij); |
|---|
| 1617 | | return; |
|---|
| 1618 | | } |
|---|
| 1619 | | close(ij->fd); |
|---|
| 1620 | | ij->fd = -1; |
|---|
| 1621 | | ij->filename = strdup(path); |
|---|
| 1622 | | ij->remote_str = strdup(remote_str); |
|---|
| 1623 | | ij->remote_cn = strdup(remote_cn); |
|---|
| 1624 | | ij->storagenode_id = atoi(id_str); |
|---|
| 1625 | | ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn, |
|---|
| 1626 | | ij->fqdn); |
|---|
| 1627 | | noitL(noit_error, "ingesting old payload: %s\n", ij->filename); |
|---|
| 1628 | | ingest = eventer_alloc(); |
|---|
| 1629 | | ingest->mask = EVENTER_ASYNCH; |
|---|
| 1630 | | ingest->callback = stratcon_datastore_asynch_execute; |
|---|
| 1631 | | ingest->closure = ij; |
|---|
| 1632 | | eventer_add_asynch(ij->cpool->jobq, ingest); |
|---|
| 1633 | | } |
|---|
| 1634 | | static void |
|---|
| 1635 | | stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) { |
|---|
| 1636 | | char path[PATH_MAX]; |
|---|
| 1637 | | DIR *root; |
|---|
| 1638 | | struct dirent *de, *entry; |
|---|
| 1639 | | int i = 0, cnt = 0; |
|---|
| 1640 | | char **entries; |
|---|
| 1641 | | int size = 0; |
|---|
| 1642 | | |
|---|
| 1643 | | snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath, |
|---|
| 1644 | | first ? "/" : "", first ? first : "", |
|---|
| 1645 | | second ? "/" : "", second ? second : "", |
|---|
| 1646 | | third ? "/" : "", third ? third : ""); |
|---|
| 1647 | | #ifdef _PC_NAME_MAX |
|---|
| 1648 | | size = pathconf(path, _PC_NAME_MAX); |
|---|
| 1649 | | #endif |
|---|
| 1650 | | size = MIN(size, PATH_MAX + 128); |
|---|
| 1651 | | de = alloca(size); |
|---|
| 1652 | | root = opendir(path); |
|---|
| 1653 | | if(!root) return; |
|---|
| 1654 | | while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++; |
|---|
| 1655 | | closedir(root); |
|---|
| 1656 | | root = opendir(path); |
|---|
| 1657 | | if(!root) return; |
|---|
| 1658 | | entries = malloc(sizeof(*entries) * cnt); |
|---|
| 1659 | | while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) { |
|---|
| 1660 | | if(i < cnt) { |
|---|
| 1661 | | entries[i++] = strdup(entry->d_name); |
|---|
| 1662 | | } |
|---|
| 1663 | | } |
|---|
| 1664 | | closedir(root); |
|---|
| 1665 | | cnt = i; /* could have changed, directories are fickle */ |
|---|
| 1666 | | qsort(entries, i, sizeof(*entries), |
|---|
| 1667 | | (int (*)(const void *, const void *))strcasecmp); |
|---|
| 1668 | | for(i=0; i<cnt; i++) { |
|---|
| 1669 | | if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue; |
|---|
| 1670 | | noitL(ds_deb, "Processing L%d entry '%s'\n", |
|---|
| 1671 | | third ? 4 : second ? 3 : first ? 2 : 1, entries[i]); |
|---|
| 1672 | | if(!first) |
|---|
| 1673 | | stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL); |
|---|
| 1674 | | else if(!second) |
|---|
| 1675 | | stratcon_datastore_sweep_journals_int(first, entries[i], NULL); |
|---|
| 1676 | | else if(!third) |
|---|
| 1677 | | stratcon_datastore_sweep_journals_int(first, second, entries[i]); |
|---|
| 1678 | | else if(strlen(entries[i]) == 16) |
|---|
| 1679 | | stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]); |
|---|
| 1680 | | } |
|---|
| 1681 | | for(i=0; i<cnt; i++) |
|---|
| 1682 | | free(entries[i]); |
|---|
| 1683 | | free(entries); |
|---|
| 1684 | | } |
|---|
| 1685 | | static void |
|---|
| 1686 | | stratcon_datastore_sweep_journals() { |
|---|
| 1687 | | stratcon_datastore_sweep_journals_int(NULL,NULL,NULL); |
|---|
| 1688 | | } |
|---|
| 1689 | | |
|---|
| 1690 | | int |
|---|
| 1691 | | stratcon_datastore_ingest_all_storagenode_info() { |
|---|
| 1692 | | int i, cnt = 0; |
|---|
| 1693 | | ds_single_detail _d = { 0 }, *d = &_d; |
|---|
| 1694 | | conn_q *cq; |
|---|
| 1695 | | cq = get_conn_q_for_metanode(); |
|---|
| 1696 | | |
|---|
| 1697 | | while(stratcon_database_connect(cq)) { |
|---|
| 1698 | | noitL(noit_error, "Error connecting to database\n"); |
|---|
| 1699 | | sleep(1); |
|---|
| 1700 | | } |
|---|
| 1701 | | |
|---|
| 1702 | | GET_QUERY(all_storage); |
|---|
| 1703 | | PG_EXEC(all_storage); |
|---|
| 1704 | | cnt = PQntuples(d->res); |
|---|
| 1705 | | for(i=0; i<cnt; i++) { |
|---|
| 1706 | | void *vinfo; |
|---|
| 1707 | | char *tmpint, *fqdn, *dsn; |
|---|
| 1708 | | int storagenode_id; |
|---|
| 1709 | | PG_GET_STR_COL(tmpint, i, "storage_node_id"); |
|---|
| 1710 | | storagenode_id = atoi(tmpint); |
|---|
| 1711 | | PG_GET_STR_COL(fqdn, i, "fqdn"); |
|---|
| 1712 | | PG_GET_STR_COL(dsn, i, "dsn"); |
|---|
| 1713 | | PG_GET_STR_COL(tmpint, i, "storage_node_id"); |
|---|
| 1714 | | storagenode_id = tmpint ? atoi(tmpint) : 0; |
|---|
| 1715 | | |
|---|
| 1716 | | if(!noit_hash_retrieve(&storagenode_to_info_cache, |
|---|
| 1717 | | (void *)&storagenode_id, sizeof(int), &vinfo)) { |
|---|
| 1718 | | storagenode_info *info; |
|---|
| 1719 | | info = calloc(1, sizeof(*info)); |
|---|
| 1720 | | info->storagenode_id = storagenode_id; |
|---|
| 1721 | | info->fqdn = fqdn ? strdup(fqdn) : NULL; |
|---|
| 1722 | | info->dsn = dsn ? strdup(dsn) : NULL; |
|---|
| 1723 | | noit_hash_store(&storagenode_to_info_cache, |
|---|
| 1724 | | (void *)&info->storagenode_id, sizeof(int), info); |
|---|
| 1725 | | noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n", |
|---|
| 1726 | | info->storagenode_id, |
|---|
| 1727 | | info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : ""); |
|---|
| 1728 | | } |
|---|
| 1729 | | } |
|---|
| 1730 | | PQclear(d->res); |
|---|
| 1731 | | bad_row: |
|---|
| 1732 | | free_params(d); |
|---|
| 1733 | | |
|---|
| 1734 | | release_conn_q(cq); |
|---|
| 1735 | | noitL(noit_error, "Loaded %d storage nodes\n", cnt); |
|---|
| 1736 | | return cnt; |
|---|
| 1737 | | } |
|---|
| 1738 | | int |
|---|
| 1739 | | stratcon_datastore_ingest_all_check_info() { |
|---|
| 1740 | | int i, cnt, loaded = 0; |
|---|
| 1741 | | ds_single_detail _d = { 0 }, *d = &_d; |
|---|
| 1742 | | conn_q *cq; |
|---|
| 1743 | | cq = get_conn_q_for_metanode(); |
|---|
| 1744 | | |
|---|
| 1745 | | while(stratcon_database_connect(cq)) { |
|---|
| 1746 | | noitL(noit_error, "Error connecting to database\n"); |
|---|
| 1747 | | sleep(1); |
|---|
| 1748 | | } |
|---|
| 1749 | | |
|---|
| 1750 | | GET_QUERY(check_mapall); |
|---|
| 1751 | | PG_EXEC(check_mapall); |
|---|
| 1752 | | cnt = PQntuples(d->res); |
|---|
| 1753 | | for(i=0; i<cnt; i++) { |
|---|
| 1754 | | void *vinfo; |
|---|
| 1755 | | char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn; |
|---|
| 1756 | | int sid, storagenode_id; |
|---|
| 1757 | | uuid_info *uuidinfo; |
|---|
| 1758 | | PG_GET_STR_COL(uuid_str, i, "id"); |
|---|
| 1759 | | if(!uuid_str) continue; |
|---|
| 1760 | | PG_GET_STR_COL(tmpint, i, "sid"); |
|---|
| 1761 | | if(!tmpint) continue; |
|---|
| 1762 | | sid = atoi(tmpint); |
|---|
| 1763 | | PG_GET_STR_COL(fqdn, i, "fqdn"); |
|---|
| 1764 | | PG_GET_STR_COL(dsn, i, "dsn"); |
|---|
| 1765 | | PG_GET_STR_COL(remote_cn, i, "remote_cn"); |
|---|
| 1766 | | PG_GET_STR_COL(tmpint, i, "storage_node_id"); |
|---|
| 1767 | | storagenode_id = tmpint ? atoi(tmpint) : 0; |
|---|
| 1768 | | |
|---|
| 1769 | | uuidinfo = calloc(1, sizeof(*uuidinfo)); |
|---|
| 1770 | | uuidinfo->uuid_str = strdup(uuid_str); |
|---|
| 1771 | | uuidinfo->remote_cn = strdup(remote_cn); |
|---|
| 1772 | | uuidinfo->storagenode_id = storagenode_id; |
|---|
| 1773 | | uuidinfo->sid = sid; |
|---|
| 1774 | | noit_hash_store(&uuid_to_info_cache, |
|---|
| 1775 | | uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); |
|---|
| 1776 | | noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n", |
|---|
| 1777 | | uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id); |
|---|
| 1778 | | loaded++; |
|---|
| 1779 | | if(!noit_hash_retrieve(&storagenode_to_info_cache, |
|---|
| 1780 | | (void *)&storagenode_id, sizeof(int), &vinfo)) { |
|---|
| 1781 | | storagenode_info *info; |
|---|
| 1782 | | info = calloc(1, sizeof(*info)); |
|---|
| 1783 | | info->storagenode_id = storagenode_id; |
|---|
| 1784 | | info->fqdn = fqdn ? strdup(fqdn) : NULL; |
|---|
| 1785 | | info->dsn = dsn ? strdup(dsn) : NULL; |
|---|
| 1786 | | noit_hash_store(&storagenode_to_info_cache, |
|---|
| 1787 | | (void *)&info->storagenode_id, sizeof(int), info); |
|---|
| 1788 | | noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n", |
|---|
| 1789 | | info->storagenode_id, |
|---|
| 1790 | | info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : ""); |
|---|
| 1791 | | } |
|---|
| 1792 | | } |
|---|
| 1793 | | PQclear(d->res); |
|---|
| 1794 | | bad_row: |
|---|
| 1795 | | free_params(d); |
|---|
| 1796 | | |
|---|
| 1797 | | release_conn_q(cq); |
|---|
| 1798 | | noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded); |
|---|
| 1799 | | return loaded; |
|---|
| 1800 | | } |
|---|